Skip to content

Commit 88adcfa

Browse files
committed
[postgres][hotfix] Fix pass wrong schemaChangeEnabled.
1 parent 08f3fb8 commit 88adcfa

5 files changed

Lines changed: 49 additions & 21 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ protected void processElement(
135135
maybeSendCreateTableEventFromCache(tableId, output);
136136
} else if (isDataChangeRecord(element)) {
137137
handleDataChangeRecord(element, output);
138-
} else if (isSchemaChangeEvent(element) && sourceConfig.isSchemaChangeEnabled()) {
138+
} else if (isSchemaChangeEvent(element) && sourceConfig.isIncludeSchemaChanges()) {
139139
handleSchemaChangeRecord(element, output, splitState);
140140
}
141141
super.processElement(element, output, splitState);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ public void testSchemaChangeWithDataInserts() throws Exception {
620620
configFactory.database(inventoryDatabase.getDatabaseName());
621621
configFactory.slotName(slotName);
622622
configFactory.decodingPluginName("pgoutput");
623-
configFactory.enableSchemaChange(true);
623+
configFactory.includeSchemaChanges(true);
624624

625625
FlinkSourceProvider sourceProvider =
626626
(FlinkSourceProvider)

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
4040
private final int lsnCommitCheckpointsDelay;
4141
private final boolean includePartitionedTables;
4242
private final boolean includeDatabaseInTableId;
43-
private final boolean schemaChangeEnabled;
4443

4544
public PostgresSourceConfig(
4645
int subtaskId,
@@ -72,8 +71,7 @@ public PostgresSourceConfig(
7271
int lsnCommitCheckpointsDelay,
7372
boolean assignUnboundedChunkFirst,
7473
boolean includePartitionedTables,
75-
boolean includeDatabaseInTableId,
76-
boolean schemaChangeEnabled) {
74+
boolean includeDatabaseInTableId) {
7775
super(
7876
startupOptions,
7977
databaseList,
@@ -105,7 +103,6 @@ public PostgresSourceConfig(
105103
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
106104
this.includePartitionedTables = includePartitionedTables;
107105
this.includeDatabaseInTableId = includeDatabaseInTableId;
108-
this.schemaChangeEnabled = schemaChangeEnabled;
109106
}
110107

111108
/**
@@ -159,9 +156,4 @@ public PostgresConnectorConfig getDbzConnectorConfig() {
159156
public boolean isIncludeDatabaseInTableId() {
160157
return includeDatabaseInTableId;
161158
}
162-
163-
/** Returns whether to infer column types via JDBC TypeRegistry on schema change events. */
164-
public boolean isSchemaChangeEnabled() {
165-
return schemaChangeEnabled;
166-
}
167159
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
5757
private boolean includeDatabaseInTableId =
5858
PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
5959

60-
private boolean schemaChangeEnabled = false;
61-
6260
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
6361
@Override
6462
public PostgresSourceConfig create(int subtaskId) {
@@ -142,8 +140,7 @@ public PostgresSourceConfig create(int subtaskId) {
142140
lsnCommitCheckpointsDelay,
143141
assignUnboundedChunkFirst,
144142
includePartitionedTables,
145-
includeDatabaseInTableId,
146-
schemaChangeEnabled);
143+
includeDatabaseInTableId);
147144
}
148145

149146
/**
@@ -201,9 +198,4 @@ public void setIncludePartitionedTables(boolean includePartitionedTables) {
201198
public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
202199
this.includeDatabaseInTableId = includeDatabaseInTableId;
203200
}
204-
205-
/** Set whether to infer schema change event on relation message. */
206-
public void enableSchemaChange(boolean schemaChangeEnabled) {
207-
this.schemaChangeEnabled = schemaChangeEnabled;
208-
}
209201
}

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,14 @@ void testSyncWholeDatabase() throws Exception {
116116
+ " scan.startup.mode: initial\n"
117117
+ " server-time-zone: UTC\n"
118118
+ " connect.timeout: 120s\n"
119+
+ " schema-change.enabled: true\n"
119120
+ "\n"
120121
+ "sink:\n"
121122
+ " type: values\n"
122123
+ "\n"
123124
+ "pipeline:\n"
124-
+ " parallelism: %d",
125+
+ " parallelism: %d\n"
126+
+ " schema.change.behavior: evolve",
125127
INTER_CONTAINER_POSTGRES_ALIAS,
126128
5432,
127129
POSTGRES_TEST_USER,
@@ -172,5 +174,47 @@ void testSyncWholeDatabase() throws Exception {
172174
LOG.error("Update table for CDC failed.", e);
173175
throw new RuntimeException(e);
174176
}
177+
178+
LOG.info("Begin schema change stage.");
179+
180+
try (Connection conn =
181+
getJdbcConnection(
182+
POSTGRES_CONTAINER, postgresInventoryDatabase.getDatabaseName());
183+
Statement stat = conn.createStatement()) {
184+
// Test ADD COLUMN
185+
stat.execute("ALTER TABLE inventory.products ADD COLUMN category VARCHAR(255);");
186+
stat.execute(
187+
"INSERT INTO inventory.products VALUES (default, 'widget', 'A small widget', 1.5, 'tools');");
188+
189+
// Test DROP COLUMN
190+
stat.execute("ALTER TABLE inventory.products DROP COLUMN weight;");
191+
stat.execute(
192+
"INSERT INTO inventory.products VALUES (default, 'gadget', 'A useful gadget', 'electronics');");
193+
194+
// Test RENAME COLUMN
195+
stat.execute(
196+
"ALTER TABLE inventory.products RENAME COLUMN category TO product_category;");
197+
stat.execute(
198+
"INSERT INTO inventory.products VALUES (default, 'gizmo', 'A fancy gizmo', 'gadgets');");
199+
} catch (Exception e) {
200+
LOG.error("Schema change test failed.", e);
201+
throw new RuntimeException(e);
202+
}
203+
204+
// Validate schema change events and corresponding data
205+
waitUntilSpecificEvent(
206+
"AddColumnEvent{tableId=inventory.products, addedColumns=[ColumnWithPosition{column=`category` VARCHAR(255), position=LAST, existedColumnName=null}]}");
207+
waitUntilSpecificEvent(
208+
"DataChangeEvent{tableId=inventory.products, before=[], after=[110, widget, A small widget, 1.5, tools], op=INSERT, meta=()}");
209+
210+
waitUntilSpecificEvent(
211+
"DropColumnEvent{tableId=inventory.products, droppedColumnNames=[weight]}");
212+
waitUntilSpecificEvent(
213+
"DataChangeEvent{tableId=inventory.products, before=[], after=[111, gadget, A useful gadget, electronics], op=INSERT, meta=()}");
214+
215+
waitUntilSpecificEvent(
216+
"RenameColumnEvent{tableId=inventory.products, nameMapping={category=product_category}}");
217+
waitUntilSpecificEvent(
218+
"DataChangeEvent{tableId=inventory.products, before=[], after=[112, gizmo, A fancy gizmo, gadgets], op=INSERT, meta=()}");
175219
}
176220
}

0 commit comments

Comments
 (0)