Kinesis Lease

빅또리 2023. 11. 19. 17:29

 

LeaseCoordinator 

A thread that periodically runs the two tasks below.

One per worker...? (e.g., LeaseCoordinator-0001 alongside ShardRecordProcessor-0001)

 

- LeaseTaker

Takes leases that have no owner, or whose owner (another worker) has failed to renew them for a long time.

- LeaseRenewer

Renews the leases this worker holds (seems similar to TEP token reclaim).
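How does a taker know that another worker "failed to renew"? As far as I can tell, the lease row has no expiry timestamp. The owner's LeaseRenewer keeps incrementing leaseCounter, and each taker remembers locally when it last saw that counter change. If the counter has not changed for longer than the failover time, the lease counts as expired.

The sketch below models only that taker-side bookkeeping. ExpiryTracker and its methods are hypothetical names, not KCL classes. The 10-second value assumes KCL's default failoverTimeMillis of 10000.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class SeenLease:
    owner: Optional[str]
    counter: int
    last_change: float  # local clock reading when owner/counter last changed


class ExpiryTracker:
    """Hypothetical taker-side view of the lease table, refreshed on every scan."""

    def __init__(self, failover_time_s: float = 10.0,
                 clock: Callable[[], float] = time.monotonic):
        # Assumption: mirrors KCL's failoverTimeMillis default of 10000 ms.
        self.failover_time_s = failover_time_s
        self.clock = clock
        self.seen: Dict[str, SeenLease] = {}

    def observe(self, lease_key: str, owner: Optional[str], counter: int) -> None:
        """Record one scanned row; restart the timer only if something changed."""
        prev = self.seen.get(lease_key)
        if prev is None or prev.counter != counter or prev.owner != owner:
            self.seen[lease_key] = SeenLease(owner, counter, self.clock())

    def is_available(self, lease_key: str) -> bool:
        """Unowned leases, or leases not renewed within the failover time, can be taken."""
        lease = self.seen[lease_key]
        if lease.owner is None:
            return True
        return self.clock() - lease.last_change > self.failover_time_s
```

Passing a fake clock makes the behaviour easy to check. A row whose counter last moved at t=5 s is still considered held at t=12 s. It becomes available once more than 10 s pass without a change.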

 

 

2023-11-19 14:55:44.104 [LeaseCoordinator-0001]  INFO  s.a.k.l.dynamodb.DynamoDBLeaseTaker - {} Worker ed8f5dba-03ce-4082-93d4-264518669baa saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
2023-11-19 14:55:44.184 [LeaseCoordinator-0001]  INFO  s.a.k.l.d.DynamoDBLeaseRefresher - {} Transferred lease shardId-000000000000 ownership from 35947589-4a4c-42a3-bb0e-700cd9298312 to ed8f5dba-03ce-4082-93d4-264518669baa
2023-11-19 14:55:44.184 [LeaseCoordinator-0001]  INFO  s.a.k.l.dynamodb.DynamoDBLeaseTaker - {} Worker ed8f5dba-03ce-4082-93d4-264518669baa successfully took 1 leases: shardId-000000000000
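The numbers in the first log line ("Target is 1 leases, I have 0 leases, I will take 1 leases") follow a simple rule: spread the leases evenly across the known workers, rounding up. The sketch below is my simplified reading of how the taker picks its target and how many free (unowned or expired) leases it grabs. It ignores KCL's per-worker maximum and other details, and the function names are hypothetical.

```python
import math


def target_lease_count(num_leases: int, num_workers: int) -> int:
    """Even share per worker, rounded up (simplified: no per-worker maximum)."""
    if num_workers >= num_leases:
        return 1  # when workers outnumber leases, each one aims for a single lease
    return math.ceil(num_leases / num_workers)


def leases_to_take(num_leases: int, num_workers: int,
                   my_leases: int, available_leases: int) -> int:
    """How many free leases this worker grabs on one pass."""
    needed = target_lease_count(num_leases, num_workers) - my_leases
    return max(0, min(needed, available_leases))
```

With the values from the log (1 lease, 1 worker, 0 held, 1 available), leases_to_take(1, 1, 0, 1) gives 1.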

 

 

- LeaseCleanupManager

Periodically cleans up the leases of expired/closed shards.

- PeriodicShardSyncManager

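Periodically syncs the stream's shards with the lease table, creating leases for shards that are missing from it. As the log below shows, only the leader worker runs this task.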

2023-11-19 14:58:23.793 [main]  INFO  s.a.k.c.DiagnosticEventLogger - {} Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)
2023-11-19 14:58:23.964 [pool-14-thread-1]  INFO  s.a.k.leases.LeaseCleanupManager - {} Number of pending leases to clean before the scan : 0
2023-11-19 14:58:23.986 [pool-13-thread-1]  INFO  s.a.k.c.PeriodicShardSyncManager - {} WorkerId ed8f5dba-03ce-4082-93d4-264518669baa is leader, running the periodic shard sync task
2023-11-19 14:58:24.008 [pool-13-thread-1]  INFO  s.a.k.c.PeriodicShardSyncManager - {} Skipping shard sync for blackbox-stream-local due to the reason - Hash Ranges are complete for blackbox-stream-local
2023-11-19 14:58:25.799 [main]  INFO  s.a.kinesis.coordinator.Scheduler - {} Current stream shard assignments: shardId-000000000000
2023-11-19 14:58:25.800 [main]  INFO  s.a.kinesis.coordinator.Scheduler - {} Sleeping ...
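The "Hash Ranges are complete" message suggests that the leader checks whether the leases' hash key ranges together cover the whole Kinesis hash key space. That space runs from 0 to 2^128 - 1, which is the endingHashKey value in the ShardProcessor log. If there is a hole, some shard has no lease and a sync is needed.

The function below is a hypothetical sketch of such a check, not KCL's implementation.

```python
from typing import Iterable, Tuple

# Kinesis hash keys are 128-bit: 0 .. 340282366920938463463374607431768211455
MAX_HASH_KEY = 2**128 - 1


def has_complete_hash_range(ranges: Iterable[Tuple[int, int]]) -> bool:
    """True if the inclusive (start, end) ranges leave no hole in [0, MAX_HASH_KEY]."""
    ordered = sorted(ranges)
    if not ordered or ordered[0][0] != 0:
        return False
    covered_end = ordered[0][1]
    for start, end in ordered[1:]:
        if start > covered_end + 1:
            return False  # a hole: part of the key space has no lease
        covered_end = max(covered_end, end)  # overlapping ranges are tolerated
    return covered_end == MAX_HASH_KEY
```

A single lease spanning the whole space, as in this one-shard stream, is complete. So are two halves that meet exactly. Two ranges with a gap between them are not complete.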

 

 

 

ShardProcessor

The thread that fetches data records and actually processes them.

For any running Kinesis Client Library (KCL) application, a shard only has one owner.

2023-11-19 14:47:00.777 [ShardRecordProcessor-0001]  INFO  com.dealicious.CommandApplication - {} start processing >>>>> currently running thread: ShardRecordProcessor-0001
2023-11-19 14:47:01.027 [ShardRecordProcessor-0001]  INFO  s.a.k.l.d.DynamoDBLeaseRenewer - {} Updating lease from Lease(leaseKey=shardId-000000000000, leaseOwner=35947589-4a4c-42a3-bb0e-700cd9298312, leaseCounter=159, concurrencyToken=6ab791e2-82bc-43a6-b711-5808a005a8cd, lastCounterIncrementNanos=136635543906361, checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0, endingHashKey=340282366920938463463374607431768211455)) to Lease(leaseKey=shardId-000000000000, leaseOwner=35947589-4a4c-42a3-bb0e-700cd9298312, leaseCounter=159, concurrencyToken=6ab791e2-82bc-43a6-b711-5808a005a8cd, lastCounterIncrementNanos=136635543906361, checkpoint={SequenceNumber: 49646570394637888094484467868330817441454198745418694658,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0, endingHashKey=340282366920938463463374607431768211455))
2023-11-19 14:47:01.062 [ShardRecordProcessor-0001]  INFO  s.a.k.l.d.DynamoDBLeaseRefresher - {} Updated lease shardId-000000000000.
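The log shows the ShardRecordProcessor thread writing a checkpoint by updating its own lease row. Only the checkpoint changes (LATEST becomes a sequence number); leaseOwner and leaseCounter stay the same. A worker that has lost the lease must not be able to overwrite the new owner's checkpoint, so the write has to be conditional on still owning the lease.

Below is a minimal in-memory sketch of that idea. LeaseRow, write_checkpoint and LeaseLostError are hypothetical names. The sketch checks only the owner, whereas the real KCL update carries more state (the log also shows leaseCounter and concurrencyToken).

```python
from dataclasses import dataclass
from typing import Dict, Optional


class LeaseLostError(Exception):
    """Raised when the caller no longer owns the lease (hypothetical)."""


@dataclass
class LeaseRow:
    lease_key: str
    lease_owner: Optional[str]
    lease_counter: int
    checkpoint: str = "LATEST"
    owner_switches_since_checkpoint: int = 0


def write_checkpoint(table: Dict[str, LeaseRow], lease_key: str,
                     worker_id: str, sequence_number: str) -> None:
    """Conditional update: only the current owner may move the checkpoint."""
    row = table[lease_key]
    if row.lease_owner != worker_id:
        raise LeaseLostError(f"{worker_id} does not own {lease_key}")
    row.checkpoint = sequence_number
    row.owner_switches_since_checkpoint = 0  # a new checkpoint resets the count
```

In this model, an old owner that lost the lease gets LeaseLostError instead of silently moving the checkpoint back.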

 

 

 

 

 

Lease Table

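- Created in DynamoDB as a table with the same name as the applicationName.

- checkpoint: the most recent checkpoint for the shard (comparable to an offset).
- hashKeyRange (StartingHashKey, EndingHashKey): used by PeriodicShardSyncManager during its periodic sync to find shards that are missing from the lease table and to create leases for them.
- leaseCounter: used for lease versioning, so that workers can detect that a lease has been taken by another worker.
- ownerSwitchesSinceCheckpoint: how many times the lease has changed owners since the last checkpoint was written.
- checkpointSubSequenceNumber: needed when the aggregation feature of the Kinesis Producer Library (KPL) is used; it tracks the individual user records within an aggregated Kinesis record.
- leaseKey: the lease ID for each shard.
- leaseOwner: the worker currently holding the lease.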
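leaseCounter behaves like an optimistic-lock version. A worker reads a row, then writes it back only if the counter is still the value it read, bumping the counter in the same write. In DynamoDB this would be a conditional update; the sketch below imitates it with a plain dict.

The names are hypothetical. Incrementing ownerSwitchesSinceCheckpoint on a change of owner is my reading of that column's definition.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class LeaseRow:
    lease_key: str
    lease_owner: Optional[str]
    lease_counter: int
    checkpoint: str = "LATEST"
    owner_switches_since_checkpoint: int = 0


def take_lease(table: Dict[str, LeaseRow], lease_key: str,
               new_owner: str, expected_counter: int) -> bool:
    """Compare-and-set on leaseCounter: succeed only if nobody wrote since our read."""
    row = table[lease_key]
    if row.lease_counter != expected_counter:
        return False  # another worker renewed or took the lease first
    if row.lease_owner is not None and row.lease_owner != new_owner:
        row.owner_switches_since_checkpoint += 1
    row.lease_owner = new_owner
    row.lease_counter += 1
    return True
```

Suppose two workers read counter 7 and both try to take the lease. The first write bumps the counter to 8, so the second write's condition fails and it backs off.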

 

 

 

 

Trying it out

 

0. Run with a stream that has 4 shards

 

1. Start consumer app1

- workerId => 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 (leader)

This worker is the leader among the workers (the leader is the one that runs the periodic shard sync task).

- leaseTable

leaseKey              leaseOwner
shardId-000000000000  0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3
shardId-000000000001  0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3
shardId-000000000002  0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3
shardId-000000000003  0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3

 

2. Start consumer app2

- workerId => 6b82a5c9-659d-4766-9bac-69b09fcfcd4d

- leaseTable

leaseKey              leaseOwner
shardId-000000000000  0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3
shardId-000000000001  0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3
shardId-000000000002  6b82a5c9-659d-4766-9bac-69b09fcfcd4d
shardId-000000000003  6b82a5c9-659d-4766-9bac-69b09fcfcd4d

 

 

3. Start consumer app3

- workerId => ec22ef68-e781-4983-bf85-8256d623cb53

 

- leaseTable

leaseKey              leaseOwner
shardId-000000000000  0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3
shardId-000000000001  0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3
shardId-000000000002  ec22ef68-e781-4983-bf85-8256d623cb53
shardId-000000000003  6b82a5c9-659d-4766-9bac-69b09fcfcd4d

 

 

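Result => when a new worker starts, leases are stolen so that shards are spread evenly across the workers. With 4 shards and 3 workers, the split ended at 2/1/1 (app1: 2, app2: 1, app3: 1).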
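The 2/1/1 split can be reproduced with a small simulation. The stealing rule below is my simplified reading of KCL's DynamoDBLeaseTaker, not its actual code. A worker below its target steals from the most loaded worker. It normally takes only what that worker holds above target, but at least one lease when it needs more than one. It never takes more than a per-pass cap; that cap corresponds to maxLeasesToStealAtOneTime, assumed here to default to 1.

The model also assumes that a worker counts only the workers that own at least one lease, plus itself. Expiry and timing are ignored, and all function names are hypothetical.

```python
import math
from collections import Counter
from typing import Dict


def target_lease_count(num_leases: int, num_workers: int) -> int:
    if num_workers >= num_leases:
        return 1
    return math.ceil(num_leases / num_workers)


def num_to_steal(needed: int, target: int, most_loaded: int, max_steal: int = 1) -> int:
    """Simplified steal rule (assumption, see the lead-in)."""
    if needed <= 0 or most_loaded < target:
        return 0
    count = min(needed, most_loaded - target)
    if needed > 1 and count == 0:
        count = 1  # everyone else sits exactly at target: still steal one
    return min(count, max_steal)


def take_pass(owners: Dict[str, str], worker: str) -> None:
    """One LeaseTaker pass by `worker`; owners maps shard -> owning worker."""
    counts = Counter(owners.values())
    num_workers = len(set(counts) | {worker})  # lease owners plus this worker
    target = target_lease_count(len(owners), num_workers)
    needed = target - counts.get(worker, 0)
    others = {w: c for w, c in counts.items() if w != worker}
    if not others:
        return
    victim = max(others, key=others.get)
    n = num_to_steal(needed, target, others[victim])
    for shard in sorted(s for s, w in owners.items() if w == victim)[:n]:
        owners[shard] = worker


def simulate(num_shards: int = 4, apps=("app1", "app2", "app3"), passes: int = 5):
    """app1 starts with every shard; the other apps join one at a time."""
    owners = {f"shardId-{i:012d}": apps[0] for i in range(num_shards)}
    for joined in range(2, len(apps) + 1):
        for _ in range(passes):
            for app in apps[:joined]:
                take_pass(owners, app)
    return owners
```

Running simulate() ends with one app holding 2 shards and the other two holding 1 each, matching the lease table after app3 joined. In this model, the cap of 1 means app2 needs two passes to reach its target of 2. Rebalancing therefore happens gradually over several taker intervals.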

 

 

 

References

KCL resharding, scaling, and parallel processing

https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-scaling.html

Kinesis consumer troubleshooting

https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html

Lease table

https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html#shared-throughput-kcl-consumers-leasetable
