Skip to content

Commit becc260

Browse files
committed
Drop @experimental + add factory test for scan.newly-added-table.enabled
1 parent e49d2d4 commit becc260

2 files changed

Lines changed: 20 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,6 @@ public class PostgresDataSourceOptions {
282282
.withDescription(
283283
"Whether to infer CDC column types when processing pgoutput Relation messages.");
284284

285-
@Experimental
286285
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
287286
ConfigOptions.key("scan.newly-added-table.enabled")
288287
.booleanType()

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.HOSTNAME;
4949
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PASSWORD;
5050
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PG_PORT;
51+
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
5152
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME;
5253
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES;
5354
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE;
@@ -97,6 +98,25 @@ public void testCreateDataSource() {
9798
.isEqualTo(Arrays.asList("inventory.products"));
9899
}
99100

101+
@Test
102+
public void testScanNewlyAddedTableEnabled() {
103+
Map<String, String> options = new HashMap<>();
104+
options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
105+
options.put(
106+
PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
107+
options.put(USERNAME.key(), TEST_USER);
108+
options.put(PASSWORD.key(), TEST_PASSWORD);
109+
options.put(TABLES.key(), POSTGRES_CONTAINER.getDatabaseName() + ".inventory.prod\\.*");
110+
options.put(SLOT_NAME.key(), slotName);
111+
options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
112+
113+
Factory.Context context = new MockContext(Configuration.fromMap(options));
114+
PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
115+
PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context);
116+
117+
assertThat(dataSource.getPostgresSourceConfig().isScanNewlyAddedTableEnabled()).isTrue();
118+
}
119+
100120
@Test
101121
public void testNoMatchedTable() {
102122
Map<String, String> options = new HashMap<>();

0 commit comments

Comments
 (0)