Skip to content

Commit 22b3166

Browse files
committed
fix
1 parent 6f56d77 commit 22b3166

1 file changed

Lines changed: 18 additions & 16 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,27 +45,29 @@ public class PostgresOffset extends Offset {
4545
// used by PostgresOffsetFactory
4646
PostgresOffset(Map<String, String> offset) {
4747
Map<String, String> filtered = new HashMap<>(offset);
48-
// When lsn == lsn_proc, WalPositionLocator is constructed with
49-
// (lastCommitStoredLsn=Y, lastEventStoredLsn=Y). In pgoutput non-streaming mode,
50-
// BEGIN and DML of the first new transaction after recovery share the same
51-
// data_start as the previous COMMIT LSN (Y). WalPositionLocator puts Y into
52-
// lsnSeen but sets startStreamingLsn to the new COMMIT LSN (Z), causing those
53-
// DML records to be silently dropped in the actual streaming phase
54-
// (Y in lsnSeen, Y != Z -> filtered).
48+
// When a checkpoint is taken right after a COMMIT (state-3a), all three LSN fields
49+
// converge to the same value: lsn == lsn_proc == lsn_commit.
50+
// Recovering from such a checkpoint constructs WalPositionLocator(C0, C0), which
51+
// causes the first new transaction's DML records (data_start=C0 in pgoutput) to be
52+
// silently dropped: they are added to lsnSeen during the find phase, but
53+
// startStreamingLsn is set to the next COMMIT (C1), so they are filtered in the
54+
// stream phase.
5555
//
56-
// Fix: when lsn == lsn_proc, remove lsn_proc and lsn_commit so that
57-
// WalPositionLocator is constructed with lastCommitStoredLsn=null, which
58-
// triggers the fast path: startStreamingLsn=firstLsnReceived=Y, switch-off
59-
// happens immediately and all messages pass through.
56+
// Fix: when lsn == lsn_proc == lsn_commit, remove lsn_proc and lsn_commit so that
57+
// WalPositionLocator is constructed with lastCommitStoredLsn=null, which triggers
58+
// the fast path: startStreamingLsn=firstLsnReceived=C0, all messages pass through.
59+
//
60+
// The triple-equality condition is safe: mid-transaction checkpoints (state-3b) have
61+
// lsn_commit pointing to the previous commit, so lsn_commit != lsn, and this branch
62+
// is not taken.
6063
//
61-
// This is fixed upstream in Debezium via DBZ-6204 (adding Operation.COMMIT to
62-
// the lastProcessedMessageType check in WalPositionLocator.resumeFromLsn):
63-
// https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7
6464
// This workaround can be removed once Debezium is upgraded to a version that
65-
// includes DBZ-6204.
65+
// includes DBZ-6204:
66+
// https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7
6667
String lsnVal = filtered.get(SourceInfo.LSN_KEY);
6768
String lsnProc = filtered.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
68-
if (lsnVal != null && lsnVal.equals(lsnProc)) {
69+
String lsnCommit = filtered.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
70+
if (lsnVal != null && lsnVal.equals(lsnProc) && lsnVal.equals(lsnCommit)) {
6971
filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
7072
filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
7173
}

0 commit comments

Comments
 (0)