diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 8ef9cd298be..e3d82758f67 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -121,7 +121,16 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) { LinkedList columns = new LinkedList<>(oldSchema.getColumns()); + Set existingColumnNames = + columns.stream() + .map(Column::getName) + .collect(Collectors.toCollection(HashSet::new)); for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { + // Skip adding the column if it already exists in the schema to ensure idempotency. + // This can happen when schema change events are replayed after a failover recovery. + if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) { + continue; + } switch (columnWithPosition.getPosition()) { case FIRST: { @@ -170,6 +179,7 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema break; } } + existingColumnNames.add(columnWithPosition.getAddColumn().getName()); } return oldSchema.copy(columns); } diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java index a5f3f86adda..9f745192127 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java @@ -160,6 +160,46 @@ void testApplyColumnSchemaChangeEvent() { "AFTER type AddColumnEvent error: Column %s does not exist in table %s", "col10", tableId)); + // add duplicate column should be ignored (idempotency) + addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("col3", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.LAST, + null)); + addColumnEvent = new AddColumnEvent(tableId, addedColumns); + schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent); + Assertions.assertThat(schema) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("col0", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col4", DataTypes.STRING()) + .physicalColumn("col5", DataTypes.STRING()) + .physicalColumn("col3", DataTypes.STRING()) + .build()); + + // add duplicate column in FIRST position should be ignored + addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("col0", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.FIRST, + null)); + addColumnEvent = new AddColumnEvent(tableId, addedColumns); + schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent); + Assertions.assertThat(schema) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("col0", DataTypes.STRING()) + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col4", DataTypes.STRING()) + .physicalColumn("col5", DataTypes.STRING()) + .physicalColumn("col3", DataTypes.STRING()) + .build()); + // drop columns DropColumnEvent dropColumnEvent = new DropColumnEvent(tableId, Arrays.asList("col3", "col5"));