28
28
package checkpoint
29
29
30
30
import (
31
+ "errors"
32
+ "fmt"
31
33
"time"
32
34
33
35
"github.com/aws/aws-sdk-go/aws"
@@ -61,6 +63,7 @@ type DynamoCheckpoint struct {
61
63
svc dynamodbiface.DynamoDBAPI
62
64
kclConfig * config.KinesisClientLibConfiguration
63
65
Retries int
66
+ lastLeaseSync time.Time
64
67
}
65
68
66
69
func NewDynamoCheckpoint (kclConfig * config.KinesisClientLibConfiguration ) * DynamoCheckpoint {
@@ -124,8 +127,22 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
124
127
return err
125
128
}
126
129
130
+ isClaimRequestExpired := shard .IsClaimRequestExpired (checkpointer .kclConfig )
131
+
132
+ var claimRequest string
133
+ if checkpointer .kclConfig .EnableLeaseStealing {
134
+ if currentCheckpointClaimRequest , ok := currentCheckpoint [ClaimRequestKey ]; ok && currentCheckpointClaimRequest .S != nil {
135
+ claimRequest = * currentCheckpointClaimRequest .S
136
+ if newAssignTo != claimRequest && ! isClaimRequestExpired {
137
+ checkpointer .log .Debugf ("another worker: %s has a claim on this shard. Not going to renew the lease" , claimRequest )
138
+ return errors .New (ErrShardClaimed )
139
+ }
140
+ }
141
+ }
142
+
127
143
assignedVar , assignedToOk := currentCheckpoint [LeaseOwnerKey ]
128
144
leaseVar , leaseTimeoutOk := currentCheckpoint [LeaseTimeoutKey ]
145
+
129
146
var conditionalExpression string
130
147
var expressionAttributeValues map [string ]* dynamodb.AttributeValue
131
148
@@ -140,8 +157,14 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
140
157
return err
141
158
}
142
159
143
- if time .Now ().UTC ().Before (currentLeaseTimeout ) && assignedTo != newAssignTo {
144
- return ErrLeaseNotAcquired {"current lease timeout not yet expired" }
160
+ if checkpointer .kclConfig .EnableLeaseStealing {
161
+ if time .Now ().UTC ().Before (currentLeaseTimeout ) && assignedTo != newAssignTo && ! isClaimRequestExpired {
162
+ return ErrLeaseNotAcquired {"current lease timeout not yet expired" }
163
+ }
164
+ } else {
165
+ if time .Now ().UTC ().Before (currentLeaseTimeout ) && assignedTo != newAssignTo {
166
+ return ErrLeaseNotAcquired {"current lease timeout not yet expired" }
167
+ }
145
168
}
146
169
147
170
checkpointer .log .Debugf ("Attempting to get a lock for shard: %s, leaseTimeout: %s, assignedTo: %s, newAssignedTo: %s" , shard .ID , currentLeaseTimeout , assignedTo , newAssignTo )
@@ -175,9 +198,21 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
175
198
marshalledCheckpoint [ParentShardIdKey ] = & dynamodb.AttributeValue {S : aws .String (shard .ParentShardId )}
176
199
}
177
200
178
- if shard .GetCheckpoint () != "" {
201
+ if checkpoint := shard .GetCheckpoint (); checkpoint != "" {
179
202
marshalledCheckpoint [SequenceNumberKey ] = & dynamodb.AttributeValue {
180
- S : aws .String (shard .GetCheckpoint ()),
203
+ S : aws .String (checkpoint ),
204
+ }
205
+ }
206
+
207
+ if checkpointer .kclConfig .EnableLeaseStealing {
208
+ if claimRequest != "" && claimRequest == newAssignTo && ! isClaimRequestExpired {
209
+ if expressionAttributeValues == nil {
210
+ expressionAttributeValues = make (map [string ]* dynamodb.AttributeValue )
211
+ }
212
+ conditionalExpression = conditionalExpression + " AND ClaimRequest = :claim_request"
213
+ expressionAttributeValues [":claim_request" ] = & dynamodb.AttributeValue {
214
+ S : & claimRequest ,
215
+ }
181
216
}
182
217
}
183
218
@@ -199,7 +234,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign
199
234
200
235
// CheckpointSequence writes a checkpoint at the designated sequence ID
201
236
func (checkpointer * DynamoCheckpoint ) CheckpointSequence (shard * par.ShardStatus ) error {
202
- leaseTimeout := shard .LeaseTimeout .UTC ().Format (time .RFC3339 )
237
+ leaseTimeout := shard .GetLeaseTimeout () .UTC ().Format (time .RFC3339 )
203
238
marshalledCheckpoint := map [string ]* dynamodb.AttributeValue {
204
239
LeaseKeyKey : {
205
240
S : aws .String (shard .ID ),
@@ -208,7 +243,7 @@ func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus)
208
243
S : aws .String (shard .GetCheckpoint ()),
209
244
},
210
245
LeaseOwnerKey : {
211
- S : aws .String (shard .AssignedTo ),
246
+ S : aws .String (shard .GetLeaseOwner () ),
212
247
},
213
248
LeaseTimeoutKey : {
214
249
S : aws .String (leaseTimeout ),
@@ -239,6 +274,16 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er
239
274
if assignedTo , ok := checkpoint [LeaseOwnerKey ]; ok {
240
275
shard .SetLeaseOwner (aws .StringValue (assignedTo .S ))
241
276
}
277
+
278
+ // Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming
279
+ if leaseTimeout , ok := checkpoint [LeaseTimeoutKey ]; ok && leaseTimeout .S != nil {
280
+ currentLeaseTimeout , err := time .Parse (time .RFC3339 , aws .StringValue (leaseTimeout .S ))
281
+ if err != nil {
282
+ return err
283
+ }
284
+ shard .LeaseTimeout = currentLeaseTimeout
285
+ }
286
+
242
287
return nil
243
288
}
244
289
@@ -265,13 +310,148 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error {
265
310
},
266
311
},
267
312
UpdateExpression : aws .String ("remove " + LeaseOwnerKey ),
313
+ ExpressionAttributeValues : map [string ]* dynamodb.AttributeValue {
314
+ ":assigned_to" : {
315
+ S : aws .String (checkpointer .kclConfig .WorkerID ),
316
+ },
317
+ },
318
+ ConditionExpression : aws .String ("AssignedTo = :assigned_to" ),
268
319
}
269
320
270
321
_ , err := checkpointer .svc .UpdateItem (input )
271
322
272
323
return err
273
324
}
274
325
326
+ // ListActiveWorkers returns a map of workers and their shards
327
+ func (checkpointer * DynamoCheckpoint ) ListActiveWorkers (shardStatus map [string ]* par.ShardStatus ) (map [string ][]* par.ShardStatus , error ) {
328
+ err := checkpointer .syncLeases (shardStatus )
329
+ if err != nil {
330
+ return nil , err
331
+ }
332
+
333
+ workers := map [string ][]* par.ShardStatus {}
334
+ for _ , shard := range shardStatus {
335
+ if shard .GetCheckpoint () == ShardEnd {
336
+ continue
337
+ }
338
+
339
+ leaseOwner := shard .GetLeaseOwner ()
340
+ if leaseOwner == "" {
341
+ checkpointer .log .Debugf ("Shard Not Assigned Error. ShardID: %s, WorkerID: %s" , shard .ID , checkpointer .kclConfig .WorkerID )
342
+ return nil , ErrShardNotAssigned
343
+ }
344
+ if w , ok := workers [leaseOwner ]; ok {
345
+ workers [leaseOwner ] = append (w , shard )
346
+ } else {
347
+ workers [leaseOwner ] = []* par.ShardStatus {shard }
348
+ }
349
+ }
350
+ return workers , nil
351
+ }
352
+
353
+ // ClaimShard places a claim request on a shard to signal a steal attempt
354
+ func (checkpointer * DynamoCheckpoint ) ClaimShard (shard * par.ShardStatus , claimID string ) error {
355
+ err := checkpointer .FetchCheckpoint (shard )
356
+ if err != nil && err != ErrSequenceIDNotFound {
357
+ return err
358
+ }
359
+ leaseTimeoutString := shard .GetLeaseTimeout ().Format (time .RFC3339 )
360
+
361
+ conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)`
362
+ expressionAttributeValues := map [string ]* dynamodb.AttributeValue {
363
+ ":id" : {
364
+ S : aws .String (shard .ID ),
365
+ },
366
+ ":lease_timeout" : {
367
+ S : aws .String (leaseTimeoutString ),
368
+ },
369
+ }
370
+
371
+ marshalledCheckpoint := map [string ]* dynamodb.AttributeValue {
372
+ LeaseKeyKey : {
373
+ S : & shard .ID ,
374
+ },
375
+ LeaseTimeoutKey : {
376
+ S : & leaseTimeoutString ,
377
+ },
378
+ SequenceNumberKey : {
379
+ S : & shard .Checkpoint ,
380
+ },
381
+ ClaimRequestKey : {
382
+ S : & claimID ,
383
+ },
384
+ }
385
+
386
+ if leaseOwner := shard .GetLeaseOwner (); leaseOwner == "" {
387
+ conditionalExpression += " AND attribute_not_exists(AssignedTo)"
388
+ } else {
389
+ marshalledCheckpoint [LeaseOwnerKey ] = & dynamodb.AttributeValue {S : & leaseOwner }
390
+ conditionalExpression += "AND AssignedTo = :assigned_to"
391
+ expressionAttributeValues [":assigned_to" ] = & dynamodb.AttributeValue {S : & leaseOwner }
392
+ }
393
+
394
+ if checkpoint := shard .GetCheckpoint (); checkpoint == "" {
395
+ conditionalExpression += " AND attribute_not_exists(Checkpoint)"
396
+ } else if checkpoint == ShardEnd {
397
+ conditionalExpression += " AND Checkpoint <> :checkpoint"
398
+ expressionAttributeValues [":checkpoint" ] = & dynamodb.AttributeValue {S : aws .String (ShardEnd )}
399
+ } else {
400
+ conditionalExpression += " AND Checkpoint = :checkpoint"
401
+ expressionAttributeValues [":checkpoint" ] = & dynamodb.AttributeValue {S : & checkpoint }
402
+ }
403
+
404
+ if shard .ParentShardId == "" {
405
+ conditionalExpression += " AND attribute_not_exists(ParentShardId)"
406
+ } else {
407
+ marshalledCheckpoint [ParentShardIdKey ] = & dynamodb.AttributeValue {S : aws .String (shard .ParentShardId )}
408
+ conditionalExpression += " AND ParentShardId = :parent_shard"
409
+ expressionAttributeValues [":parent_shard" ] = & dynamodb.AttributeValue {S : & shard .ParentShardId }
410
+ }
411
+
412
+ return checkpointer .conditionalUpdate (conditionalExpression , expressionAttributeValues , marshalledCheckpoint )
413
+ }
414
+
415
+ func (checkpointer * DynamoCheckpoint ) syncLeases (shardStatus map [string ]* par.ShardStatus ) error {
416
+ log := checkpointer .kclConfig .Logger
417
+
418
+ if (checkpointer .lastLeaseSync .Add (time .Duration (checkpointer .kclConfig .LeaseSyncingTimeIntervalMillis ) * time .Millisecond )).After (time .Now ()) {
419
+ return nil
420
+ }
421
+
422
+ checkpointer .lastLeaseSync = time .Now ()
423
+ input := & dynamodb.ScanInput {
424
+ ProjectionExpression : aws .String (fmt .Sprintf ("%s,%s,%s" , LeaseKeyKey , LeaseOwnerKey , SequenceNumberKey )),
425
+ Select : aws .String ("SPECIFIC_ATTRIBUTES" ),
426
+ TableName : aws .String (checkpointer .kclConfig .TableName ),
427
+ }
428
+
429
+ err := checkpointer .svc .ScanPages (input ,
430
+ func (pages * dynamodb.ScanOutput , lastPage bool ) bool {
431
+ results := pages .Items
432
+ for _ , result := range results {
433
+ shardId , foundShardId := result [LeaseKeyKey ]
434
+ assignedTo , foundAssignedTo := result [LeaseOwnerKey ]
435
+ checkpoint , foundCheckpoint := result [SequenceNumberKey ]
436
+ if ! foundShardId || ! foundAssignedTo || ! foundCheckpoint {
437
+ continue
438
+ }
439
+ if shard , ok := shardStatus [aws .StringValue (shardId .S )]; ok {
440
+ shard .SetLeaseOwner (aws .StringValue (assignedTo .S ))
441
+ shard .SetCheckpoint (aws .StringValue (checkpoint .S ))
442
+ }
443
+ }
444
+ return ! lastPage
445
+ })
446
+
447
+ if err != nil {
448
+ log .Debugf ("Error performing SyncLeases. Error: %+v " , err )
449
+ return err
450
+ }
451
+ log .Debugf ("Lease sync completed. Next lease sync will occur in %s" , time .Duration (checkpointer .kclConfig .LeaseSyncingTimeIntervalMillis )* time .Millisecond )
452
+ return nil
453
+ }
454
+
275
455
func (checkpointer * DynamoCheckpoint ) createTable () error {
276
456
input := & dynamodb.CreateTableInput {
277
457
AttributeDefinitions : []* dynamodb.AttributeDefinition {
0 commit comments