Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ protected void completedCloseIdleTask(TaskLocation taskLocation) {
}

protected void restoreCoordinator(boolean alreadyStarted) {
LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted);
LOG.info("received restore CheckpointCoordinator with alreadyStarted = {}", alreadyStarted);
errorByPhysicalVertex = new AtomicReference<>();
checkpointCoordinatorFuture = new CompletableFuture<>();
updateStatus(CheckpointCoordinatorStatus.RUNNING);
Expand All @@ -492,15 +492,24 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
}
final long currentTimestamp = Instant.now().toEpochMilli();
if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) {
long diffFromLastTimestamp = currentTimestamp - latestTriggerTimestamp.get();
if (diffFromLastTimestamp <= 0) {
if (!isAllTaskReady) {
LOG.info("The all task not ready, skip trigger checkpoint");
return;
}
long interval = currentTimestamp - latestTriggerTimestamp.get();
if (interval <= 0) {
LOG.error(
"The time on your server may not be incremental which can lead checkpoint to stop. The latestTriggerTimestamp: ({}), but the currentTimestamp: ({})",
latestTriggerTimestamp.get(),
currentTimestamp);
}
if (diffFromLastTimestamp < coordinatorConfig.getCheckpointInterval()
|| !isAllTaskReady) {
if (interval < coordinatorConfig.getCheckpointInterval()) {
LOG.info(
"skip trigger checkpoint because the last trigger timestamp is {} and current timestamp is {}, the interval is less than config.",
latestTriggerTimestamp.get(),
currentTimestamp);
scheduleTriggerPendingCheckpoint(
checkpointType, coordinatorConfig.getCheckpointInterval() - interval);
return;
}
}
Expand Down Expand Up @@ -534,6 +543,10 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
// if checkpoint type are final type, we don't need to trigger next checkpoint
if (checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint()) {
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
} else {
LOG.info(
"skip schedule trigger checkpoint because checkpoint type is {}",
checkpointType);
}
}
}
Expand Down Expand Up @@ -596,7 +609,7 @@ private void startTriggerPendingCheckpoint(
CompletableFuture<PendingCheckpoint> pendingCompletableFuture) {
pendingCompletableFuture.thenAccept(
pendingCheckpoint -> {
LOG.info("wait checkpoint completed: " + pendingCheckpoint.getCheckpointId());
LOG.info("wait checkpoint completed: {}", pendingCheckpoint.getCheckpointId());
PassiveCompletableFuture<CompletedCheckpoint> completableFuture =
pendingCheckpoint.getCompletableFuture();
completableFuture.whenCompleteAsync(
Expand Down Expand Up @@ -647,8 +660,8 @@ private void startTriggerPendingCheckpoint(
}
if (coordinatorConfig.isCheckpointEnable()) {
LOG.debug(
"Start a scheduled task to prevent checkpoint timeouts for barrier "
+ pendingCheckpoint.getInfo());
"Start a scheduled task to prevent checkpoint timeouts for barrier {}",
pendingCheckpoint.getInfo());
long checkpointTimeout = coordinatorConfig.getCheckpointTimeout();
if (pendingCheckpoint.getCheckpointType().isSchemaChangeAfterCheckpoint()) {
checkpointTimeout =
Expand All @@ -665,8 +678,8 @@ private void startTriggerPendingCheckpoint(
!= null
&& !pendingCheckpoint.isFullyAcknowledged()) {
LOG.info(
"timeout checkpoint: "
+ pendingCheckpoint.getInfo());
"timeout checkpoint: {}",
pendingCheckpoint.getInfo());
handleCoordinatorError(
CheckpointCloseReason.CHECKPOINT_EXPIRED,
null);
Expand Down Expand Up @@ -817,7 +830,7 @@ protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
final long checkpointId = ackOperation.getBarrier().getId();
final PendingCheckpoint pendingCheckpoint = pendingCheckpoints.get(checkpointId);
if (pendingCheckpoint == null) {
LOG.info("skip already ack checkpoint " + checkpointId);
LOG.info("skip already ack checkpoint {}", checkpointId);
return;
}
TaskLocation location = ackOperation.getTaskLocation();
Expand Down Expand Up @@ -995,7 +1008,7 @@ private synchronized void updateStatus(@NonNull CheckpointCoordinatorStatus targ
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
ExceptionUtil::isOperationNeedRetryException,
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOG.warn(
Expand Down