@@ -137,7 +137,7 @@ public class CheckpointCoordinator {
137137 /** Flag marking the coordinator as shut down (not accepting any messages anymore). */
138138 private volatile boolean shutdown ;
139139
140- private volatile boolean isAllTaskReady = false ;
140+ private final AtomicBoolean isAllTaskReady = new AtomicBoolean ( false ) ;
141141
142142 private final ExecutorService executorService ;
143143
@@ -345,7 +345,10 @@ private void allTaskReady() {
345345 return ;
346346 }
347347 }
348- isAllTaskReady = true ;
348+ if (!isAllTaskReady .compareAndSet (false , true )) {
349+ LOG .info ("all task already ready, skip notify task start" );
350+ return ;
351+ }
349352 InvocationFuture <?>[] futures = notifyTaskStart ();
350353 CompletableFuture .allOf (futures ).join ();
351354 notifyCompleted (latestCompletedCheckpoint );
@@ -451,7 +454,6 @@ protected void readyToCloseIdleTask(TaskLocation taskLocation) {
451454 }
452455 }
453456 readyToCloseIdleTask .addAll (subTaskList );
454- tryTriggerPendingCheckpoint (CheckpointType .CHECKPOINT_TYPE );
455457 }
456458 }
457459
@@ -470,18 +472,18 @@ protected void completedCloseIdleTask(TaskLocation taskLocation) {
470472 }
471473
472474 protected void restoreCoordinator (boolean alreadyStarted ) {
473- LOG .info ("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted );
475+ LOG .info ("received restore CheckpointCoordinator with alreadyStarted = {}" , alreadyStarted );
474476 errorByPhysicalVertex = new AtomicReference <>();
475477 checkpointCoordinatorFuture = new CompletableFuture <>();
476478 updateStatus (CheckpointCoordinatorStatus .RUNNING );
477479 cleanPendingCheckpoint (CheckpointCloseReason .CHECKPOINT_COORDINATOR_RESET );
478480 shutdown = false ;
479481 if (alreadyStarted ) {
480- isAllTaskReady = true ;
482+ isAllTaskReady . set ( true ) ;
481483 notifyCompleted (latestCompletedCheckpoint );
482484 tryTriggerPendingCheckpoint (CHECKPOINT_TYPE );
483485 } else {
484- isAllTaskReady = false ;
486+ isAllTaskReady . set ( false ) ;
485487 }
486488 }
487489
@@ -492,15 +494,24 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
492494 }
493495 final long currentTimestamp = Instant .now ().toEpochMilli ();
494496 if (checkpointType .notFinalCheckpoint () && checkpointType .notSchemaChangeCheckpoint ()) {
495- long diffFromLastTimestamp = currentTimestamp - latestTriggerTimestamp .get ();
496- if (diffFromLastTimestamp <= 0 ) {
497+ if (!isAllTaskReady .get ()) {
498+ LOG .info ("Not all tasks are ready, skipping checkpoint trigger" );
499+ return ;
500+ }
501+ long interval = currentTimestamp - latestTriggerTimestamp .get ();
502+ if (interval <= 0 ) {
497503 LOG .error (
498504 "The time on your server may not be incremental which can lead checkpoint to stop. The latestTriggerTimestamp: ({}), but the currentTimestamp: ({})" ,
499505 latestTriggerTimestamp .get (),
500506 currentTimestamp );
501507 }
502- if (diffFromLastTimestamp < coordinatorConfig .getCheckpointInterval ()
503- || !isAllTaskReady ) {
508+ if (interval < coordinatorConfig .getCheckpointInterval ()) {
509+ LOG .info (
510+ "skip trigger checkpoint because the last trigger timestamp is {} and current timestamp is {}, the interval is less than config." ,
511+ latestTriggerTimestamp .get (),
512+ currentTimestamp );
513+ scheduleTriggerPendingCheckpoint (
514+ checkpointType , coordinatorConfig .getCheckpointInterval () - interval );
504515 return ;
505516 }
506517 }
@@ -534,6 +545,10 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
534545 // if checkpoint type are final type, we don't need to trigger next checkpoint
535546 if (checkpointType .notFinalCheckpoint () && checkpointType .notSchemaChangeCheckpoint ()) {
536547 scheduleTriggerPendingCheckpoint (coordinatorConfig .getCheckpointInterval ());
548+ } else {
549+ LOG .info (
550+ "skip schedule trigger checkpoint because checkpoint type is {}" ,
551+ checkpointType );
537552 }
538553 }
539554 }
@@ -557,7 +572,7 @@ public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
557572 return completableFutureWithError (
558573 CheckpointCloseReason .CHECKPOINT_COORDINATOR_SHUTDOWN );
559574 }
560- if (!isAllTaskReady ) {
575+ if (!isAllTaskReady . get () ) {
561576 return completableFutureWithError (
562577 CheckpointCloseReason .TASK_NOT_ALL_READY_WHEN_SAVEPOINT );
563578 }
@@ -596,7 +611,7 @@ private void startTriggerPendingCheckpoint(
596611 CompletableFuture <PendingCheckpoint > pendingCompletableFuture ) {
597612 pendingCompletableFuture .thenAccept (
598613 pendingCheckpoint -> {
599- LOG .info ("wait checkpoint completed: " + pendingCheckpoint .getCheckpointId ());
614+ LOG .info ("wait checkpoint completed: {}" , pendingCheckpoint .getCheckpointId ());
600615 PassiveCompletableFuture <CompletedCheckpoint > completableFuture =
601616 pendingCheckpoint .getCompletableFuture ();
602617 completableFuture .whenCompleteAsync (
@@ -647,8 +662,8 @@ private void startTriggerPendingCheckpoint(
647662 }
648663 if (coordinatorConfig .isCheckpointEnable ()) {
649664 LOG .debug (
650- "Start a scheduled task to prevent checkpoint timeouts for barrier "
651- + pendingCheckpoint .getInfo ());
665+ "Start a scheduled task to prevent checkpoint timeouts for barrier {}" ,
666+ pendingCheckpoint .getInfo ());
652667 long checkpointTimeout = coordinatorConfig .getCheckpointTimeout ();
653668 if (pendingCheckpoint .getCheckpointType ().isSchemaChangeAfterCheckpoint ()) {
654669 checkpointTimeout =
@@ -665,8 +680,8 @@ private void startTriggerPendingCheckpoint(
665680 != null
666681 && !pendingCheckpoint .isFullyAcknowledged ()) {
667682 LOG .info (
668- "timeout checkpoint: "
669- + pendingCheckpoint .getInfo ());
683+ "timeout checkpoint: {}" ,
684+ pendingCheckpoint .getInfo ());
670685 handleCoordinatorError (
671686 CheckpointCloseReason .CHECKPOINT_EXPIRED ,
672687 null );
@@ -781,7 +796,7 @@ public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarri
781796
782797 protected void cleanPendingCheckpoint (CheckpointCloseReason closedReason ) {
783798 shutdown = true ;
784- isAllTaskReady = false ;
799+ isAllTaskReady . set ( false ) ;
785800 synchronized (lock ) {
786801 LOG .info ("start clean pending checkpoint cause {}" , closedReason .message ());
787802 if (!pendingCheckpoints .isEmpty ()) {
@@ -817,7 +832,7 @@ protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
817832 final long checkpointId = ackOperation .getBarrier ().getId ();
818833 final PendingCheckpoint pendingCheckpoint = pendingCheckpoints .get (checkpointId );
819834 if (pendingCheckpoint == null ) {
820- LOG .info ("skip already ack checkpoint " + checkpointId );
835+ LOG .info ("skip already ack checkpoint {}" , checkpointId );
821836 return ;
822837 }
823838 TaskLocation location = ackOperation .getTaskLocation ();
@@ -995,7 +1010,7 @@ private synchronized void updateStatus(@NonNull CheckpointCoordinatorStatus targ
9951010 new RetryUtils .RetryMaterial (
9961011 Constant .OPERATION_RETRY_TIME ,
9971012 true ,
998- exception -> ExceptionUtil . isOperationNeedRetryException ( exception ) ,
1013+ ExceptionUtil :: isOperationNeedRetryException ,
9991014 Constant .OPERATION_RETRY_SLEEP ));
10001015 } catch (Exception e ) {
10011016 LOG .warn (
0 commit comments