Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,9 @@ public static boolean isSchemaChangeEventRedundant(
return true;
},
createTableEvent -> {
// It has been applied if such table already exists
return latestSchema.isPresent();
// Redundant only if the table exists AND the schema is identical
return latestSchema.isPresent()
&& latestSchema.get().equals(createTableEvent.getSchema());
},
dropColumnEvent -> {
// It has not been applied if schema does not even exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,74 @@ TABLE_ID, of("id", BIGINT), of("name", VARCHAR(17), "id", BIGINT)))
ImmutableMap.of("foo", INT, "baz", FLOAT)));
}

@Test
void testGetSchemaDifferenceForProjectionChanges() {
// Simulate projection change: new schema has an added column
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING),
of("id", BIGINT, "name", STRING, "age", INT)))
.as("projection change adding a column should produce AddColumnEvent")
.containsExactly(
new AddColumnEvent(
TABLE_ID,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("age", INT),
AddColumnEvent.ColumnPosition.AFTER,
"name"))));

// Simulate projection change: new schema has a removed column
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING, "age", INT),
of("id", BIGINT, "name", STRING)))
.as("projection change removing a column should produce DropColumnEvent")
.containsExactly(new DropColumnEvent(TABLE_ID, Collections.singletonList("age")));

// Simulate projection change: column type changed (e.g. STRING -> VARCHAR(255))
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING),
of("id", BIGINT, "name", DataTypes.VARCHAR(255))))
.as(
"projection change with different column type should produce AlterColumnTypeEvent")
.containsExactly(
new AlterColumnTypeEvent(
TABLE_ID,
Collections.singletonMap("name", DataTypes.VARCHAR(255)),
Collections.singletonMap("name", STRING)));

// Simulate projection change: both added and removed columns (column swap)
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING),
of("id", BIGINT, "email", STRING)))
.as("projection change swapping columns should produce Add + Drop events")
.containsExactly(
new AddColumnEvent(
TABLE_ID,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("email", STRING),
AddColumnEvent.ColumnPosition.AFTER,
"id"))),
new DropColumnEvent(TABLE_ID, Collections.singletonList("name")));

// Simulate projection change: identical schemas should produce no diff
Assertions.assertThat(
getSchemaDifference(
TABLE_ID,
of("id", BIGINT, "name", STRING),
of("id", BIGINT, "name", STRING)))
.as("identical schemas should produce no schema change events")
.isEmpty();
}

@Test
void testMergeAndDiff() {
Assertions.assertThat(mergeAndDiff(null, of("id", BIGINT, "name", VARCHAR(17))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
Expand Down Expand Up @@ -484,4 +485,66 @@ void testInferWiderSchema() {
.build()))
.isExactlyInstanceOf(IllegalStateException.class);
}

@Test
void testIsSchemaChangeEventRedundantForCreateTableEvent() {
TableId tableId = TableId.tableId("test_db", "test_table");

Schema schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.primaryKey("id")
.build();

// CreateTableEvent is NOT redundant when table doesn't exist (null schema)
CreateTableEvent createEvent = new CreateTableEvent(tableId, schema);
Assertions.assertThat(SchemaUtils.isSchemaChangeEventRedundant(null, createEvent))
.as("CreateTableEvent should not be redundant when table does not exist")
.isFalse();

// CreateTableEvent IS redundant when table exists with the same schema
Assertions.assertThat(SchemaUtils.isSchemaChangeEventRedundant(schema, createEvent))
.as("CreateTableEvent should be redundant when table exists with same schema")
.isTrue();

// CreateTableEvent is NOT redundant when table exists with different schema (extra column)
Schema schemaWithExtraColumn =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.primaryKey("id")
.build();
CreateTableEvent createEventWithExtraColumn =
new CreateTableEvent(tableId, schemaWithExtraColumn);
Assertions.assertThat(
SchemaUtils.isSchemaChangeEventRedundant(
schema, createEventWithExtraColumn))
.as("CreateTableEvent should not be redundant when new schema has extra columns")
.isFalse();

// CreateTableEvent is NOT redundant when column types differ
Schema schemaWithDifferentType =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.VARCHAR(255))
.primaryKey("id")
.build();
CreateTableEvent createEventWithDifferentType =
new CreateTableEvent(tableId, schemaWithDifferentType);
Assertions.assertThat(
SchemaUtils.isSchemaChangeEventRedundant(
schema, createEventWithDifferentType))
.as("CreateTableEvent should not be redundant when column types differ")
.isFalse();

// CreateTableEvent is NOT redundant when existing schema has more columns than new schema
Assertions.assertThat(
SchemaUtils.isSchemaChangeEventRedundant(
schemaWithExtraColumn, createEvent))
.as(
"CreateTableEvent should not be redundant when existing schema has more columns")
.isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,29 @@ private List<SchemaChangeEvent> deduceEvolvedSchemaChanges(SchemaChangeEvent eve

List<SchemaChangeEvent> rawSchemaChangeEvents = new ArrayList<>();
if (upstreamDependencies.size() == 1) {
// If it's a one-by-one routing rule, we can simply forward it to downstream sink.
SchemaChangeEvent rawEvent = event.copy(evolvedTableId);
rawSchemaChangeEvents.add(rawEvent);
LOG.info(
"Step 3.3 - It's an one-by-one routing and could be forwarded as {}.",
rawEvent);
// If it's a one-by-one routing rule, we can simply forward it to downstream
// sink. However, if the incoming event is a CreateTableEvent for an
// already-known evolved table (e.g. after restart with changed projections),
// we must compute the schema diff instead of forwarding the raw
// CreateTableEvent, which would fail at the sink.
if (event instanceof CreateTableEvent && currentEvolvedSchema != null) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
List<SchemaChangeEvent> diffEvents =
SchemaMergingUtils.getSchemaDifference(
evolvedTableId,
currentEvolvedSchema,
createTableEvent.getSchema());
rawSchemaChangeEvents.addAll(diffEvents);
LOG.info(
"Step 3.3 - It's a one-by-one routing but CreateTableEvent for existing table. Computed diff events: {}.",
diffEvents);
} else {
SchemaChangeEvent rawEvent = event.copy(evolvedTableId);
rawSchemaChangeEvents.add(rawEvent);
LOG.info(
"Step 3.3 - It's an one-by-one routing and could be forwarded as {}.",
rawEvent);
}
Comment on lines +291 to +313
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add E2e test cases in org.apache.flink.cdc.pipeline.tests.migration to cover these scenarios?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I've gone ahead and added test cases for the scenarios

} else {
Set<Schema> toBeMergedSchemas =
SchemaDerivator.reverseLookupDependingUpstreamSchemas(
Expand Down
Loading
Loading