@@ -121,7 +121,16 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve

    private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
        LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
        Set<String> existingColumnNames =
                columns.stream()
                        .map(Column::getName)
                        .collect(Collectors.toCollection(HashSet::new));
        for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
            // Skip adding the column if it already exists in the schema to ensure idempotency.
            // This can happen when schema change events are replayed after a failover recovery.
            if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
                continue;
            }
Comment on lines +124 to +133
Copilot AI Apr 9, 2026

Skipping duplicates purely by name can hide a real schema conflict (e.g., replay vs. a different upstream AddColumnEvent that reuses the same column name but with a different type/nullable/default/comment). To keep idempotency while avoiding silent corruption, consider: if the name exists, look up the existing Column and compare with columnWithPosition.getAddColumn(); only skip when they are equivalent, otherwise throw an informative exception.
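
A minimal sketch of that check, under stated assumptions: `Col` is a hypothetical stand-in for the real `Column` class (only name, type, and comment are modeled), `applyAdd` is an illustrative helper and not part of `SchemaUtils`, and position handling is reduced to appending at the end. It only illustrates the "skip when equivalent, fail when conflicting" rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class IdempotentAddColumnSketch {

    /** Hypothetical stand-in for Column; the real class carries more attributes. */
    static final class Col {
        final String name;
        final String type;
        final String comment;

        Col(String name, String type, String comment) {
            this.name = name;
            this.type = type;
            this.comment = comment;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Col)) {
                return false;
            }
            Col other = (Col) o;
            return name.equals(other.name)
                    && type.equals(other.type)
                    && Objects.equals(comment, other.comment);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, type, comment);
        }
    }

    /** Appends added columns, skipping exact replays and rejecting same-name conflicts. */
    static List<Col> applyAdd(List<Col> existing, List<Col> added) {
        List<Col> result = new ArrayList<>(existing);
        Map<String, Col> byName = new HashMap<>();
        for (Col c : existing) {
            byName.put(c.name, c);
        }
        for (Col candidate : added) {
            Col current = byName.get(candidate.name);
            if (current == null) {
                // New name: add it and remember it for later entries in the same event.
                result.add(candidate);
                byName.put(candidate.name, candidate);
            } else if (!current.equals(candidate)) {
                // Same name but a different definition: surface the conflict.
                throw new IllegalStateException(
                        "Column " + candidate.name + " already exists with a different definition");
            }
            // Otherwise the definitions match, so this is treated as a replay and skipped.
        }
        return result;
    }
}
```

Whether a mismatch should throw or be logged and skipped is a design decision for the PR author; the sketch throws so that a conflicting definition cannot pass silently.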

            switch (columnWithPosition.getPosition()) {
                case FIRST:
                    {
@@ -170,6 +179,7 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
                        break;
                    }
            }
            existingColumnNames.add(columnWithPosition.getAddColumn().getName());
        }
        return oldSchema.copy(columns);
    }
@@ -160,6 +160,46 @@ void testApplyColumnSchemaChangeEvent() {
                                "AFTER type AddColumnEvent error: Column %s does not exist in table %s",
                                "col10", tableId));

        // add duplicate column should be ignored (idempotency)
        addedColumns = new ArrayList<>();
        addedColumns.add(
                new AddColumnEvent.ColumnWithPosition(
                        Column.physicalColumn("col3", DataTypes.STRING()),
                        AddColumnEvent.ColumnPosition.LAST,
                        null));
        addColumnEvent = new AddColumnEvent(tableId, addedColumns);
        schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
Comment on lines +163 to +171
Copilot AI Apr 9, 2026

Current tests cover replay of a single-column AddColumnEvent. Since the implementation now keeps existingColumnNames across iterations, add a unit test where a single AddColumnEvent contains the same column name twice (e.g., two ColumnWithPosition entries for col3), and assert only one column is added. This directly validates the new intra-event dedup behavior.
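
A self-contained sketch of the behavior such a test would pin down, under stated assumptions: columns are modeled as plain names, every addition is treated as `LAST`, and `applyAddColumns` is a hypothetical simplification rather than the real `SchemaUtils.applySchemaChangeEvent`. The actual test would build an `AddColumnEvent` with two `ColumnWithPosition` entries for `col3` and assert on the resulting `Schema`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IntraEventDedupSketch {

    /** Simplified model of the loop in the diff: names only, additions appended at the end. */
    static List<String> applyAddColumns(List<String> schemaColumns, List<String> addedColumns) {
        List<String> columns = new ArrayList<>(schemaColumns);
        Set<String> existingColumnNames = new HashSet<>(columns);
        for (String name : addedColumns) {
            // Skip names already present, including ones added earlier in this same event.
            if (existingColumnNames.contains(name)) {
                continue;
            }
            columns.add(name);
            // Record the name so a later duplicate within this event is also skipped.
            existingColumnNames.add(name);
        }
        return columns;
    }
}
```

Calling `applyAddColumns` with an added list that contains `col3` twice should yield a schema with a single `col3`; without the `existingColumnNames.add(name)` line, the second entry would be appended again.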

        Assertions.assertThat(schema)
                .isEqualTo(
                        Schema.newBuilder()
                                .physicalColumn("col0", DataTypes.STRING())
                                .physicalColumn("col1", DataTypes.STRING())
                                .physicalColumn("col2", DataTypes.STRING())
                                .physicalColumn("col4", DataTypes.STRING())
                                .physicalColumn("col5", DataTypes.STRING())
                                .physicalColumn("col3", DataTypes.STRING())
                                .build());

        // add duplicate column in FIRST position should be ignored
        addedColumns = new ArrayList<>();
        addedColumns.add(
                new AddColumnEvent.ColumnWithPosition(
                        Column.physicalColumn("col0", DataTypes.STRING()),
                        AddColumnEvent.ColumnPosition.FIRST,
                        null));
        addColumnEvent = new AddColumnEvent(tableId, addedColumns);
        schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
        Assertions.assertThat(schema)
                .isEqualTo(
                        Schema.newBuilder()
                                .physicalColumn("col0", DataTypes.STRING())
                                .physicalColumn("col1", DataTypes.STRING())
                                .physicalColumn("col2", DataTypes.STRING())
                                .physicalColumn("col4", DataTypes.STRING())
                                .physicalColumn("col5", DataTypes.STRING())
                                .physicalColumn("col3", DataTypes.STRING())
                                .build());

        // drop columns
        DropColumnEvent dropColumnEvent =
                new DropColumnEvent(tableId, Arrays.asList("col3", "col5"));