LeaseCoordinator
A thread that periodically runs the two tasks below.
Seems to be 1:1 with a worker...? (e.g. LeaseCoordinator-0001 for ShardRecordProcessor-0001)
- LeaseTaker
Takes leases that have no owner, or that another worker has failed to renew for a long time.
- LeaseRenewer
Renews leases (seems similar to TEP token reclaim)
2023-11-19 14:55:44.104 [LeaseCoordinator-0001] INFO s.a.k.l.dynamodb.DynamoDBLeaseTaker - {} Worker ed8f5dba-03ce-4082-93d4-264518669baa saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
2023-11-19 14:55:44.184 [LeaseCoordinator-0001] INFO s.a.k.l.d.DynamoDBLeaseRefresher - {} Transferred lease shardId-000000000000 ownership from 35947589-4a4c-42a3-bb0e-700cd9298312 to ed8f5dba-03ce-4082-93d4-264518669baa
2023-11-19 14:55:44.184 [LeaseCoordinator-0001] INFO s.a.k.l.dynamodb.DynamoDBLeaseTaker - {} Worker ed8f5dba-03ce-4082-93d4-264518669baa successfully took 1 leases: shardId-000000000000
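The numbers in the "saw 1 total leases, 1 available leases, 1 workers" log line can be reproduced with a rough sketch like the one below. This is not KCL's code: the function names, the dict layout, and the `failover_ms` threshold are all assumptions made for illustration, and the real taker has more rules than this.

```python
import math

def is_available(lease, now_ms, failover_ms):
    """A lease is up for grabs if it has no owner or was not renewed recently."""
    if lease["owner"] is None:
        return True
    return now_ms - lease["last_renewed_ms"] > failover_ms

def leases_to_take(total, available, workers, mine):
    """How many available leases this worker picks up in one pass (simplified)."""
    target = math.ceil(total / workers)  # even share, rounded up
    needed = max(target - mine, 0)
    return min(needed, available)

# One shard whose previous owner stopped renewing long ago (hypothetical data).
leases = [{"key": "shardId-000000000000", "owner": "old-worker", "last_renewed_ms": 0}]
now_ms, failover_ms = 60_000, 10_000

available = [l for l in leases if is_available(l, now_ms, failover_ms)]
to_take = leases_to_take(total=len(leases), available=len(available), workers=1, mine=0)
print(f"Target is {math.ceil(len(leases) / 1)} leases, I will take {to_take} leases")
```

With 1 total lease, 1 available lease, and 1 worker holding 0 leases, the sketch decides to take 1 lease, which matches the log above.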
- LeaseCleanupManager
Periodically cleans up leases of expired/closed shards
- PeriodicShardSyncManager
Periodically assigns/syncs shards
2023-11-19 14:58:23.793 [main] INFO s.a.k.c.DiagnosticEventLogger - {} Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)
2023-11-19 14:58:23.964 [pool-14-thread-1] INFO s.a.k.leases.LeaseCleanupManager - {} Number of pending leases to clean before the scan : 0
2023-11-19 14:58:23.986 [pool-13-thread-1] INFO s.a.k.c.PeriodicShardSyncManager - {} WorkerId ed8f5dba-03ce-4082-93d4-264518669baa is leader, running the periodic shard sync task
2023-11-19 14:58:24.008 [pool-13-thread-1] INFO s.a.k.c.PeriodicShardSyncManager - {} Skipping shard sync for blackbox-stream-local due to the reason - Hash Ranges are complete for blackbox-stream-local
2023-11-19 14:58:25.799 [main] INFO s.a.kinesis.coordinator.Scheduler - {} Current stream shard assignments: shardId-000000000000
2023-11-19 14:58:25.800 [main] INFO s.a.kinesis.coordinator.Scheduler - {} Sleeping ...
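The "Hash Ranges are complete" message suggests the sync task checks whether the leases' hash key ranges cover the whole key space without gaps. A minimal sketch of such a check, assuming inclusive `(start, end)` ranges and a key space of 0 to 2^128 - 1 (the endingHashKey value that appears in the lease logs of this post). The function name is hypothetical and this is not the KCL implementation.

```python
MAX_HASH_KEY = 2**128 - 1  # 340282366920938463463374607431768211455

def find_hash_range_holes(ranges):
    """Return the gaps (start, end) not covered by the inclusive ranges given."""
    holes = []
    next_expected = 0
    for start, end in sorted(ranges):
        if start > next_expected:
            holes.append((next_expected, start - 1))
        next_expected = max(next_expected, end + 1)
    if next_expected <= MAX_HASH_KEY:
        holes.append((next_expected, MAX_HASH_KEY))
    return holes

# A single shard covering everything: no holes, so a sync could be skipped.
complete = find_hash_range_holes([(0, MAX_HASH_KEY)])

# Only the lower half is covered: the upper half shows up as a hole.
half = 2**127
incomplete = find_hash_range_holes([(0, half - 1)])
print(complete, incomplete)
```

An empty result corresponds to the "complete" case in the log; any hole would mean a shard is missing from the lease table.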
ShardProcessor
The thread that fetches data records and actually processes them.
For any running Kinesis Client Library (KCL) application, a shard only has one owner.
2023-11-19 14:47:00.777 [ShardRecordProcessor-0001] INFO com.dealicious.CommandApplication - {} start processing >>>>> 현재 실행 중인 스레드: ShardRecordProcessor-0001
2023-11-19 14:47:01.027 [ShardRecordProcessor-0001] INFO s.a.k.l.d.DynamoDBLeaseRenewer - {} Updating lease from Lease(leaseKey=shardId-000000000000, leaseOwner=35947589-4a4c-42a3-bb0e-700cd9298312, leaseCounter=159, concurrencyToken=6ab791e2-82bc-43a6-b711-5808a005a8cd, lastCounterIncrementNanos=136635543906361, checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0, endingHashKey=340282366920938463463374607431768211455)) to Lease(leaseKey=shardId-000000000000, leaseOwner=35947589-4a4c-42a3-bb0e-700cd9298312, leaseCounter=159, concurrencyToken=6ab791e2-82bc-43a6-b711-5808a005a8cd, lastCounterIncrementNanos=136635543906361, checkpoint={SequenceNumber: 49646570394637888094484467868330817441454198745418694658,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0, endingHashKey=340282366920938463463374607431768211455))
2023-11-19 14:47:01.062 [ShardRecordProcessor-0001] INFO s.a.k.l.d.DynamoDBLeaseRefresher - {} Updated lease shardId-000000000000.
Lease Table
- Created in DynamoDB as a table with the same name as the applicationName.
field | description |
checkpoint | The most recent checkpoint for this shard (something like an offset) |
hashKeyRange (StartingHashKey, EndingHashKey) | Used by PeriodicShardSyncManager during its periodic sync to find shards missing from the lease table and create leases for them. |
leaseCounter | Used for lease versioning; lets a worker tell whether another worker has taken the lease. |
ownerSwitchesSinceCheckpoint | Number of times the worker owning the lease has changed since the last checkpoint was written. |
checkpointSubSequenceNumber | Needed when the producer library's aggregation feature is used. |
leaseKey | Lease ID for each shard |
leaseOwner | Worker currently holding the lease |
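The versioning role of leaseCounter can be pictured as optimistic locking: a write only succeeds if the counter is still what the writer last read. A minimal sketch with an in-memory stand-in for the table. The `LeaseTable` class and its methods are hypothetical, not the KCL or DynamoDB API.

```python
class LeaseTable:
    """In-memory stand-in for the DynamoDB lease table (hypothetical)."""

    def __init__(self):
        self.rows = {}

    def put(self, key, owner):
        self.rows[key] = {"leaseOwner": owner, "leaseCounter": 0}

    def conditional_update(self, key, expected_counter, new_owner):
        """Write only if leaseCounter still equals what the caller last read."""
        row = self.rows[key]
        if row["leaseCounter"] != expected_counter:
            return False  # someone else renewed or took the lease in between
        row["leaseOwner"] = new_owner
        row["leaseCounter"] = expected_counter + 1
        return True

table = LeaseTable()
table.put("shardId-000000000000", "worker-a")

# Both workers read the same counter value before acting.
seen = table.rows["shardId-000000000000"]["leaseCounter"]

renewed = table.conditional_update("shardId-000000000000", seen, "worker-a")  # renew wins
taken = table.conditional_update("shardId-000000000000", seen, "worker-b")    # stale read loses
print(renewed, taken)
```

Because worker-a's renewal bumps the counter first, worker-b's attempt with the stale value is rejected, so the shard keeps a single owner.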
Trying it out
0. Run with a stream that has 4 shards
1. Start consumer app1
- workerId => 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 (leader)
- leaseTable
leaseKey | leaseOwner |
shardId-000000000000 | 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 |
shardId-000000000001 | 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 |
shardId-000000000002 | 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 |
shardId-000000000003 | 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 |
2. Start consumer app2
- workerId => 6b82a5c9-659d-4766-9bac-69b09fcfcd4d
- leaseTable
leaseKey | leaseOwner |
shardId-000000000000 | 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 |
shardId-000000000001 | 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 |
shardId-000000000002 | 6b82a5c9-659d-4766-9bac-69b09fcfcd4d |
shardId-000000000003 | 6b82a5c9-659d-4766-9bac-69b09fcfcd4d |
3. Start consumer app3
- workerId => ec22ef68-e781-4983-bf85-8256d623cb53
- leaseTable
leaseKey | leaseOwner |
shardId-000000000000 | 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 |
shardId-000000000001 | 0c00f96f-9ab4-4ab0-bb29-f6a3c45480a3 |
shardId-000000000002 | ec22ef68-e781-4983-bf85-8256d623cb53 |
shardId-000000000003 | 6b82a5c9-659d-4766-9bac-69b09fcfcd4d |
Result => When a new worker starts, lease stealing takes place so that the number of shards each worker holds is distributed evenly.
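The 4 → 2/2 → 2/1/1 progression seen in this test can be imitated with a small simulation. The rules below (even-share target, stealing from the most loaded worker, at most `max_steal` leases per pass) are assumptions chosen to match what was observed here, and `rebalance_pass` is a hypothetical name, not a KCL function. Which shard gets stolen on a tie is arbitrary in this sketch, so only the per-worker counts are meaningful.

```python
import math
from collections import Counter

def rebalance_pass(owners, worker, max_steal=1):
    """One taker pass for `worker`; `owners` maps leaseKey -> workerId."""
    counts = Counter(owners.values())
    counts.setdefault(worker, 0)
    target = math.ceil(len(owners) / len(counts))
    needed = target - counts[worker]
    others = [(w, c) for w, c in counts.items() if w != worker]
    if needed <= 0 or not others:
        return 0
    victim, victim_count = max(others, key=lambda wc: wc[1])
    if victim_count < target:
        return 0
    steal = min(needed, victim_count - target)
    if steal == 0 and needed > 1:
        steal = 1  # still far below target, so take one anyway
    steal = min(steal, max_steal)
    if steal == 0:
        return 0
    victim_keys = sorted(k for k, w in owners.items() if w == victim)
    for key in victim_keys[-steal:]:
        owners[key] = worker
    return steal

# app1 starts alone and owns all 4 shards.
owners = {f"shardId-{i:012d}": "app1" for i in range(4)}
history = []
for new_worker in ("app2", "app3"):
    while rebalance_pass(owners, new_worker):
        pass  # keep running passes until nothing more is stolen
    history.append(sorted(Counter(owners.values()).values()))
    print(new_worker, "joined:", dict(Counter(owners.values())))
```

After app2 joins, the split settles at 2/2, and after app3 joins one worker keeps 2 shards while the other two hold 1 each, the same shape as the lease tables above.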
References
https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-scaling.html
Kinesis consumer troubleshooting
https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html
lease table