Skip to content

Commit 67a5ca1

Browse files
authored
[FLINK-37479][postgres] Add support for PARTITIONED TABLE (apache#4004)
1 parent ea384c8 commit 67a5ca1

16 files changed

Lines changed: 301 additions & 18 deletions

File tree

docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,18 @@ Connector Options
280280
<li>false(默认):所有类型的消息都保持原样下发。</li>
281281
</td>
282282
</tr>
283+
<tr>
284+
<td>scan.include-partitioned-tables.enabled</td>
285+
<td>optional</td>
286+
<td style="word-wrap: break-word;">false</td>
287+
<td>Boolean</td>
288+
<td>
289+
Whether to enable reading partitioned tables via partition root.<br>
290+
If enabled:
291+
(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true
292+
(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.
293+
</td>
294+
</tr>
283295
</tbody>
284296
</table>
285297
</div>

docs/content/docs/connectors/flink-sources/postgres-cdc.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,18 @@ SELECT * FROM shipments;
277277
<li>false (default): All types of messages are sent as is.</li>
278278
</td>
279279
</tr>
280+
<tr>
281+
<td>scan.include-partitioned-tables.enabled</td>
282+
<td>optional</td>
283+
<td style="word-wrap: break-word;">false</td>
284+
<td>Boolean</td>
285+
<td>
286+
Whether to enable reading partitioned tables via partition root.<br>
287+
If enabled:
288+
(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true
289+
(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.
290+
</td>
291+
</tr>
280292
</tbody>
281293
</table>
282294
</div>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ private void generateCreateTableEvent(PostgresSourceConfig sourceConfig) {
130130
TableDiscoveryUtils.listTables(
131131
sourceConfig.getDatabaseList().get(0),
132132
jdbc,
133-
sourceConfig.getTableFilters());
133+
sourceConfig.getTableFilters(),
134+
sourceConfig.includePartitionedTables());
134135
for (TableId tableId : capturedTableIds) {
135136
Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
136137
createTableEventCache.add(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,14 @@ public ChunkSplitter createChunkSplitter(
176176
@Override
177177
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
178178
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
179+
boolean includePartitionedTables =
180+
((PostgresSourceConfig) sourceConfig).includePartitionedTables();
179181
return TableDiscoveryUtils.listTables(
180182
// there is always a single database provided
181-
sourceConfig.getDatabaseList().get(0), jdbc, sourceConfig.getTableFilters());
183+
sourceConfig.getDatabaseList().get(0),
184+
jdbc,
185+
sourceConfig.getTableFilters(),
186+
includePartitionedTables);
182187
} catch (SQLException e) {
183188
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
184189
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,12 @@ public PostgresSourceBuilder<T> lsnCommitCheckpointsDelay(int lsnCommitDelay) {
299299
return this;
300300
}
301301

302+
/** Whether the connector should read partitioned tables via partition root or not. */
303+
public PostgresSourceBuilder<T> includePartitionedTables(boolean includePartitionedTables) {
304+
this.configFactory.setIncludePartitionedTables(includePartitionedTables);
305+
return this;
306+
}
307+
302308
/**
303309
* Build the {@link PostgresIncrementalSource}.
304310
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
3838

3939
private final int subtaskId;
4040
private final int lsnCommitCheckpointsDelay;
41+
private final boolean includePartitionedTables;
4142

4243
public PostgresSourceConfig(
4344
int subtaskId,
@@ -67,7 +68,8 @@ public PostgresSourceConfig(
6768
boolean skipSnapshotBackfill,
6869
boolean isScanNewlyAddedTableEnabled,
6970
int lsnCommitCheckpointsDelay,
70-
boolean assignUnboundedChunkFirst) {
71+
boolean assignUnboundedChunkFirst,
72+
boolean includePartitionedTables) {
7173
super(
7274
startupOptions,
7375
databaseList,
@@ -97,6 +99,7 @@ public PostgresSourceConfig(
9799
assignUnboundedChunkFirst);
98100
this.subtaskId = subtaskId;
99101
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
102+
this.includePartitionedTables = includePartitionedTables;
100103
}
101104

102105
/**
@@ -117,6 +120,15 @@ public int getLsnCommitCheckpointsDelay() {
117120
return this.lsnCommitCheckpointsDelay;
118121
}
119122

123+
/**
124+
* Returns {@code includePartitionedTables} value.
125+
*
126+
* @return include partitioned table
127+
*/
128+
public boolean includePartitionedTables() {
129+
return includePartitionedTables;
130+
}
131+
120132
/**
121133
* Returns the slot name for backfill task.
122134
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
5252

5353
private int lsnCommitCheckpointsDelay;
5454

55+
private boolean includePartitionedTables;
56+
5557
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
5658
@Override
5759
public PostgresSourceConfig create(int subtaskId) {
@@ -133,7 +135,8 @@ public PostgresSourceConfig create(int subtaskId) {
133135
skipSnapshotBackfill,
134136
scanNewlyAddedTableEnabled,
135137
lsnCommitCheckpointsDelay,
136-
assignUnboundedChunkFirst);
138+
assignUnboundedChunkFirst,
139+
includePartitionedTables);
137140
}
138141

139142
/**
@@ -181,4 +184,9 @@ public void heartbeatInterval(Duration heartbeatInterval) {
181184
public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
182185
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
183186
}
187+
188+
/** Enable include partitioned table. */
189+
public void setIncludePartitionedTables(boolean includePartitionedTables) {
190+
this.includePartitionedTables = includePartitionedTables;
191+
}
184192
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,14 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
8787
+ "By setting this to higher value, the offset that is consumed by global slot will be "
8888
+ "committed after multiple checkpoint delays instead of after each checkpoint completion.\n"
8989
+ "This allows continuous recycle of log files in stream phase.");
90+
91+
public static final ConfigOption<Boolean> SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED =
92+
ConfigOptions.key("scan.include-partitioned-tables.enabled")
93+
.booleanType()
94+
.defaultValue(Boolean.FALSE)
95+
.withDescription(
96+
"Enable reading from partitioned table via partition root.\n"
97+
+ "If enabled:\n"
98+
+ "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n"
99+
+ "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.");
90100
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,17 @@ public class TableDiscoveryUtils {
3434
private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class);
3535

3636
public static List<TableId> listTables(
37-
String database, JdbcConnection jdbc, RelationalTableFilters tableFilters)
37+
String database,
38+
JdbcConnection jdbc,
39+
RelationalTableFilters tableFilters,
40+
boolean includePartitionedTables)
3841
throws SQLException {
3942

40-
Set<TableId> allTableIds =
41-
jdbc.readTableNames(database, null, null, new String[] {"TABLE"});
43+
String[] tableTypes = new String[] {"TABLE"};
44+
if (includePartitionedTables) {
45+
tableTypes = new String[] {"TABLE", "PARTITIONED TABLE"};
46+
}
47+
Set<TableId> allTableIds = jdbc.readTableNames(database, null, null, tableTypes);
4248

4349
Set<TableId> capturedTables =
4450
allTableIds.stream()

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME;
5656
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
5757
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.PG_PORT;
58+
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED;
5859
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
5960
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
6061
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
@@ -117,6 +118,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
117118
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
118119
boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
119120
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
121+
boolean includePartitionedTables = config.get(SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED);
120122
boolean assignUnboundedChunkFirst =
121123
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
122124
boolean appendOnly = config.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
@@ -167,7 +169,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
167169
isScanNewlyAddedTableEnabled,
168170
lsnCommitCheckpointsDelay,
169171
assignUnboundedChunkFirst,
170-
appendOnly);
172+
appendOnly,
173+
includePartitionedTables);
171174
}
172175

173176
@Override
@@ -212,6 +215,7 @@ public Set<ConfigOption<?>> optionalOptions() {
212215
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
213216
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
214217
options.add(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
218+
options.add(SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED);
215219
return options;
216220
}
217221

0 commit comments

Comments
 (0)