diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 131a783f551..6c224ec522a 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -37,10 +37,14 @@ import org.apache.flink.cdc.common.types.TimestampType; import org.apache.flink.cdc.common.types.ZonedTimestampType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -55,6 +59,8 @@ @PublicEvolving public class SchemaUtils { + private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class); + /** * create a list of {@link RecordData.FieldGetter} from given {@link Schema} to get Object from * RecordData. @@ -120,7 +126,36 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) { LinkedList columns = new LinkedList<>(oldSchema.getColumns()); + Set existingColumnNames = + columns.stream() + .map(Column::getName) + .collect(Collectors.toCollection(HashSet::new)); for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { + // Skip columns that already exist in the schema to handle duplicate AddColumnEvents + // (e.g., from gh-ost online schema migrations) + if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) { + Column incomingColumn = columnWithPosition.getAddColumn(); + columns.stream() + .filter(c -> c.getName().equals(incomingColumn.getName())) + .findFirst() + .ifPresent( + existingColumn -> { + if (!existingColumn + .getType() + .equals(incomingColumn.getType())) { + // No coercion is performed; the existing column + // definition is preserved as-is. + LOG.warn( + "Skipping duplicate column '{}' for table {} but types differ: " + + "existing={}, incoming={}", + incomingColumn.getName(), + event.tableId(), + existingColumn.getType(), + incomingColumn.getType()); + } + }); + continue; + } switch (columnWithPosition.getPosition()) { case FIRST: { @@ -165,10 +200,92 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema break; } } + existingColumnNames.add(columnWithPosition.getAddColumn().getName()); } return oldSchema.copy(columns); } + /** + * Filters out redundant columns from an {@link AddColumnEvent} that already exist in the + * current schema, and deduplicates columns within the same event. Returns {@link + * Optional#empty()} if all columns are redundant. + * + *

This handles cases like gh-ost online schema migrations where duplicate ADD COLUMN events + * may be emitted for the same column. + * + *

Note: Duplicate detection is based on column name only. If a duplicate + * AddColumnEvent arrives with a different type for an existing column name, a warning will be + * logged but the column will still be skipped. This is the expected behavior for online schema + * migration tools (gh-ost, pt-osc) where duplicate events are always exact copies. + */ + public static Optional filterRedundantAddColumns( + Schema currentSchema, AddColumnEvent event) { + Map existingColumns = new HashMap<>(); + for (Column col : currentSchema.getColumns()) { + existingColumns.put(col.getName(), col); + } + Set seenColumns = new HashSet<>(existingColumns.keySet()); + List nonRedundant = new ArrayList<>(); + List filteredColumnNames = new ArrayList<>(); + for (AddColumnEvent.ColumnWithPosition cwp : event.getAddedColumns()) { + String colName = cwp.getAddColumn().getName(); + if (seenColumns.add(colName)) { + nonRedundant.add(cwp); + } else { + filteredColumnNames.add(colName); + Column existingCol = existingColumns.get(colName); + if (existingCol != null + && !existingCol.getType().equals(cwp.getAddColumn().getType())) { + LOG.warn( + "Skipping duplicate column '{}' for table {} but types differ: " + + "existing={}, incoming={}", + colName, + event.tableId(), + existingCol.getType(), + cwp.getAddColumn().getType()); + } + } + } + if (nonRedundant.isEmpty()) { + return Optional.empty(); + } + if (!filteredColumnNames.isEmpty()) { + LOG.debug( + "Filtered redundant columns {} from AddColumnEvent for table {}", + filteredColumnNames, + event.tableId()); + } + if (nonRedundant.size() == event.getAddedColumns().size()) { + return Optional.of(event); + } + return Optional.of(new AddColumnEvent(event.tableId(), nonRedundant)); + } + + /** + * Filters duplicate {@link AddColumnEvent} columns that already exist in the given schema. For + * non-AddColumnEvent schema changes, the event is returned as-is. + * + * @param currentSchema the current schema to check against + * @param event the schema change event to filter + * @return the filtered event, or {@link Optional#empty()} if the event is fully redundant + */ + public static Optional filterDuplicateAddColumns( + Schema currentSchema, SchemaChangeEvent event) { + if (!(event instanceof AddColumnEvent)) { + return Optional.of(event); + } + Optional filtered = + filterRedundantAddColumns(currentSchema, (AddColumnEvent) event); + if (!filtered.isPresent()) { + LOG.debug( + "Skipping fully redundant AddColumnEvent for table {} " + + "- all columns already exist", + event.tableId()); + return Optional.empty(); + } + return Optional.of(filtered.get()); + } + private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) { List columns = oldSchema.getColumns().stream() diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java index cdf1532d24c..b86d18ced5c 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java @@ -32,9 +32,11 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** A test for the {@link org.apache.flink.cdc.common.utils.SchemaUtils}. */ class SchemaUtilsTest { @@ -484,4 +486,231 @@ void testInferWiderSchema() { .build())) .isExactlyInstanceOf(IllegalStateException.class); } + + // ========================== Tests for duplicate AddColumnEvent handling + // ========================== + + @Test + void testFilterRedundantAddColumns_allDuplicates() { + TableId tableId = TableId.parse("default.default.table1"); + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .build(); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("name", DataTypes.STRING()))); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("age", DataTypes.INT()))); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + + Optional result = + SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent); + Assertions.assertThat(result).isEmpty(); + } + + @Test + void testFilterRedundantAddColumns_noDuplicates() { + TableId tableId = TableId.parse("default.default.table1"); + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .build(); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("age", DataTypes.INT()))); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("email", DataTypes.STRING()))); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + + Optional result = + SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent); + Assertions.assertThat(result).isPresent(); + Assertions.assertThat(result.get().getAddedColumns()).hasSize(2); + Assertions.assertThat(result.get()).isEqualTo(addColumnEvent); + } + + @Test + void testFilterRedundantAddColumns_partialDuplicates() { + TableId tableId = TableId.parse("default.default.table1"); + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .build(); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("name", DataTypes.STRING()))); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("age", DataTypes.INT()))); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + + Optional result = + SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent); + Assertions.assertThat(result).isPresent(); + Assertions.assertThat(result.get().getAddedColumns()).hasSize(1); + Assertions.assertThat(result.get().getAddedColumns().get(0).getAddColumn().getName()) + .isEqualTo("age"); + } + + @Test + void testFilterRedundantAddColumns_emptyAddedColumns() { + TableId tableId = TableId.parse("default.default.table1"); + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .build(); + + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, Collections.emptyList()); + + Optional result = + SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent); + Assertions.assertThat(result).isEmpty(); + } + + @Test + void testApplyAddColumnEvent_idempotent() { + TableId tableId = TableId.parse("default.default.table1"); + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .build(); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("name", DataTypes.STRING()))); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("age", DataTypes.INT()))); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + + Schema result = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent); + Assertions.assertThat(result) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .build()); + } + + @Test + void testApplyAddColumnEvent_allDuplicates() { + TableId tableId = TableId.parse("default.default.table1"); + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .build(); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("name", DataTypes.STRING()))); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("age", DataTypes.INT()))); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + + Schema result = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent); + Assertions.assertThat(result) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .build()); + } + + @Test + void testFilterRedundantAddColumns_withPositions() { + TableId tableId = TableId.parse("default.default.table1"); + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .build(); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("name", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.AFTER, + "id")); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("age", DataTypes.INT()), + AddColumnEvent.ColumnPosition.AFTER, + "name")); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + + Optional result = + SchemaUtils.filterRedundantAddColumns(schema, addColumnEvent); + Assertions.assertThat(result).isPresent(); + Assertions.assertThat(result.get().getAddedColumns()).hasSize(1); + + AddColumnEvent.ColumnWithPosition remaining = result.get().getAddedColumns().get(0); + Assertions.assertThat(remaining.getAddColumn().getName()).isEqualTo("age"); + Assertions.assertThat(remaining.getPosition()) + .isEqualTo(AddColumnEvent.ColumnPosition.AFTER); + Assertions.assertThat(remaining.getExistedColumnName()).isEqualTo("name"); + } + + @Test + void testFilterRedundantAddColumns_intraEventDuplicates() { + Schema schema = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build(); + TableId tableId = TableId.tableId("default", "schema", "table"); + AddColumnEvent event = + new AddColumnEvent( + tableId, + Arrays.asList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("name", DataTypes.STRING())), + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("name", DataTypes.STRING())))); + Optional result = SchemaUtils.filterRedundantAddColumns(schema, event); + Assertions.assertThat(result).isPresent(); + Assertions.assertThat(result.get().getAddedColumns()).hasSize(1); + Assertions.assertThat(result.get().getAddedColumns().get(0).getAddColumn().getName()) + .isEqualTo("name"); + } + + @Test + void testApplyAddColumnEvent_duplicateWithinSameEvent() { + TableId tableId = TableId.parse("default.default.table1"); + Schema schema = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build(); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("name", DataTypes.STRING()))); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("name", DataTypes.STRING()))); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + + Schema result = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent); + Assertions.assertThat(result) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .build()); + } } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index e4da2fca351..6b4f1cc43da 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -1303,6 +1303,129 @@ void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion) throws Ex "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, rule_fallback], after=[], op=DELETE, meta=()}")); } + /** + * Tests that duplicate {@link AddColumnEvent}s (e.g., from gh-ost online schema migrations) are + * properly filtered and do not crash the pipeline when a transform is applied. + */ + @Test + void testDuplicateAddColumnEventWithTransform() throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1"); + List events = generateDuplicateAddColumnEvents(tableId); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.singletonList( + new TransformDef( + tableId.toString(), + "id, name", + null, + null, + null, + null, + null, + null)), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + + assertThat(outputEvents) + .containsExactly( + // Initial stage + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob], op=INSERT, meta=()}", + // No AddColumnEvent in output because "extras" is not in the projection + // (transform selects only "id, name"). Both the original and the duplicate + // AddColumnEvent are handled without error; the duplicate is filtered out. + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Carol], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Carol], after=[3, Colin], op=UPDATE, meta=()}"); + } + + private List generateDuplicateAddColumnEvents(TableId tableId) { + List events = new ArrayList<>(); + + // Initial schema: id, name, age + Schema schemaV1 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(); + + events.add(new CreateTableEvent(tableId, schemaV1)); + events.add(DataChangeEvent.insertEvent(tableId, generate(schemaV1, 1, "Alice", 18))); + events.add(DataChangeEvent.insertEvent(tableId, generate(schemaV1, 2, "Bob", 20))); + + // First AddColumnEvent: add "extras" column + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.STRING()))))); + + // Duplicate AddColumnEvent: add "extras" column again (e.g., from gh-ost migration) + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.STRING()))))); + + // Schema after adding extras: id, name, age, extras + Schema schemaV2 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("extras", DataTypes.STRING()) + .primaryKey("id") + .build(); + + events.add( + DataChangeEvent.insertEvent( + tableId, generate(schemaV2, 3, "Carol", 23, "extra_data"))); + events.add( + DataChangeEvent.updateEvent( + tableId, + generate(schemaV2, 3, "Carol", 23, "extra_data"), + generate(schemaV2, 3, "Colin", 24, "updated_extra"))); + + return events; + } + void runGenericTransformTest( ValuesDataSink.SinkApi sinkApi, List transformDefs, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index ae0b48d57d1..1bd28acd04a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -240,6 +240,15 @@ private Optional processSchemaChangeEvent( TableId tableId = event.tableId(); PostTransformChangeInfo info = checkNotNull(postTransformInfoMap.get(tableId)); + // Filter out redundant AddColumnEvent columns that already exist in the schema + // to handle duplicate events from tools like gh-ost online schema migrations + Optional filteredEvent = + SchemaUtils.filterDuplicateAddColumns(info.getPreTransformedSchema(), event); + if (!filteredEvent.isPresent()) { + return Optional.empty(); + } + event = filteredEvent.get(); + // Apply schema change event to the pre-transformed schema Schema prevPreSchema = info.getPreTransformedSchema(); Schema nextPreSchema = SchemaUtils.applySchemaChangeEvent(prevPreSchema, event); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index ff6cc512f44..5091ba10ee4 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -194,7 +194,6 @@ private void processEvent(Event event) { output.collect(new StreamRecord<>(event)); } else if (event instanceof SchemaChangeEvent) { SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; - preTransformProcessorMap.remove(schemaChangeEvent.tableId()); cacheChangeSchema(schemaChangeEvent) .ifPresent(e -> output.collect(new StreamRecord<>(e))); } else if (event instanceof DataChangeEvent) { @@ -215,6 +214,19 @@ private CreateTableEvent cacheCreateTable(CreateTableEvent event) { private Optional cacheChangeSchema(SchemaChangeEvent event) { TableId tableId = event.tableId(); PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId); + + // Filter out redundant AddColumnEvent columns that already exist in the schema + // to handle duplicate events from tools like gh-ost online schema migrations + Optional filteredEvent = + SchemaUtils.filterDuplicateAddColumns(tableChangeInfo.getSourceSchema(), event); + if (!filteredEvent.isPresent()) { + return Optional.empty(); + } + event = filteredEvent.get(); + + // Schema actually changed, invalidate the processor cache so it gets rebuilt + preTransformProcessorMap.remove(tableId); + Schema originalSchema = SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event); Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java index 7ba5c1b74ea..b76f063dcb8 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java @@ -840,6 +840,262 @@ void testSchemaChangeWithPostWildcard() throws Exception { .runTests("inserting columns at last"); } + /** + * Tests duplicate AddColumnEvent handling with a wildcard projection ({@code *, id + age as + * computed}) and a filter ({@code name <> 'Alice'}). This exercises the full pre+post pipeline + * with a wildcard rule: the first AddColumnEvent for "extras" passes through both operators, + * while the duplicate is filtered by PreTransformOperator (which also prevents it from reaching + * PostTransformOperator). A subsequent DataChangeEvent verifies the pipeline remains functional + * after duplicate filtering, with the computed column correctly evaluated. + */ + @Test + void testDuplicateAddColumnEventPreTransform() throws Exception { + TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes"); + TransformWithSchemaEvolveTestCase.of( + tableId, + "*, id + age as computed", + "name <> 'Alice'", + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("computed", DataTypes.INT()) + .primaryKey("id") + .build()) + .initializeHarness() + .runTests("initializing table") + // First AddColumnEvent: add "extras" column + .evolveFromSource( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.FLOAT()), + AddColumnEvent.ColumnPosition.LAST, + null)))) + .expectInPreTransformed( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.FLOAT()), + AddColumnEvent.ColumnPosition.AFTER, + "age")))) + .expectInPostTransformed( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.FLOAT()), + AddColumnEvent.ColumnPosition.AFTER, + "age")))) + .runTests("adding extras column first time") + // Duplicate AddColumnEvent: add "extras" column again (should be filtered) + .evolveFromSource( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.FLOAT()), + AddColumnEvent.ColumnPosition.LAST, + null)))) + .expectNothingInPreTransformed() + .expectNothingInPostTransformed() + .runTests("duplicate add extras column should be filtered") + // Send a DataChangeEvent after the duplicate filtering to verify pipeline works + .insertSource(1, "Bob", 20, 1.5f) + .expectInPreTransformed(1, "Bob", 20, 1.5f) + .expectInPostTransformed(1, "Bob", 20, 1.5f, 21) + .runTests("data change after duplicate filtering should succeed") + .destroyHarness(); + } + + /** + * Tests duplicate AddColumnEvent handling with the same wildcard projection and filter as + * {@link #testDuplicateAddColumnEventPreTransform()}, but verifies that PostTransformOperator + * independently filters duplicates. Both operators see the duplicate event and each filters it + * independently. The subsequent DataChangeEvent uses different record values ({@code id=2, + * name="Charlie", age=30}) to verify the computed column ({@code id + age = 32}) is correctly + * evaluated after duplicate filtering, confirming both operators maintain consistent schema + * state. + */ + @Test + void testDuplicateAddColumnEventPostTransform() throws Exception { + TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes"); + TransformWithSchemaEvolveTestCase.of( + tableId, + "*, id + age as computed", + "name <> 'Alice'", + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("computed", DataTypes.INT()) + .primaryKey("id") + .build()) + .initializeHarness() + .runTests("initializing table") + // First AddColumnEvent: add "extras" column + .evolveFromSource( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.FLOAT()), + AddColumnEvent.ColumnPosition.LAST, + null)))) + .expectInPreTransformed( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.FLOAT()), + AddColumnEvent.ColumnPosition.AFTER, + "age")))) + .expectInPostTransformed( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.FLOAT()), + AddColumnEvent.ColumnPosition.AFTER, + "age")))) + .runTests("adding extras column first time") + // Second AddColumnEvent: add "extras" again (duplicate, should be filtered) + .evolveFromSource( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("extras", DataTypes.FLOAT()), + AddColumnEvent.ColumnPosition.LAST, + null)))) + .expectNothingInPreTransformed() + .expectNothingInPostTransformed() + .runTests("duplicate add extras column should be filtered by both operators") + // Send a DataChangeEvent after the duplicate filtering to verify pipeline works + .insertSource(2, "Charlie", 30, 2.5f) + .expectInPreTransformed(2, "Charlie", 30, 2.5f) + .expectInPostTransformed(2, "Charlie", 30, 2.5f, 32) + .runTests("data change after duplicate filtering should succeed") + .destroyHarness(); + } + + /** + * This case tests partial duplicate AddColumnEvents. When an event adds multiple columns and + * some already exist, only the non-duplicate columns should be forwarded. + */ + @Test + void testPartialDuplicateAddColumnEvent() throws Exception { + TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes"); + TransformWithSchemaEvolveTestCase.of( + tableId, + "*, id + age as computed", + "name <> 'Alice'", + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("computed", DataTypes.INT()) + .primaryKey("id") + .build()) + .initializeHarness() + .runTests("initializing table") + // First AddColumnEvent: add "email" column + .evolveFromSource( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("email", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.LAST, + null)))) + .expectInPreTransformed( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("email", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.AFTER, + "age")))) + .expectInPostTransformed( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("email", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.AFTER, + "age")))) + .runTests("adding email column") + // Second AddColumnEvent: add "email" (duplicate) and "phone" (new) + .evolveFromSource( + new AddColumnEvent( + tableId, + Arrays.asList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("email", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.LAST, + null), + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("phone", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.LAST, + null)))) + .expectInPreTransformed( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("phone", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.AFTER, + "email")))) + .expectInPostTransformed( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("phone", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.AFTER, + "email")))) + .runTests("partial duplicate: email filtered, phone added") + .destroyHarness(); + } + /** This case tests column name map when schema evolution happens. */ @Test public void testSchemaChangeWithColumnNameMap() throws Exception {