
Commit 3783816

[Fix][Zeta] Fix losing checkpoint scheduling in extreme cases (#9246)

1 parent fbdf39e
File tree: 3 files changed, +88 -20 lines

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java

Lines changed: 2 additions & 1 deletion
@@ -186,7 +186,8 @@ public void testZetaStreamingCheckpointNoInterval(TestContainer container)
 
         // check sink file is right
         AtomicReference<Boolean> checkSinkFile = new AtomicReference<>(false);
-        await().atMost(300000, TimeUnit.MILLISECONDS)
+        // the default checkpoint interval is 300s, so we need to wait for 300+60s
+        await().atMost(360000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
                         () -> {
                             Container.ExecResult disableSinkFileExecResult =
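For context, this e2e change only widens the Awaitility polling window: with the default 300s checkpoint interval, the old 300s bound could expire before the first checkpoint ever landed. A minimal sketch of the polling pattern the test uses (class, method, and assertion body here are illustrative, not the test's actual code):

import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;

class AwaitSketch {
    void waitForSinkFile() {
        // Poll until the assertion passes or 360s elapse; Awaitility retries
        // the lambda on every poll instead of failing on the first miss.
        await().atMost(360000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () -> {
                            // e.g. assert the sink file exists with the expected rows
                        });
    }
}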

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java

Lines changed: 34 additions & 19 deletions
@@ -137,7 +137,7 @@ public class CheckpointCoordinator {
     /** Flag marking the coordinator as shut down (not accepting any messages anymore). */
     private volatile boolean shutdown;
 
-    private volatile boolean isAllTaskReady = false;
+    private final AtomicBoolean isAllTaskReady = new AtomicBoolean(false);
 
     private final ExecutorService executorService;
 
@@ -345,7 +345,10 @@ private void allTaskReady() {
                 return;
             }
         }
-        isAllTaskReady = true;
+        if (!isAllTaskReady.compareAndSet(false, true)) {
+            LOG.info("all task already ready, skip notify task start");
+            return;
+        }
         InvocationFuture<?>[] futures = notifyTaskStart();
         CompletableFuture.allOf(futures).join();
         notifyCompleted(latestCompletedCheckpoint);
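The compareAndSet above replaces a plain boolean write so that concurrent allTaskReady() calls cannot run the start notification twice: only the caller that wins the flag flip proceeds. A minimal standalone sketch of the idiom (class and method names are illustrative, not SeaTunnel code):

import java.util.concurrent.atomic.AtomicBoolean;

class ReadyGuard {
    private final AtomicBoolean ready = new AtomicBoolean(false);

    void markReady() {
        // Atomically flip false -> true; exactly one thread succeeds.
        if (!ready.compareAndSet(false, true)) {
            return; // another caller already won the race, skip the one-time work
        }
        // one-time initialization, e.g. notifying tasks to start
    }
}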
@@ -451,7 +454,6 @@ protected void readyToCloseIdleTask(TaskLocation taskLocation) {
                 }
             }
             readyToCloseIdleTask.addAll(subTaskList);
-            tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
         }
     }
 
@@ -470,18 +472,18 @@ protected void completedCloseIdleTask(TaskLocation taskLocation) {
     }
 
     protected void restoreCoordinator(boolean alreadyStarted) {
-        LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted);
+        LOG.info("received restore CheckpointCoordinator with alreadyStarted = {}", alreadyStarted);
         errorByPhysicalVertex = new AtomicReference<>();
         checkpointCoordinatorFuture = new CompletableFuture<>();
         updateStatus(CheckpointCoordinatorStatus.RUNNING);
         cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
         shutdown = false;
         if (alreadyStarted) {
-            isAllTaskReady = true;
+            isAllTaskReady.set(true);
             notifyCompleted(latestCompletedCheckpoint);
             tryTriggerPendingCheckpoint(CHECKPOINT_TYPE);
         } else {
-            isAllTaskReady = false;
+            isAllTaskReady.set(false);
         }
     }
 
@@ -492,15 +494,24 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
         }
         final long currentTimestamp = Instant.now().toEpochMilli();
         if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) {
-            long diffFromLastTimestamp = currentTimestamp - latestTriggerTimestamp.get();
-            if (diffFromLastTimestamp <= 0) {
+            if (!isAllTaskReady.get()) {
+                LOG.info("Not all tasks are ready, skipping checkpoint trigger");
+                return;
+            }
+            long interval = currentTimestamp - latestTriggerTimestamp.get();
+            if (interval <= 0) {
                 LOG.error(
                         "The time on your server may not be incremental which can lead checkpoint to stop. The latestTriggerTimestamp: ({}), but the currentTimestamp: ({})",
                         latestTriggerTimestamp.get(),
                         currentTimestamp);
             }
-            if (diffFromLastTimestamp < coordinatorConfig.getCheckpointInterval()
-                    || !isAllTaskReady) {
+            if (interval < coordinatorConfig.getCheckpointInterval()) {
+                LOG.info(
+                        "skip trigger checkpoint because the last trigger timestamp is {} and current timestamp is {}, the interval is less than config.",
+                        latestTriggerTimestamp.get(),
+                        currentTimestamp);
+                scheduleTriggerPendingCheckpoint(
+                        checkpointType, coordinatorConfig.getCheckpointInterval() - interval);
                 return;
             }
         }
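This hunk appears to be the core of the fix named in the commit title: previously an early trigger (or one arriving before all tasks were ready) simply returned, and if nothing triggered again the checkpoint schedule was lost. Now the coordinator re-arms a timer for the remaining interval. A simplified standalone sketch of that pattern, with illustrative names (TriggerSketch, CHECKPOINT_INTERVAL_MS, scheduler) rather than SeaTunnel's actual fields:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class TriggerSketch {
    static final long CHECKPOINT_INTERVAL_MS = 5_000;
    final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    final AtomicLong latestTriggerTimestamp = new AtomicLong(System.currentTimeMillis());

    void tryTrigger() {
        long elapsed = System.currentTimeMillis() - latestTriggerTimestamp.get();
        if (elapsed < CHECKPOINT_INTERVAL_MS) {
            // Too early: instead of dropping the trigger (which could leave no
            // future trigger scheduled), re-arm a timer for the remaining time.
            scheduler.schedule(
                    this::tryTrigger, CHECKPOINT_INTERVAL_MS - elapsed, TimeUnit.MILLISECONDS);
            return;
        }
        latestTriggerTimestamp.set(System.currentTimeMillis());
        // ... actually trigger the checkpoint here ...
    }
}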
@@ -534,6 +545,10 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
             // if checkpoint type are final type, we don't need to trigger next checkpoint
             if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) {
                 scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+            } else {
+                LOG.info(
+                        "skip schedule trigger checkpoint because checkpoint type is {}",
+                        checkpointType);
             }
         }
     }
@@ -557,7 +572,7 @@ public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
             return completableFutureWithError(
                     CheckpointCloseReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
         }
-        if (!isAllTaskReady) {
+        if (!isAllTaskReady.get()) {
             return completableFutureWithError(
                     CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT);
         }
@@ -596,7 +611,7 @@ private void startTriggerPendingCheckpoint(
             CompletableFuture<PendingCheckpoint> pendingCompletableFuture) {
         pendingCompletableFuture.thenAccept(
                 pendingCheckpoint -> {
-                    LOG.info("wait checkpoint completed: " + pendingCheckpoint.getCheckpointId());
+                    LOG.info("wait checkpoint completed: {}", pendingCheckpoint.getCheckpointId());
                     PassiveCompletableFuture<CompletedCheckpoint> completableFuture =
                             pendingCheckpoint.getCompletableFuture();
                     completableFuture.whenCompleteAsync(
@@ -647,8 +662,8 @@ private void startTriggerPendingCheckpoint(
                     }
                     if (coordinatorConfig.isCheckpointEnable()) {
                         LOG.debug(
-                                "Start a scheduled task to prevent checkpoint timeouts for barrier "
-                                        + pendingCheckpoint.getInfo());
+                                "Start a scheduled task to prevent checkpoint timeouts for barrier {}",
+                                pendingCheckpoint.getInfo());
                         long checkpointTimeout = coordinatorConfig.getCheckpointTimeout();
                         if (pendingCheckpoint.getCheckpointType().isSchemaChangeAfterCheckpoint()) {
                             checkpointTimeout =
@@ -665,8 +680,8 @@ private void startTriggerPendingCheckpoint(
                                             != null
                                     && !pendingCheckpoint.isFullyAcknowledged()) {
                                 LOG.info(
-                                        "timeout checkpoint: "
-                                                + pendingCheckpoint.getInfo());
+                                        "timeout checkpoint: {}",
+                                        pendingCheckpoint.getInfo());
                                 handleCoordinatorError(
                                         CheckpointCloseReason.CHECKPOINT_EXPIRED,
                                         null);
@@ -781,7 +796,7 @@ public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarri
 
     protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) {
         shutdown = true;
-        isAllTaskReady = false;
+        isAllTaskReady.set(false);
         synchronized (lock) {
             LOG.info("start clean pending checkpoint cause {}", closedReason.message());
             if (!pendingCheckpoints.isEmpty()) {
@@ -817,7 +832,7 @@ protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
         final long checkpointId = ackOperation.getBarrier().getId();
         final PendingCheckpoint pendingCheckpoint = pendingCheckpoints.get(checkpointId);
         if (pendingCheckpoint == null) {
-            LOG.info("skip already ack checkpoint " + checkpointId);
+            LOG.info("skip already ack checkpoint {}", checkpointId);
             return;
         }
         TaskLocation location = ackOperation.getTaskLocation();
@@ -995,7 +1010,7 @@ private synchronized void updateStatus(@NonNull CheckpointCoordinatorStatus targ
                             new RetryUtils.RetryMaterial(
                                     Constant.OPERATION_RETRY_TIME,
                                     true,
-                                    exception -> ExceptionUtil.isOperationNeedRetryException(exception),
+                                    ExceptionUtil::isOperationNeedRetryException,
                                     Constant.OPERATION_RETRY_SLEEP));
         } catch (Exception e) {
             LOG.warn(
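The recurring LOG changes throughout this file swap string concatenation for SLF4J parameterized messages, which defers formatting until the logger knows the level is enabled. A generic SLF4J sketch (not project code) contrasting the two forms:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

    void report(long checkpointId) {
        // Concatenation builds the message string even when INFO is disabled:
        LOG.info("skip already ack checkpoint " + checkpointId);
        // The parameterized form lets the logger format lazily:
        LOG.info("skip already ack checkpoint {}", checkpointId);
    }
}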

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java

Lines changed: 52 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
@@ -29,7 +30,10 @@
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,6 +43,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
 
@@ -109,4 +114,51 @@ protected void handleCheckpointError(int pipelineId, boolean neverRestore) {
             executorService.shutdownNow();
         }
     }
+
+    @Test
+    void testCheckpointContinuesWorkAfterClockDrift()
+            throws CheckpointStorageException, ExecutionException, InterruptedException,
+                    TimeoutException {
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        checkpointConfig.setStorage(new CheckpointStorageConfig());
+        checkpointConfig.setCheckpointTimeout(5000);
+        checkpointConfig.setCheckpointInterval(5000);
+        Map<Integer, CheckpointPlan> planMap = new HashMap<>();
+        planMap.put(
+                1,
+                CheckpointPlan.builder()
+                        .pipelineId(1)
+                        .pipelineSubtasks(Collections.singleton(new TaskLocation()))
+                        .build());
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        CompletableFuture<Boolean> invokedHandleCheckpointError = new CompletableFuture<>();
+        Instant now = Instant.now();
+        Instant startTime = now.minusSeconds(10);
+        try (MockedStatic<Instant> mockedInstant = Mockito.mockStatic(Instant.class)) {
+            mockedInstant.when(Instant::now).thenReturn(startTime);
+            CheckpointManager checkpointManager =
+                    new CheckpointManager(
+                            1L,
+                            false,
+                            nodeEngine,
+                            null,
+                            planMap,
+                            checkpointConfig,
+                            executorService,
+                            nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
+                        @Override
+                        protected void handleCheckpointError(int pipelineId, boolean neverRestore) {
+                            invokedHandleCheckpointError.complete(true);
+                        }
+                    };
+            ReflectionUtils.setField(
+                    checkpointManager.getCheckpointCoordinator(1),
+                    "latestTriggerTimestamp",
+                    new AtomicLong(startTime.toEpochMilli()));
+            checkpointManager.reportedPipelineRunning(1, true);
+            Assertions.assertTrue(invokedHandleCheckpointError.get(1, TimeUnit.MINUTES));
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
 }
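The new test pins Instant.now() ten seconds in the past to simulate a clock drifting behind latestTriggerTimestamp, then asserts the coordinator still surfaces a checkpoint error rather than stalling silently. A minimal sketch of the Mockito static-mocking idiom it relies on (requires the inline mock maker, i.e. mockito-inline or mockito-core 5+; class and method names here are illustrative):

import java.time.Instant;

import org.mockito.MockedStatic;
import org.mockito.Mockito;

class ClockPinSketch {
    void runWithPinnedClock() {
        Instant pinned = Instant.now().minusSeconds(10);
        try (MockedStatic<Instant> mocked = Mockito.mockStatic(Instant.class)) {
            // Inside this scope every Instant.now() call returns the pinned value,
            // so code under test observes a clock stuck 10s in the past.
            mocked.when(Instant::now).thenReturn(pinned);
            assert Instant.now().equals(pinned);
        } // the real clock is restored when the scope closes
    }
}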
