Skip to content

Commit 4875210

Browse files
committed
[FLINK-39582][postgres] Allow logical messages
1 parent b94d7b2 commit 4875210

13 files changed

Lines changed: 466 additions & 5 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
2525
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
2626
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
27+
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
2728

2829
import io.debezium.relational.TableId;
2930
import io.debezium.relational.history.TableChanges;
@@ -81,6 +82,15 @@ default Offset displayCommittedOffset(C sourceConfig) {
8182
/** The task context used for fetch task to fetch data from external systems. */
8283
FetchTask.Context createFetchTaskContext(C sourceConfig);
8384

85+
/**
86+
* The stream fetcher used to fetch data of a stream split. Dialects may override this to
87+
* provide connector-specific filtering or routing behavior.
88+
*/
89+
default IncrementalSourceStreamFetcher createStreamFetcher(
90+
FetchTask.Context taskContext, int subtaskId) {
91+
return new IncrementalSourceStreamFetcher(taskContext, subtaskId);
92+
}
93+
8494
/**
8595
* We may need the offset corresponding to the checkpointId. For example, we should commit LSN
8696
* of checkpoint to postgres's slot.

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ protected TableChanges getTableChangeRecord(SourceRecord element) throws IOExcep
132132
return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
133133
}
134134

135-
private void updateStreamSplitState(SourceSplitState splitState, SourceRecord element) {
135+
protected void updateStreamSplitState(SourceSplitState splitState, SourceRecord element) {
136136
if (splitState.isStreamSplitState()) {
137137
Offset position = getOffsetPosition(element);
138138
splitState.asStreamSplitState().setStartingOffset(position);

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ private IncrementalSourceScanFetcher getScanFetcher() {
268268
private IncrementalSourceStreamFetcher getStreamFetcher() {
269269
if (reusedStreamFetcher == null) {
270270
reusedStreamFetcher =
271-
new IncrementalSourceStreamFetcher(
271+
dataSourceDialect.createStreamFetcher(
272272
dataSourceDialect.createFetchTaskContext(sourceConfig), subtaskId);
273273
}
274274
return reusedStreamFetcher;

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public void close() {
185185
* only the change event belong to [1024, 2048) and offset is after highWatermark1 should send.
186186
* </pre>
187187
*/
188-
private boolean shouldEmit(SourceRecord sourceRecord) {
188+
protected boolean shouldEmit(SourceRecord sourceRecord) {
189189
if (taskContext.isDataChangeRecord(sourceRecord)) {
190190
TableId tableId = taskContext.getTableId(sourceRecord);
191191
Offset position = taskContext.getStreamOffset(sourceRecord);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
2727
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
2828
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
29+
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
2930
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
3031
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
3132
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask;
3233
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
3334
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
35+
import org.apache.flink.cdc.connectors.postgres.source.reader.PostgresSourceStreamFetcher;
3436
import org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema;
3537
import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
3638
import org.apache.flink.util.FlinkRuntimeException;
@@ -239,6 +241,13 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(JdbcSourceConfig taskSo
239241
return new PostgresSourceFetchTaskContext(taskSourceConfig, this);
240242
}
241243

244+
@Override
245+
public IncrementalSourceStreamFetcher createStreamFetcher(
246+
FetchTask.Context taskContext, int subtaskId) {
247+
return new PostgresSourceStreamFetcher(
248+
taskContext, subtaskId, sourceConfig.isLogicalMessageEnabled());
249+
}
250+
242251
@Override
243252
public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
244253
if (streamFetchTask != null) {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,12 @@ public PostgresSourceBuilder<T> includeDatabaseInTableId(boolean includeDatabase
315315
return this;
316316
}
317317

318+
/** Whether to emit Postgres logical decoding messages to the deserializer. */
319+
public PostgresSourceBuilder<T> includeLogicalMessages(boolean includeLogicalMessages) {
320+
this.configFactory.setIncludeLogicalMessages(includeLogicalMessages);
321+
return this;
322+
}
323+
318324
/** Whether to infer schema change event on relation message. */
319325
public PostgresSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges) {
320326
this.configFactory.includeSchemaChanges(includeSchemaChanges);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
4040
private final int lsnCommitCheckpointsDelay;
4141
private final boolean includePartitionedTables;
4242
private final boolean includeDatabaseInTableId;
43+
private final boolean includeLogicalMessages;
4344

4445
public PostgresSourceConfig(
4546
int subtaskId,
@@ -71,7 +72,8 @@ public PostgresSourceConfig(
7172
int lsnCommitCheckpointsDelay,
7273
boolean assignUnboundedChunkFirst,
7374
boolean includePartitionedTables,
74-
boolean includeDatabaseInTableId) {
75+
boolean includeDatabaseInTableId,
76+
boolean includeLogicalMessages) {
7577
super(
7678
startupOptions,
7779
databaseList,
@@ -103,6 +105,7 @@ public PostgresSourceConfig(
103105
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
104106
this.includePartitionedTables = includePartitionedTables;
105107
this.includeDatabaseInTableId = includeDatabaseInTableId;
108+
this.includeLogicalMessages = includeLogicalMessages;
106109
}
107110

108111
/**
@@ -156,4 +159,9 @@ public PostgresConnectorConfig getDbzConnectorConfig() {
156159
public boolean isIncludeDatabaseInTableId() {
157160
return includeDatabaseInTableId;
158161
}
162+
163+
/** Returns whether to emit Postgres logical decoding messages to the deserializer. */
164+
public boolean isLogicalMessageEnabled() {
165+
return includeLogicalMessages;
166+
}
159167
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
5757
private boolean includeDatabaseInTableId =
5858
PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
5959

60+
private boolean includeLogicalMessages =
61+
PostgresSourceOptions.SCAN_LOGICAL_MESSAGE_ENABLED.defaultValue();
62+
6063
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
6164
@Override
6265
public PostgresSourceConfig create(int subtaskId) {
@@ -140,7 +143,8 @@ public PostgresSourceConfig create(int subtaskId) {
140143
lsnCommitCheckpointsDelay,
141144
assignUnboundedChunkFirst,
142145
includePartitionedTables,
143-
includeDatabaseInTableId);
146+
includeDatabaseInTableId,
147+
includeLogicalMessages);
144148
}
145149

146150
/**
@@ -198,4 +202,9 @@ public void setIncludePartitionedTables(boolean includePartitionedTables) {
198202
public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
199203
this.includeDatabaseInTableId = includeDatabaseInTableId;
200204
}
205+
206+
/** Set whether to emit Postgres logical decoding messages to the deserializer. */
207+
public void setIncludeLogicalMessages(boolean includeLogicalMessages) {
208+
this.includeLogicalMessages = includeLogicalMessages;
209+
}
201210
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,16 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
9898
+ "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n"
9999
+ "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.");
100100

101+
public static final ConfigOption<Boolean> SCAN_LOGICAL_MESSAGE_ENABLED =
102+
ConfigOptions.key("scan.logical-message.enabled")
103+
.booleanType()
104+
.defaultValue(false)
105+
.withDescription(
106+
"Whether to emit Postgres logical messages produced by "
107+
+ "pg_logical_emit_message() to the deserializer. "
108+
+ "Disabled by default; logical messages are not bound to "
109+
+ "any table and are dropped by table-based watermark filtering.");
110+
101111
public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
102112
ConfigOptions.key("table-id.include-database")
103113
.booleanType()

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.flink.cdc.connectors.postgres.source.reader;
1919

20+
import org.apache.flink.api.connector.source.SourceOutput;
2021
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
22+
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
2123
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
2224
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
2325
import org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
@@ -43,6 +45,18 @@ public PostgresSourceRecordEmitter(
4345
offsetFactory);
4446
}
4547

48+
@Override
49+
protected void processElement(
50+
SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
51+
throws Exception {
52+
if (PostgresSourceStreamFetcher.isLogicalMessage(element)) {
53+
updateStreamSplitState(splitState, element);
54+
emitElement(element, output);
55+
return;
56+
}
57+
super.processElement(element, output, splitState);
58+
}
59+
4660
@Override
4761
protected TableChanges getTableChangeRecord(SourceRecord element) throws IOException {
4862
if (element instanceof PostgresSchemaRecord) {

0 commit comments

Comments
 (0)