Skip to content

Commit e1a81ac

Browse files
authored
[Hotfix][Zeta] Fix job deadlock when schema change (apache#6389)
1 parent 045d3e3 commit e1a81ac

File tree

1 file changed

+24
-17
lines changed
  • seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow

1 file changed

+24
-17
lines changed

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java

+24-17
Original file line numberDiff line numberDiff line change
@@ -168,17 +168,17 @@ public void collect() throws Exception {
168168
"previous schema changes in progress, schemaChangePhase: "
169169
+ schemaChangePhase.get());
170170
}
171-
runningTask.triggerSchemaChangeBeforeCheckpoint().get();
172171
schemaChangePhase.set(SchemaChangePhase.createBeforePhase());
172+
runningTask.triggerSchemaChangeBeforeCheckpoint().get();
173173
log.info("triggered schema-change-before checkpoint, stopping collect data");
174174
} else if (collector.captureSchemaChangeAfterCheckpointSignal()) {
175175
if (schemaChangePhase.get() != null) {
176176
throw new IllegalStateException(
177177
"previous schema changes in progress, schemaChangePhase: "
178178
+ schemaChangePhase.get());
179179
}
180-
runningTask.triggerSchemaChangeAfterCheckpoint().get();
181180
schemaChangePhase.set(SchemaChangePhase.createAfterPhase());
181+
runningTask.triggerSchemaChangeAfterCheckpoint().get();
182182
log.info("triggered schema-change-after checkpoint, stopping collect data");
183183
}
184184
} else {
@@ -284,25 +284,32 @@ public void triggerBarrier(Barrier barrier) throws Exception {
284284
currentTaskLocation);
285285

286286
CheckpointType checkpointType = ((CheckpointBarrier) barrier).getCheckpointType();
287-
if (schemaChanging() && checkpointType.isSchemaChangeCheckpoint()) {
288-
if (checkpointType.isSchemaChangeBeforeCheckpoint()
289-
&& schemaChangePhase.get().isBeforePhase()) {
290-
schemaChangePhase.get().setCheckpointId(barrier.getId());
291-
} else if (checkpointType.isSchemaChangeAfterCheckpoint()
292-
&& schemaChangePhase.get().isAfterPhase()) {
293-
schemaChangePhase.get().setCheckpointId(barrier.getId());
287+
if (checkpointType.isSchemaChangeCheckpoint()) {
288+
if (schemaChanging()) {
289+
if (checkpointType.isSchemaChangeBeforeCheckpoint()
290+
&& schemaChangePhase.get().isBeforePhase()) {
291+
schemaChangePhase.get().setCheckpointId(barrier.getId());
292+
} else if (checkpointType.isSchemaChangeAfterCheckpoint()
293+
&& schemaChangePhase.get().isAfterPhase()) {
294+
schemaChangePhase.get().setCheckpointId(barrier.getId());
295+
} else {
296+
throw new IllegalStateException(
297+
String.format(
298+
"schema-change checkpoint[%s,%s] and phase[%s] is not matched",
299+
barrier.getId(),
300+
checkpointType,
301+
schemaChangePhase.get().getPhase()));
302+
}
303+
log.info(
304+
"lock checkpoint[{}] waiting for complete..., phase: [{}]",
305+
barrier.getId(),
306+
schemaChangePhase.get().getPhase());
294307
} else {
295308
throw new IllegalStateException(
296309
String.format(
297-
"schema-change checkpoint[%s,%s] and phase[%s] is not matched",
298-
barrier.getId(),
299-
checkpointType,
300-
schemaChangePhase.get().getPhase()));
310+
"schema-change checkpoint[%s] and phase[%s] is not matched",
311+
barrier.getId(), checkpointType));
301312
}
302-
log.info(
303-
"lock checkpoint[{}] waiting for complete..., phase: [{}]",
304-
barrier.getId(),
305-
schemaChangePhase.get().getPhase());
306313
}
307314
}
308315

0 commit comments

Comments
 (0)