Skip to content

Commit 6f56d77

Browse files
committed
fix
1 parent 36cb6eb commit 6f56d77

1 file changed

Lines changed: 24 additions & 2 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,30 @@ public class PostgresOffset extends Offset {
4545
// used by PostgresOffsetFactory
4646
PostgresOffset(Map<String, String> offset) {
4747
Map<String, String> filtered = new HashMap<>(offset);
48-
filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); // lsn_proc
49-
filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); // lsn_commit
48+
// When lsn == lsn_proc, WalPositionLocator is constructed with
49+
// (lastCommitStoredLsn=Y, lastEventStoredLsn=Y). In pgoutput non-streaming mode,
50+
// BEGIN and DML of the first new transaction after recovery share the same
51+
// data_start as the previous COMMIT LSN (Y). WalPositionLocator puts Y into
52+
// lsnSeen but sets startStreamingLsn to the new COMMIT LSN (Z), causing those
53+
// DML records to be silently dropped in the actual streaming phase
54+
// (Y in lsnSeen, Y != Z -> filtered).
55+
//
56+
// Fix: when lsn == lsn_proc, remove lsn_proc and lsn_commit so that
57+
// WalPositionLocator is constructed with lastCommitStoredLsn=null, which
58+
// triggers the fast path: startStreamingLsn=firstLsnReceived=Y, switch-off
59+
// happens immediately and all messages pass through.
60+
//
61+
// This is fixed upstream in Debezium via DBZ-6204 (adding Operation.COMMIT to
62+
// the lastProcessedMessageType check in WalPositionLocator.resumeFromLsn):
63+
// https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7
64+
// This workaround can be removed once Debezium is upgraded to a version that
65+
// includes DBZ-6204.
66+
String lsnVal = filtered.get(SourceInfo.LSN_KEY);
67+
String lsnProc = filtered.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
68+
if (lsnVal != null && lsnVal.equals(lsnProc)) {
69+
filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
70+
filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
71+
}
5072
this.offset = filtered;
5173
}
5274

0 commit comments

Comments
 (0)