@@ -168,17 +168,17 @@ public void collect() throws Exception {
168168 "previous schema changes in progress, schemaChangePhase: "
169169 + schemaChangePhase .get ());
170170 }
171- runningTask .triggerSchemaChangeBeforeCheckpoint ().get ();
172171 schemaChangePhase .set (SchemaChangePhase .createBeforePhase ());
172+ runningTask .triggerSchemaChangeBeforeCheckpoint ().get ();
173173 log .info ("triggered schema-change-before checkpoint, stopping collect data" );
174174 } else if (collector .captureSchemaChangeAfterCheckpointSignal ()) {
175175 if (schemaChangePhase .get () != null ) {
176176 throw new IllegalStateException (
177177 "previous schema changes in progress, schemaChangePhase: "
178178 + schemaChangePhase .get ());
179179 }
180- runningTask .triggerSchemaChangeAfterCheckpoint ().get ();
181180 schemaChangePhase .set (SchemaChangePhase .createAfterPhase ());
181+ runningTask .triggerSchemaChangeAfterCheckpoint ().get ();
182182 log .info ("triggered schema-change-after checkpoint, stopping collect data" );
183183 }
184184 } else {
@@ -284,25 +284,32 @@ public void triggerBarrier(Barrier barrier) throws Exception {
284284 currentTaskLocation );
285285
286286 CheckpointType checkpointType = ((CheckpointBarrier ) barrier ).getCheckpointType ();
287- if (schemaChanging () && checkpointType .isSchemaChangeCheckpoint ()) {
288- if (checkpointType .isSchemaChangeBeforeCheckpoint ()
289- && schemaChangePhase .get ().isBeforePhase ()) {
290- schemaChangePhase .get ().setCheckpointId (barrier .getId ());
291- } else if (checkpointType .isSchemaChangeAfterCheckpoint ()
292- && schemaChangePhase .get ().isAfterPhase ()) {
293- schemaChangePhase .get ().setCheckpointId (barrier .getId ());
287+ if (checkpointType .isSchemaChangeCheckpoint ()) {
288+ if (schemaChanging ()) {
289+ if (checkpointType .isSchemaChangeBeforeCheckpoint ()
290+ && schemaChangePhase .get ().isBeforePhase ()) {
291+ schemaChangePhase .get ().setCheckpointId (barrier .getId ());
292+ } else if (checkpointType .isSchemaChangeAfterCheckpoint ()
293+ && schemaChangePhase .get ().isAfterPhase ()) {
294+ schemaChangePhase .get ().setCheckpointId (barrier .getId ());
295+ } else {
296+ throw new IllegalStateException (
297+ String .format (
298+ "schema-change checkpoint[%s,%s] and phase[%s] is not matched" ,
299+ barrier .getId (),
300+ checkpointType ,
301+ schemaChangePhase .get ().getPhase ()));
302+ }
303+ log .info (
304+ "lock checkpoint[{}] waiting for complete..., phase: [{}]" ,
305+ barrier .getId (),
306+ schemaChangePhase .get ().getPhase ());
294307 } else {
295308 throw new IllegalStateException (
296309 String .format (
297- "schema-change checkpoint[%s,%s] and phase[%s] is not matched" ,
298- barrier .getId (),
299- checkpointType ,
300- schemaChangePhase .get ().getPhase ()));
310+ "schema-change checkpoint[%s] and phase[%s] is not matched" ,
311+ barrier .getId (), checkpointType ));
301312 }
302- log .info (
303- "lock checkpoint[{}] waiting for complete..., phase: [{}]" ,
304- barrier .getId (),
305- schemaChangePhase .get ().getPhase ());
306313 }
307314 }
308315
0 commit comments