@@ -142,6 +142,10 @@ public class CheckpointCoordinator {
142
142
143
143
private final IMap <Object , Object > runningJobStateIMap ;
144
144
145
+ // Save the pending checkpoint of the in-flight savepoint, so that different savepoint requests
146
+ // arriving at the same time can be served by a single savepoint operation.
147
+ private PendingCheckpoint savepointPendingCheckpoint ;
148
+
145
149
private final String checkpointStateImapKey ;
146
150
147
151
@ SneakyThrows
@@ -449,15 +453,14 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
449
453
CompletableFuture <PendingCheckpoint > pendingCheckpoint =
450
454
createPendingCheckpoint (currentTimestamp , checkpointType );
451
455
startTriggerPendingCheckpoint (pendingCheckpoint );
452
- pendingCounter .incrementAndGet ();
453
456
// if the checkpoint type is a final type, we don't need to trigger the next checkpoint
454
457
if (checkpointType .notFinalCheckpoint () && checkpointType .notSchemaChangeCheckpoint ()) {
455
458
scheduleTriggerPendingCheckpoint (coordinatorConfig .getCheckpointInterval ());
456
459
}
457
460
}
458
461
}
459
462
460
// Accessor for the coordinator's shutdown flag; this change narrows it from public to private.
- public boolean isShutdown () {
463
+ private boolean isShutdown () {
461
464
return shutdown ;
462
465
}
463
466
@@ -472,29 +475,45 @@ public static Map<Long, Integer> getPipelineTasks(Set<TaskLocation> pipelineSubt
472
475
@ SneakyThrows
473
476
public PassiveCompletableFuture <CompletedCheckpoint > startSavepoint () {
474
477
LOG .info (String .format ("Start save point for Job (%s)" , jobId ));
478
+ if (shutdown || isCompleted ()) {
479
+ return completableFutureWithError (
480
+ CheckpointCloseReason .CHECKPOINT_COORDINATOR_SHUTDOWN );
481
+ }
475
482
if (!isAllTaskReady ) {
476
- CompletableFuture <CompletedCheckpoint > savepointFuture = new CompletableFuture <>();
477
- savepointFuture .completeExceptionally (
478
- new CheckpointException (
479
- CheckpointCloseReason .TASK_NOT_ALL_READY_WHEN_SAVEPOINT ));
480
- return new PassiveCompletableFuture <>(savepointFuture );
483
+ return completableFutureWithError (
484
+ CheckpointCloseReason .TASK_NOT_ALL_READY_WHEN_SAVEPOINT );
485
+ }
486
+ if (savepointPendingCheckpoint != null
487
+ && !savepointPendingCheckpoint .getCompletableFuture ().isDone ()) {
488
+ return savepointPendingCheckpoint .getCompletableFuture ();
481
489
}
482
490
CompletableFuture <PendingCheckpoint > savepoint ;
483
491
synchronized (lock ) {
484
- while (pendingCounter .get () > 0 ) {
492
+ while (pendingCounter .get () > 0 && ! shutdown ) {
485
493
Thread .sleep (500 );
486
494
}
495
+ if (shutdown || isCompleted ()) {
496
+ return completableFutureWithError (
497
+ CheckpointCloseReason .CHECKPOINT_COORDINATOR_SHUTDOWN );
498
+ }
487
499
savepoint = createPendingCheckpoint (Instant .now ().toEpochMilli (), SAVEPOINT_TYPE );
488
500
startTriggerPendingCheckpoint (savepoint );
489
501
}
490
- PendingCheckpoint savepointPendingCheckpoint = savepoint .join ();
502
+ savepointPendingCheckpoint = savepoint .join ();
491
503
LOG .info (
492
504
String .format (
493
505
"The save point checkpointId is %s" ,
494
506
savepointPendingCheckpoint .getCheckpointId ()));
495
507
return savepointPendingCheckpoint .getCompletableFuture ();
496
508
}
497
509
510
+ private PassiveCompletableFuture <CompletedCheckpoint > completableFutureWithError (
511
+ CheckpointCloseReason closeReason ) {
512
+ CompletableFuture <CompletedCheckpoint > future = new CompletableFuture <>();
513
+ future .completeExceptionally (new CheckpointException (closeReason ));
514
+ return new PassiveCompletableFuture <>(future );
515
+ }
516
+
498
517
private void startTriggerPendingCheckpoint (
499
518
CompletableFuture <PendingCheckpoint > pendingCompletableFuture ) {
500
519
pendingCompletableFuture .thenAccept (
@@ -577,9 +596,10 @@ private void startTriggerPendingCheckpoint(
577
596
TimeUnit .MILLISECONDS ));
578
597
}
579
598
});
599
+ pendingCounter .incrementAndGet ();
580
600
}
581
601
582
- CompletableFuture <PendingCheckpoint > createPendingCheckpoint (
602
+ private CompletableFuture <PendingCheckpoint > createPendingCheckpoint (
583
603
long triggerTimestamp , CheckpointType checkpointType ) {
584
604
synchronized (lock ) {
585
605
CompletableFuture <Long > idFuture ;
@@ -610,7 +630,7 @@ CompletableFuture<PendingCheckpoint> createPendingCheckpoint(
610
630
}
611
631
}
612
632
613
- CompletableFuture <PendingCheckpoint > triggerPendingCheckpoint (
633
+ private CompletableFuture <PendingCheckpoint > triggerPendingCheckpoint (
614
634
long triggerTimestamp ,
615
635
CompletableFuture <Long > idFuture ,
616
636
CheckpointType checkpointType ) {
@@ -944,4 +964,10 @@ protected void completeSchemaChangeAfterCheckpoint(CompletedCheckpoint checkpoin
944
964
checkpoint .getCheckpointId (), pipelineId , jobId ));
945
965
}
946
966
}
967
+
968
+ /** Test-only accessor: returns the pending checkpoint most recently recorded by {@code startSavepoint()}. */
969
+ @ VisibleForTesting
970
+ public PendingCheckpoint getSavepointPendingCheckpoint () {
971
+ return savepointPendingCheckpoint ;
972
+ }
947
973
}
0 commit comments