Skip to content

Commit 76a5a25

Browse files
committed
[FLINK-37677][cdc-common][cdc-runtime] Sink supports skipping create table event
1 parent ceffbc0 commit 76a5a25

File tree

6 files changed

+98
-7
lines changed

6 files changed

+98
-7
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ default MetadataApplier setAcceptedSchemaEvolutionTypes(
4141
return this;
4242
}
4343

44-
/** Checks if this metadata applier should this event type. */
44+
/** Checks if this metadata applier should accept this event type. */
4545
default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
4646
return true;
4747
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.cdc.common.event.DropColumnEvent;
2626
import org.apache.flink.cdc.common.event.RenameColumnEvent;
2727
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
28+
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2829
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
2930
import org.apache.flink.cdc.common.event.TableId;
3031
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -171,7 +172,9 @@ public static List<SchemaChangeEvent> normalizeSchemaChangeEvents(
171172

172173
List<SchemaChangeEvent> finalSchemaChangeEvents = new ArrayList<>();
173174
for (SchemaChangeEvent schemaChangeEvent : rewrittenSchemaChangeEvents) {
174-
if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())) {
175+
// always accept create.table event
176+
if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())
177+
|| schemaChangeEvent.getType().equals(SchemaChangeEventType.CREATE_TABLE)) {
175178
finalSchemaChangeEvents.add(schemaChangeEvent);
176179
} else {
177180
LOG.info("Ignored schema change {}.", schemaChangeEvent);

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -462,11 +462,18 @@ private Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceEvolvedSchemaChanges
462462

463463
private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
464464
try {
465-
metadataApplier.applySchemaChange(schemaChangeEvent);
465+
// filter create.table schema change event
466+
if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())) {
467+
metadataApplier.applySchemaChange(schemaChangeEvent);
468+
LOG.info(
469+
"Successfully applied schema change event {} to external system.",
470+
schemaChangeEvent);
471+
} else {
472+
LOG.info(
473+
"Skip apply schema change event {} to external system.",
474+
schemaChangeEvent);
475+
}
466476
schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
467-
LOG.info(
468-
"Successfully applied schema change event {} to external system.",
469-
schemaChangeEvent);
470477
return true;
471478
} catch (Throwable t) {
472479
handleUnrecoverableError(

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,12 @@ private void applySchemaChange(int sourceSubTaskId) {
434434

435435
private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
436436
try {
437-
metadataApplier.applySchemaChange(schemaChangeEvent);
437+
// filter create.table schema change event
438+
if (metadataApplier.acceptsSchemaEvolutionType(schemaChangeEvent.getType())) {
439+
metadataApplier.applySchemaChange(schemaChangeEvent);
440+
} else {
441+
LOG.info("Skip apply schema change {}.", schemaChangeEvent);
442+
}
438443
schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
439444
LOG.info(
440445
"Successfully applied schema change event {} to external system.",

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2561,6 +2561,66 @@ tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)),
25612561
}
25622562
}
25632563

2564+
/** Tests lenient schema change behavior exclude create.table event. */
2565+
@Test
2566+
void testLenientSchemaEvolvesExcludeCreate() throws Exception {
2567+
TableId tableId = CUSTOMERS_TABLE_ID;
2568+
Schema schemaV1 =
2569+
Schema.newBuilder()
2570+
.physicalColumn("id", INT)
2571+
.physicalColumn("name", STRING.notNull())
2572+
.physicalColumn("age", SMALLINT)
2573+
.primaryKey("id")
2574+
.build();
2575+
2576+
SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
2577+
2578+
SchemaOperator schemaOperator =
2579+
new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
2580+
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
2581+
RegularEventOperatorTestHarness.withDurationAndExcludeCreateTableBehavior(
2582+
schemaOperator, 5, Duration.ofSeconds(3), behavior);
2583+
harness.open();
2584+
2585+
// Test CreateTableEvent
2586+
{
2587+
List<Event> createAndInsertDataEvents =
2588+
Arrays.asList(
2589+
new CreateTableEvent(tableId, schemaV1),
2590+
DataChangeEvent.insertEvent(
2591+
tableId,
2592+
buildRecord(INT, 1, STRING, "Alice", SMALLINT, (short) 17)),
2593+
DataChangeEvent.insertEvent(
2594+
tableId,
2595+
buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)),
2596+
DataChangeEvent.insertEvent(
2597+
tableId,
2598+
buildRecord(INT, 3, STRING, "Carol", SMALLINT, (short) 19)));
2599+
2600+
processEvent(schemaOperator, createAndInsertDataEvents);
2601+
2602+
FlushEvent result;
2603+
result =
2604+
new FlushEvent(
2605+
0,
2606+
Collections.singletonList(tableId),
2607+
SchemaChangeEventType.CREATE_TABLE);
2608+
2609+
Assertions.assertThat(
2610+
harness.getOutputRecords().stream()
2611+
.map(StreamRecord::getValue)
2612+
.collect(Collectors.toList()))
2613+
.isEqualTo(
2614+
ListUtils.union(
2615+
Collections.singletonList(result), createAndInsertDataEvents));
2616+
2617+
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
2618+
Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1);
2619+
2620+
harness.clearOutputRecords();
2621+
}
2622+
}
2623+
25642624
private RecordData buildRecord(final Object... args) {
25652625
List<DataType> dataTypes = new ArrayList<>();
25662626
List<Object> objects = new ArrayList<>();

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,22 @@ RegularEventOperatorTestHarness<OP, E> withDurationAndBehavior(
167167
Collections.emptySet());
168168
}
169169

170+
public static <OP extends AbstractStreamOperator<E>, E extends Event>
171+
RegularEventOperatorTestHarness<OP, E> withDurationAndExcludeCreateTableBehavior(
172+
OP operator,
173+
int numOutputs,
174+
Duration evolveDuration,
175+
SchemaChangeBehavior behavior) {
176+
return new RegularEventOperatorTestHarness<>(
177+
operator,
178+
numOutputs,
179+
evolveDuration,
180+
DEFAULT_RPC_TIMEOUT,
181+
behavior,
182+
Arrays.stream(SchemaChangeEventTypeFamily.COLUMN).collect(Collectors.toSet()),
183+
Arrays.stream(SchemaChangeEventTypeFamily.TABLE).collect(Collectors.toSet()));
184+
}
185+
170186
public static <OP extends AbstractStreamOperator<E>, E extends Event>
171187
RegularEventOperatorTestHarness<OP, E> withDurationAndFineGrainedBehavior(
172188
OP operator,

0 commit comments

Comments
 (0)