diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
index 63fa7e9302c..9f9f0de6555 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
@@ -24,6 +24,7 @@
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
 
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
@@ -81,6 +82,15 @@ default Offset displayCommittedOffset(C sourceConfig) {
     /** The task context used for fetch task to fetch data from external systems. */
     FetchTask.Context createFetchTaskContext(C sourceConfig);
 
+    /**
+     * Creates the fetcher that reads a stream split. The default returns a plain {@link
+     * IncrementalSourceStreamFetcher}; dialects may override this to filter or route records.
+     */
+    default IncrementalSourceStreamFetcher createStreamFetcher(
+            FetchTask.Context taskContext, int subtaskId) {
+        return new IncrementalSourceStreamFetcher(taskContext, subtaskId);
+    }
+
     /**
      * We may need the offset corresponding to the checkpointId. For example, we should commit LSN
      * of checkpoint to postgres's slot. 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java index 98447280325..9563a4362ae 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java @@ -132,7 +132,7 @@ protected TableChanges getTableChangeRecord(SourceRecord element) throws IOExcep return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true); } - private void updateStreamSplitState(SourceSplitState splitState, SourceRecord element) { + protected void updateStreamSplitState(SourceSplitState splitState, SourceRecord element) { if (splitState.isStreamSplitState()) { Offset position = getOffsetPosition(element); splitState.asStreamSplitState().setStartingOffset(position); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java index 84d687005a3..9b6b3482fa0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java @@ -268,7 +268,7 @@ private IncrementalSourceScanFetcher getScanFetcher() { private 
IncrementalSourceStreamFetcher getStreamFetcher() { if (reusedStreamFetcher == null) { reusedStreamFetcher = - new IncrementalSourceStreamFetcher( + dataSourceDialect.createStreamFetcher( dataSourceDialect.createFetchTaskContext(sourceConfig), subtaskId); } return reusedStreamFetcher; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java index 72c62d600d3..9ddb4157655 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -185,7 +185,7 @@ public void close() { * only the change event belong to [1024, 2048) and offset is after highWatermark1 should send. 
* */ - private boolean shouldEmit(SourceRecord sourceRecord) { + protected boolean shouldEmit(SourceRecord sourceRecord) { if (taskContext.isDataChangeRecord(sourceRecord)) { TableId tableId = taskContext.getTableId(sourceRecord); Offset position = taskContext.getStreamOffset(sourceRecord); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java index 4743247899a..dec287b0cea 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java @@ -26,11 +26,13 @@ import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask; +import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher; import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext; import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask; import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext; import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask; +import org.apache.flink.cdc.connectors.postgres.source.reader.PostgresSourceStreamFetcher; import org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema; import 
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -239,6 +241,13 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(JdbcSourceConfig taskSo
         return new PostgresSourceFetchTaskContext(taskSourceConfig, this);
     }
 
+    @Override
+    public IncrementalSourceStreamFetcher createStreamFetcher(
+            FetchTask.Context taskContext, int subtaskId) {
+        return new PostgresSourceStreamFetcher(
+                taskContext, subtaskId, sourceConfig.isLogicalMessageEnabled());
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
         if (streamFetchTask != null) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index 53bb066b59d..e860705da7e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -315,6 +315,12 @@ public PostgresSourceBuilder includeDatabaseInTableId(boolean includeDatabase
         return this;
     }
 
+    /** Sets whether Postgres logical decoding messages are emitted to the deserializer. */
+    public PostgresSourceBuilder includeLogicalMessages(boolean includeLogicalMessages) {
+        this.configFactory.setIncludeLogicalMessages(includeLogicalMessages);
+        return this;
+    }
+
     /** Whether to infer schema change event on relation message. 
*/ public PostgresSourceBuilder includeSchemaChanges(boolean includeSchemaChanges) { this.configFactory.includeSchemaChanges(includeSchemaChanges); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java index 4ad4b3e7e9d..20d6e368944 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java @@ -40,6 +40,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig { private final int lsnCommitCheckpointsDelay; private final boolean includePartitionedTables; private final boolean includeDatabaseInTableId; + private final boolean includeLogicalMessages; public PostgresSourceConfig( int subtaskId, @@ -71,7 +72,8 @@ public PostgresSourceConfig( int lsnCommitCheckpointsDelay, boolean assignUnboundedChunkFirst, boolean includePartitionedTables, - boolean includeDatabaseInTableId) { + boolean includeDatabaseInTableId, + boolean includeLogicalMessages) { super( startupOptions, databaseList, @@ -103,6 +105,7 @@ public PostgresSourceConfig( this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; this.includePartitionedTables = includePartitionedTables; this.includeDatabaseInTableId = includeDatabaseInTableId; + this.includeLogicalMessages = includeLogicalMessages; } /** @@ -156,4 +159,9 @@ public PostgresConnectorConfig getDbzConnectorConfig() { public boolean isIncludeDatabaseInTableId() { return includeDatabaseInTableId; } + + /** Returns whether to emit Postgres logical 
decoding messages to the deserializer. */ + public boolean isLogicalMessageEnabled() { + return includeLogicalMessages; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 847b1547461..3d08d26c439 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -57,6 +57,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { private boolean includeDatabaseInTableId = PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue(); + private boolean includeLogicalMessages = + PostgresSourceOptions.SCAN_LOGICAL_MESSAGE_ENABLED.defaultValue(); + /** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */ @Override public PostgresSourceConfig create(int subtaskId) { @@ -140,7 +143,8 @@ public PostgresSourceConfig create(int subtaskId) { lsnCommitCheckpointsDelay, assignUnboundedChunkFirst, includePartitionedTables, - includeDatabaseInTableId); + includeDatabaseInTableId, + includeLogicalMessages); } /** @@ -198,4 +202,9 @@ public void setIncludePartitionedTables(boolean includePartitionedTables) { public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) { this.includeDatabaseInTableId = includeDatabaseInTableId; } + + /** Set whether to emit Postgres logical decoding messages to the deserializer. 
*/
+    public void setIncludeLogicalMessages(boolean includeLogicalMessages) {
+        this.includeLogicalMessages = includeLogicalMessages;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
index db04ac14237..73f0e0c6173 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
@@ -98,6 +98,16 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
                                     + "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n"
                                     + "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.");
 
+    public static final ConfigOption<Boolean> SCAN_LOGICAL_MESSAGE_ENABLED =
+            ConfigOptions.key("scan.logical-message.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to emit Postgres logical messages produced by "
+                                    + "pg_logical_emit_message() to the deserializer. "
+                                    + "Disabled by default; logical messages are not bound to "
+                                    + "any table and are dropped by table-based watermark filtering.");
+
     public static final ConfigOption<Boolean> TABLE_ID_INCLUDE_DATABASE =
             ConfigOptions.key("table-id.include-database")
                     .booleanType()
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
index 7689eccf706..59da7b05b42 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
@@ -17,7 +17,9 @@
 
 package org.apache.flink.cdc.connectors.postgres.source.reader;
 
+import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
 import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
 import org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
@@ -43,6 +45,18 @@ public PostgresSourceRecordEmitter(
                 offsetFactory);
     }
 
+    @Override
+    protected void processElement(
+            SourceRecord element, SourceOutput output, SourceSplitState splitState)
+            throws Exception {
+        if (PostgresSourceStreamFetcher.isLogicalMessage(element)) {
+            updateStreamSplitState(splitState, element);
+            emitElement(element, output);
+            return;
+        }
+        super.processElement(element, output, 
splitState);
+    }
+
     @Override
     protected TableChanges getTableChangeRecord(SourceRecord element) throws IOException {
         if (element instanceof PostgresSchemaRecord) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceStreamFetcher.java
new file mode 100644
index 00000000000..32fc00d1707
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceStreamFetcher.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Stream fetcher for Postgres. When {@code includeLogicalMessages} is enabled, records produced
+ * by {@code pg_logical_emit_message} (op="m") bypass the table-based watermark filtering, because
+ * logical messages are not bound to a table.
+ */
+public class PostgresSourceStreamFetcher extends IncrementalSourceStreamFetcher {
+
+    private final boolean includeLogicalMessages;
+
+    /**
+     * Creates a stream fetcher for Postgres.
+     *
+     * @param taskContext the fetch task context handed to the base fetcher
+     * @param subtaskId the subtask index handed to the base fetcher
+     * @param includeLogicalMessages whether logical messages bypass the base emit check
+     */
+    public PostgresSourceStreamFetcher(
+            FetchTask.Context taskContext, int subtaskId, boolean includeLogicalMessages) {
+        super(taskContext, subtaskId);
+        this.includeLogicalMessages = includeLogicalMessages;
+    }
+
+    /**
+     * Emits logical messages unconditionally when the flag is on; every other record (and every
+     * record when the flag is off) is decided by the base implementation.
+     */
+    @Override
+    protected boolean shouldEmit(SourceRecord sourceRecord) {
+        boolean bypassTableFilter = includeLogicalMessages && isLogicalMessage(sourceRecord);
+        return bypassTableFilter || super.shouldEmit(sourceRecord);
+    }
+
+    /**
+     * Returns whether {@code record} carries a {@link Struct} value whose schema has an operation
+     * field set to {@code "m"}.
+     */
+    static boolean isLogicalMessage(SourceRecord record) {
+        if (!(record.value() instanceof Struct)) {
+            return false;
+        }
+        Struct envelope = (Struct) record.value();
+        return envelope.schema().field(Envelope.FieldName.OPERATION) != null
+                && "m".equals(envelope.getString(Envelope.FieldName.OPERATION));
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceStreamFetcherITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceStreamFetcherITCase.java
new file mode 100644
index 00000000000..bd9f2a4b444
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceStreamFetcherITCase.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.assigner.StreamSplitAssigner;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
+import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
+import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isHeartbeatEvent;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for {@link PostgresSourceStreamFetcher} verifying that {@code
+ * pg_logical_emit_message} records reach the polled records when the feature flag is enabled and
+ * are dropped when it is disabled.
+ */
+class PostgresSourceStreamFetcherITCase extends PostgresTestBase {
+
+    private static final String SCHEMA_NAME = "customer";
+    private static final String TABLE_NAME = "Customers";
+    private static final long POLL_TIMEOUT_MS = 30_000L;
+
+    private final UniqueDatabase customDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER,
+                    "postgres",
+                    "customer",
+                    POSTGRES_CONTAINER.getUsername(),
+                    POSTGRES_CONTAINER.getPassword());
+
+    private String slotName;
+
+    @BeforeEach
+    void before() throws SQLException {
+        customDatabase.createAndInitialize();
+        this.slotName = getSlotName();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        Thread.sleep(1000L);
+        customDatabase.removeSlot(slotName);
+    }
+
+    @Test
+    void logicalMessagesAreEmittedWhenEnabled() throws Exception {
+        IncrementalSourceStreamFetcher fetcher = startFetcher(true);
+        try {
+            emitLogicalMessageAndDataChange("cdc-test", "hello");
+
+            List<SourceRecord> records =
+                    pollUntil(
+                            fetcher,
+                            r -> !isHeartbeatEvent(r),
+                            recs -> recs.stream().anyMatch(this::isLogicalMessage));
+
+            assertThat(records).anyMatch(this::isLogicalMessage);
+        } finally {
+            fetcher.close();
+        }
+    }
+
+    @Test
+    void logicalMessagesAreDroppedWhenDisabled() throws Exception {
+        IncrementalSourceStreamFetcher fetcher = startFetcher(false);
+        try {
+            emitLogicalMessageAndDataChange("cdc-test", "hello");
+
+            // Wait until the trailing INSERT is observed; that proves the fetcher has caught up
+            // past the logical message, which should have been filtered out.
+            List<SourceRecord> records =
+                    pollUntil(
+                            fetcher,
+                            r -> !isHeartbeatEvent(r),
+                            recs ->
+                                    recs.stream()
+                                            .anyMatch(
+                                                    r ->
+                                                            r.toString()
+                                                                    .contains(
+                                                                            "logical-msg-marker")));
+
+            assertThat(records).noneMatch(this::isLogicalMessage);
+        } finally {
+            fetcher.close();
+        }
+    }
+
+    /**
+     * Builds a fetcher through the dialect with the given flag, submits a stream fetch task
+     * starting from the latest offset, and waits briefly before returning it.
+     */
+    private IncrementalSourceStreamFetcher startFetcher(boolean includeLogicalMessages)
+            throws Exception {
+        PostgresSourceConfigFactory factory =
+                getMockPostgresSourceConfigFactory(
+                        customDatabase, SCHEMA_NAME, TABLE_NAME, slotName, 10, true);
+        factory.startupOptions(StartupOptions.latest());
+        factory.setIncludeLogicalMessages(includeLogicalMessages);
+
+        PostgresSourceConfig sourceConfig = factory.create(0);
+        PostgresDialect dialect = new PostgresDialect(factory.create(0));
+        PostgresSourceFetchTaskContext taskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+
+        IncrementalSourceStreamFetcher fetcher = dialect.createStreamFetcher(taskContext, 0);
+        StreamSplit split = createStreamSplit(sourceConfig, dialect);
+        PostgresStreamFetchTask fetchTask =
+                (PostgresStreamFetchTask) dialect.createFetchTask(split);
+        fetcher.submitTask(fetchTask);
+
+        // give the stream reader a moment to start consuming WAL
+        Thread.sleep(1000L);
+        return fetcher;
+    }
+
+    /**
+     * Calls {@code pg_logical_emit_message} with the given prefix and payload, then inserts the
+     * marker row into the Customers table.
+     */
+    private void emitLogicalMessageAndDataChange(String prefix, String payload)
+            throws SQLException {
+        try (Connection conn =
+                        getJdbcConnection(POSTGRES_CONTAINER, customDatabase.getDatabaseName());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    String.format(
+                            "SELECT pg_logical_emit_message(false, '%s', '%s')", prefix, payload));
+            // A trailing INSERT acts as a marker so we can wait for the WAL to advance past the
+            // logical message even when it would be filtered out.
+            stmt.execute(
+                    "INSERT INTO customer.\"Customers\" VALUES "
+                            + "(9001, 'logical-msg-marker', 'Beijing', '111')");
+        }
+    }
+
+    private boolean isLogicalMessage(SourceRecord record) {
+        if (!(record.value() instanceof Struct)) {
+            return false;
+        }
+        Struct struct = (Struct) record.value();
+        return struct.schema().field(Envelope.FieldName.OPERATION) != null
+                && "m".equals(struct.getString(Envelope.FieldName.OPERATION));
+    }
+
+    /**
+     * Polls the fetcher every 200 ms, collecting records accepted by {@code keep}, until {@code
+     * done} accepts the collected list.
+     *
+     * @throws AssertionError if {@code done} is not satisfied within {@link #POLL_TIMEOUT_MS}
+     */
+    private List<SourceRecord> pollUntil(
+            IncrementalSourceStreamFetcher fetcher,
+            Predicate<SourceRecord> keep,
+            Predicate<List<SourceRecord>> done)
+            throws Exception {
+        List<SourceRecord> all = new ArrayList<>();
+        long deadline = System.currentTimeMillis() + POLL_TIMEOUT_MS;
+        while (System.currentTimeMillis() < deadline) {
+            Iterator<SourceRecords> batch = fetcher.pollSplitRecords();
+            if (batch != null) {
+                while (batch.hasNext()) {
+                    Iterator<SourceRecord> it = batch.next().iterator();
+                    while (it.hasNext()) {
+                        SourceRecord r = it.next();
+                        if (keep.test(r)) {
+                            all.add(r);
+                        }
+                    }
+                }
+            }
+            if (done.test(all)) {
+                return all;
+            }
+            Thread.sleep(200L);
+        }
+        throw new AssertionError("Timed out waiting for expected records. 
Polled so far: " + all);
+    }
+
+    private StreamSplit createStreamSplit(
+            PostgresSourceConfig sourceConfig, PostgresDialect dialect) throws Exception {
+        StreamSplitAssigner assigner =
+                new StreamSplitAssigner(
+                        sourceConfig,
+                        dialect,
+                        new PostgresOffsetFactory(),
+                        new MockSplitEnumeratorContext<>(1));
+        assigner.open();
+
+        Map<TableId, TableChanges.TableChange> tableSchemas =
+                dialect.discoverDataCollectionSchemas(sourceConfig);
+        return StreamSplit.fillTableSchemas(assigner.createStreamSplit(), tableSchemas);
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceStreamFetcherTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceStreamFetcherTest.java
new file mode 100644
index 00000000000..710bfc0d697
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceStreamFetcherTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PostgresSourceStreamFetcher#isLogicalMessage(SourceRecord)}. */
+class PostgresSourceStreamFetcherTest {
+
+    /** Value schema exposing only the optional string operation field. */
+    private static final Schema ENVELOPE_WITH_OP =
+            SchemaBuilder.struct()
+                    .field(Envelope.FieldName.OPERATION, Schema.OPTIONAL_STRING_SCHEMA)
+                    .build();
+
+    /** Value schema without any operation field. */
+    private static final Schema ENVELOPE_WITHOUT_OP =
+            SchemaBuilder.struct().field("source", Schema.OPTIONAL_STRING_SCHEMA).build();
+
+    @Test
+    void logicalMessageRecordIsDetected() {
+        SourceRecord record = recordWithOp("m");
+        assertThat(PostgresSourceStreamFetcher.isLogicalMessage(record)).isTrue();
+    }
+
+    @Test
+    void dataChangeRecordsAreNotLogicalMessages() {
+        for (String op : new String[] {"c", "u", "d", "r", "t"}) {
+            SourceRecord record = recordWithOp(op);
+            assertThat(PostgresSourceStreamFetcher.isLogicalMessage(record))
+                    .as("op=%s should not be detected as logical message", op)
+                    .isFalse();
+        }
+    }
+
+    @Test
+    void recordWithNullValueIsNotLogicalMessage() {
+        SourceRecord record = recordOf(ENVELOPE_WITH_OP, null);
+        assertThat(PostgresSourceStreamFetcher.isLogicalMessage(record)).isFalse();
+    }
+
+    @Test
+    void recordWithoutOperationFieldIsNotLogicalMessage() {
+        Struct value = new Struct(ENVELOPE_WITHOUT_OP).put("source", "anything");
+        SourceRecord record = recordOf(ENVELOPE_WITHOUT_OP, value);
+        assertThat(PostgresSourceStreamFetcher.isLogicalMessage(record)).isFalse();
+    }
+
+    @Test
+    void recordWithNullOperationValueIsNotLogicalMessage() {
+        // The operation field exists in the schema but is never populated.
+        Struct value = new Struct(ENVELOPE_WITH_OP);
+        SourceRecord record = recordOf(ENVELOPE_WITH_OP, value);
+        assertThat(PostgresSourceStreamFetcher.isLogicalMessage(record)).isFalse();
+    }
+
+    @Test
+    void recordWithNonStructValueIsNotLogicalMessage() {
+        SourceRecord record = recordOf(Schema.STRING_SCHEMA, "not-a-struct");
+        assertThat(PostgresSourceStreamFetcher.isLogicalMessage(record)).isFalse();
+    }
+
+    /** Builds a record whose value is a struct carrying the given operation code. */
+    private static SourceRecord recordWithOp(String op) {
+        Struct value = new Struct(ENVELOPE_WITH_OP).put(Envelope.FieldName.OPERATION, op);
+        return recordOf(ENVELOPE_WITH_OP, value);
+    }
+
+    /** Builds a record with empty source maps, the given value schema and the given value. */
+    private static SourceRecord recordOf(Schema valueSchema, Object value) {
+        return new SourceRecord(
+                Collections.emptyMap(), Collections.emptyMap(), "topic", valueSchema, value);
+    }
+}