Skip to content

Commit e49d2d4

Browse files
committed
Expose scan.newly-added-table.enabled option
Add the `scan.newly-added-table.enabled` YAML option to the Postgres Pipeline connector. The underlying SnapshotSplitAssigner.captureNewlyAddedTables() mechanism and the PostgresSourceBuilder.scanNewlyAddedTableEnabled() builder method already exist in the postgres-cdc source; this PR adds the missing YAML-side wiring. It mirrors the same option already exposed by the MySQL Pipeline connector (MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED). The default is false, so the change is a no-op for existing pipelines. When set to true, restoring from a savepoint will discover tables that match the source `tables:` pattern but were not part of the captured set at savepoint time — enabling DMS-style 'add a new table without re-snapshotting existing tables' workflows. Signed-off-by: Mehmet Can Şakiroğlu <cansakiroglu@gmail.com>
1 parent 7355d76 commit e49d2d4

2 files changed

Lines changed: 17 additions & 0 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -72,6 +72,7 @@
7272
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
7373
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
7474
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
75+
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
7576
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
7677
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
7778
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
@@ -133,6 +134,7 @@ public DataSource createDataSource(Context context) {
133134
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
134135
boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
135136
boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);
137+
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
136138

137139
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
138140
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -175,6 +177,7 @@ public DataSource createDataSource(Context context) {
175177
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
176178
.includeDatabaseInTableId(tableIdIncludeDatabase)
177179
.includeSchemaChanges(includeSchemaChanges)
180+
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
178181
.getConfigFactory();
179182

180183
List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -266,6 +269,7 @@ public Set<ConfigOption<?>> optionalOptions() {
266269
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
267270
options.add(TABLE_ID_INCLUDE_DATABASE);
268271
options.add(SCHEMA_CHANGE_ENABLED);
272+
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
269273
return options;
270274
}
271275

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -281,4 +281,17 @@ public class PostgresDataSourceOptions {
281281
.defaultValue(false)
282282
.withDescription(
283283
"Whether to infer CDC column types when processing pgoutput Relation messages.");
284+
285+
@Experimental
286+
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
287+
ConfigOptions.key("scan.newly-added-table.enabled")
288+
.booleanType()
289+
.defaultValue(false)
290+
.withDescription(
291+
"Whether to scan the newly added tables or not. Defaults to false. "
292+
+ "This option only takes effect when restoring from a savepoint or checkpoint, "
293+
+ "and enables the existing SnapshotSplitAssigner#captureNewlyAddedTables() code path "
294+
+ "to discover tables that match the source `tables:` pattern but were not part of "
295+
+ "the captured set at savepoint time. Mirrors the MySQL Pipeline connector option "
296+
+ "of the same name.");
284297
}

0 commit comments

Comments
 (0)