Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,16 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve

private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
Set<String> existingColumnNames =
columns.stream()
.map(Column::getName)
.collect(Collectors.toCollection(HashSet::new));
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
// Skip columns that already exist in the schema to handle duplicate AddColumnEvents
// (e.g., from gh-ost online schema migrations)
if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
continue;
}
Comment on lines +129 to +158
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applyAddColumnEvent now silently skips adding a column when the name already exists. This can mask upstream inconsistencies (e.g., same column name but different type/comment/default), leaving the schema potentially out of sync with the source without any signal. Consider validating that the existing column definition matches the incoming addColumn (and throw or at least log/warn when it differs) so only true duplicates are treated as idempotent.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a LOG field to SchemaUtils. When a duplicate column is skipped, the DataType of the existing column is now compared with that of the incoming column, and a WARN-level message is logged if they differ.

switch (columnWithPosition.getPosition()) {
case FIRST:
{
Expand Down Expand Up @@ -165,10 +174,43 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
break;
}
}
existingColumnNames.add(columnWithPosition.getAddColumn().getName());
}
return oldSchema.copy(columns);
}

/**
 * Strips from an {@link AddColumnEvent} every added column that is redundant with respect to
 * the given schema.
 *
 * <p>A column is considered redundant when its name is already present in {@code
 * currentSchema}, or when an earlier entry of the same event carries the same name (the first
 * occurrence wins). The relative order of the surviving columns is preserved.
 *
 * <p>The motivating scenario is an online schema migration (e.g. gh-ost, pt-osc) that emits
 * the same ADD COLUMN more than once.
 *
 * <p><b>Note:</b> only column names are compared. A repeated column whose type differs from
 * the one already known is dropped exactly like a true duplicate, without any signal. This is
 * acceptable for the migration tools mentioned above, which are expected to replay exact
 * copies.
 *
 * @param currentSchema schema whose column names are treated as already existing
 * @param event the add-column event to filter
 * @return {@link Optional#empty()} if no column remains (this includes an event that carries
 *     no columns at all); the original {@code event} instance if nothing was filtered out;
 *     otherwise a new {@link AddColumnEvent} for the same table holding only the remaining
 *     columns
 */
public static Optional<AddColumnEvent> filterRedundantAddColumns(
        Schema currentSchema, AddColumnEvent event) {
    List<AddColumnEvent.ColumnWithPosition> requested = event.getAddedColumns();
    Set<String> knownNames = new HashSet<>(currentSchema.getColumnNames());
    List<AddColumnEvent.ColumnWithPosition> kept = new ArrayList<>();

    for (AddColumnEvent.ColumnWithPosition candidate : requested) {
        // Set#add returns false when the name was already registered, either by the schema
        // or by an earlier column of this very event.
        boolean firstOccurrence = knownNames.add(candidate.getAddColumn().getName());
        if (!firstOccurrence) {
            continue;
        }
        kept.add(candidate);
    }

    if (kept.isEmpty()) {
        return Optional.empty();
    }

    // Hand back the untouched event when nothing had to be removed, so no copy is created.
    AddColumnEvent outcome =
            kept.size() == requested.size() ? event : new AddColumnEvent(event.tableId(), kept);
    return Optional.of(outcome);
}

private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) {
List<Column> columns =
oldSchema.getColumns().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** A test for the {@link org.apache.flink.cdc.common.utils.SchemaUtils}. */
class SchemaUtilsTest {
Expand Down Expand Up @@ -484,4 +486,231 @@ void testInferWiderSchema() {
.build()))
.isExactlyInstanceOf(IllegalStateException.class);
}

// ==================== Tests for duplicate AddColumnEvent handling ====================

@Test
void testFilterRedundantAddColumns_allDuplicates() {
    // Schema that already contains every column the event is about to add.
    Schema existing =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT())
                    .build();

    AddColumnEvent duplicatesOnly =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("age", DataTypes.INT()))));

    // With nothing left to add, the filter must report an empty result.
    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(existing, duplicatesOnly);
    Assertions.assertThat(filtered).isEmpty();
}

@Test
void testFilterRedundantAddColumns_noDuplicates() {
    Schema existing =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();

    // Neither "age" nor "email" is known to the schema yet.
    AddColumnEvent freshColumns =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("age", DataTypes.INT())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("email", DataTypes.STRING()))));

    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(existing, freshColumns);

    // The event must come back unchanged: still two columns and equal to the input.
    Assertions.assertThat(filtered).isPresent();
    AddColumnEvent survivor = filtered.get();
    Assertions.assertThat(survivor.getAddedColumns()).hasSize(2);
    Assertions.assertThat(survivor).isEqualTo(freshColumns);
}

@Test
void testFilterRedundantAddColumns_partialDuplicates() {
    Schema existing =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();

    // "name" is already in the schema, "age" is new.
    AddColumnEvent mixed =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("age", DataTypes.INT()))));

    Optional<AddColumnEvent> filtered = SchemaUtils.filterRedundantAddColumns(existing, mixed);

    // Only the genuinely new column may survive.
    Assertions.assertThat(filtered).isPresent();
    List<AddColumnEvent.ColumnWithPosition> remaining = filtered.get().getAddedColumns();
    Assertions.assertThat(remaining).hasSize(1);
    Assertions.assertThat(remaining.get(0).getAddColumn().getName()).isEqualTo("age");
}

@Test
void testFilterRedundantAddColumns_emptyAddedColumns() {
    Schema existing =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();

    // An event that carries no columns at all.
    AddColumnEvent nothingToAdd =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"), Collections.emptyList());

    // Such an event is treated the same way as a fully redundant one.
    Assertions.assertThat(SchemaUtils.filterRedundantAddColumns(existing, nothingToAdd))
            .isEmpty();
}

@Test
void testApplyAddColumnEvent_idempotent() {
    Schema before =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();

    // The event re-adds the existing "name" column alongside the new "age" column.
    AddColumnEvent replayed =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("age", DataTypes.INT()))));

    // "name" must not be duplicated; "age" ends up after the existing columns.
    Schema expected =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT())
                    .build();

    Schema after = SchemaUtils.applySchemaChangeEvent(before, replayed);
    Assertions.assertThat(after).isEqualTo(expected);
}

@Test
void testApplyAddColumnEvent_allDuplicates() {
    Schema before =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT())
                    .build();

    // Both columns of the event are already part of the schema.
    List<AddColumnEvent.ColumnWithPosition> alreadyKnown =
            Arrays.asList(
                    new AddColumnEvent.ColumnWithPosition(
                            Column.physicalColumn("name", DataTypes.STRING())),
                    new AddColumnEvent.ColumnWithPosition(
                            Column.physicalColumn("age", DataTypes.INT())));
    AddColumnEvent replayed =
            new AddColumnEvent(TableId.parse("default.default.table1"), alreadyKnown);

    // Applying the event must leave the column layout exactly as it was.
    Schema expected =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT())
                    .build();

    Schema after = SchemaUtils.applySchemaChangeEvent(before, replayed);
    Assertions.assertThat(after).isEqualTo(expected);
}

@Test
void testFilterRedundantAddColumns_withPositions() {
    Schema existing =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();

    // Both columns carry an explicit AFTER position; only "age" is actually new.
    AddColumnEvent.ColumnWithPosition nameAfterId =
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("name", DataTypes.STRING()),
                    AddColumnEvent.ColumnPosition.AFTER,
                    "id");
    AddColumnEvent.ColumnWithPosition ageAfterName =
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("age", DataTypes.INT()),
                    AddColumnEvent.ColumnPosition.AFTER,
                    "name");
    AddColumnEvent positioned =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(nameAfterId, ageAfterName));

    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(existing, positioned);
    Assertions.assertThat(filtered).isPresent();

    List<AddColumnEvent.ColumnWithPosition> remaining = filtered.get().getAddedColumns();
    Assertions.assertThat(remaining).hasSize(1);

    // The surviving column must keep its original positional information.
    AddColumnEvent.ColumnWithPosition survivor = remaining.get(0);
    Assertions.assertThat(survivor.getAddColumn().getName()).isEqualTo("age");
    Assertions.assertThat(survivor.getPosition())
            .isEqualTo(AddColumnEvent.ColumnPosition.AFTER);
    Assertions.assertThat(survivor.getExistedColumnName()).isEqualTo("name");
}

@Test
void testFilterRedundantAddColumns_intraEventDuplicates() {
    Schema existing = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build();

    // The same new column is listed twice within a single event.
    List<AddColumnEvent.ColumnWithPosition> repeated = new ArrayList<>();
    repeated.add(
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("name", DataTypes.STRING())));
    repeated.add(
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("name", DataTypes.STRING())));
    AddColumnEvent selfDuplicating =
            new AddColumnEvent(TableId.tableId("default", "schema", "table"), repeated);

    Optional<AddColumnEvent> filtered =
            SchemaUtils.filterRedundantAddColumns(existing, selfDuplicating);

    // Exactly one "name" entry may remain.
    Assertions.assertThat(filtered).isPresent();
    List<AddColumnEvent.ColumnWithPosition> remaining = filtered.get().getAddedColumns();
    Assertions.assertThat(remaining).hasSize(1);
    Assertions.assertThat(remaining.get(0).getAddColumn().getName()).isEqualTo("name");
}

@Test
void testApplyAddColumnEvent_duplicateWithinSameEvent() {
    Schema before = Schema.newBuilder().physicalColumn("id", DataTypes.INT()).build();

    // One event that lists the new "name" column twice.
    AddColumnEvent selfDuplicating =
            new AddColumnEvent(
                    TableId.parse("default.default.table1"),
                    Arrays.asList(
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING())),
                            new AddColumnEvent.ColumnWithPosition(
                                    Column.physicalColumn("name", DataTypes.STRING()))));

    // The resulting schema must contain "name" only once.
    Schema expected =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();

    Schema after = SchemaUtils.applySchemaChangeEvent(before, selfDuplicating);
    Assertions.assertThat(after).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.converter.JavaObjectConverter;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
Expand All @@ -46,6 +47,9 @@
import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.Serializable;
Expand All @@ -68,6 +72,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event>, Serializable {

private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(PostTransformOperator.class);

private final String timezone;
private final List<TransformRule> transformRules;
Expand Down Expand Up @@ -240,6 +245,23 @@ private Optional<Event> processSchemaChangeEvent(
TableId tableId = event.tableId();
PostTransformChangeInfo info = checkNotNull(postTransformInfoMap.get(tableId));

// Filter out redundant AddColumnEvent columns that already exist in the schema
// to handle duplicate events from tools like gh-ost online schema migrations
if (event instanceof AddColumnEvent) {
AddColumnEvent addColumnEvent = (AddColumnEvent) event;
Schema currentSchema = info.getPreTransformedSchema();
Optional<AddColumnEvent> filtered =
SchemaUtils.filterRedundantAddColumns(currentSchema, addColumnEvent);
if (!filtered.isPresent()) {
LOG.debug(
"Skipping fully redundant AddColumnEvent for table {} "
+ "- all columns already exist",
tableId);
return Optional.empty();
}
event = filtered.get();
}

// Apply schema change event to the pre-transformed schema
Schema prevPreSchema = info.getPreTransformedSchema();
Schema nextPreSchema = SchemaUtils.applySchemaChangeEvent(prevPreSchema, event);
Expand Down
Loading
Loading