Skip to content

Commit dddcea6

Browse files
committed
fix
1 parent a19fdfe commit dddcea6

1 file changed

Lines changed: 5 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/offset/PostgresOffset.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
2121

22+
import io.debezium.connector.postgresql.PostgresOffsetContext;
2223
import io.debezium.connector.postgresql.SourceInfo;
2324
import io.debezium.connector.postgresql.connection.Lsn;
2425
import io.debezium.time.Conversions;
@@ -43,7 +44,10 @@ public class PostgresOffset extends Offset {
4344

4445
// used by PostgresOffsetFactory
4546
PostgresOffset(Map<String, String> offset) {
46-
this.offset = offset;
47+
Map<String, String> filtered = new HashMap<>(offset);
48+
filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY); // lsn_proc
49+
filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY); // lsn_commit
50+
this.offset = filtered;
4751
}
4852

4953
PostgresOffset(Long lsn, Long txId, Instant lastCommitTs) {

0 commit comments

Comments
 (0)