4949/**
5050 * A lightweight replication connection for GaussDB.
5151 *
52- * <p>GaussDB exposes a PostgreSQL-compatible logical replication protocol; this class uses the
53- * PostgreSQL JDBC driver replication API and Debezium's {@link ReplicationStream} contract.
52+ * <p>
53+ * GaussDB exposes a PostgreSQL-compatible logical replication protocol; this
54+ * class uses the
55+ * PostgreSQL JDBC driver replication API and Debezium's
56+ * {@link ReplicationStream} contract.
5457 */
5558public class GaussDBReplicationConnection implements ReplicationConnection {
5659
@@ -66,6 +69,10 @@ public class GaussDBReplicationConnection implements ReplicationConnection {
6669 private final MppdbDecodingMessageDecoder messageDecoder ;
6770 private final TypeRegistry typeRegistry ;
6871
72+ // Ending position for bounded reads - when reached, stream returns NoopMessage
73+ // instead of closing
74+ private volatile Lsn endingPos ;
75+
6976 private volatile Connection connection ;
7077
7178 public GaussDBReplicationConnection (
@@ -77,14 +84,12 @@ public GaussDBReplicationConnection(
7784 MppdbDecodingMessageDecoder messageDecoder ,
7885 TypeRegistry typeRegistry ,
7986 Connection initialConnection ) {
80- this .connectorConfig =
81- Objects .requireNonNull (connectorConfig , "connectorConfig must not be null" );
87+ this .connectorConfig = Objects .requireNonNull (connectorConfig , "connectorConfig must not be null" );
8288 this .slotName = Objects .requireNonNull (slotName , "slotName must not be null" );
8389 this .pluginName = Objects .requireNonNull (pluginName , "pluginName must not be null" );
8490 this .dropSlotOnClose = dropSlotOnClose ;
8591 this .statusUpdateInterval = statusUpdateInterval ;
86- this .messageDecoder =
87- Objects .requireNonNull (messageDecoder , "messageDecoder must not be null" );
92+ this .messageDecoder = Objects .requireNonNull (messageDecoder , "messageDecoder must not be null" );
8893 this .typeRegistry = Objects .requireNonNull (typeRegistry , "typeRegistry must not be null" );
8994 // Do not use the passed-in connection: it may be created by GaussDB JDBC driver
9095 // which
@@ -94,6 +99,21 @@ public GaussDBReplicationConnection(
9499 // Let connectIfNeeded() create a PostgreSQL JDBC connection via
95100 // openReplicationConnection().
96101 this .connection = null ;
102+ this .endingPos = null ;
103+ }
104+
105+ /**
106+ * Sets the ending position for bounded reads. When the stream reaches this LSN,
107+ * it will return a NoopMessage instead of requiring stream close/reopen.
108+ * This avoids slot contention issues with GaussDB.
109+ */
110+ public void setEndingPos (Lsn endingPos ) {
111+ this .endingPos = endingPos ;
112+ LOG .debug ("Set ending position for bounded read: {}" , endingPos );
113+ }
114+
115+ public Lsn getEndingPos () {
116+ return this .endingPos ;
97117 }
98118
99119 @ Override
@@ -122,19 +142,87 @@ public synchronized ReplicationStream startStreaming(Lsn offset, WalPositionLoca
122142 throws SQLException , InterruptedException {
123143 initConnection ();
124144 final Lsn startLsn = offset != null && offset .isValid () ? offset : null ;
125- final PGReplicationStream stream = startPgReplicationStream (startLsn );
145+
146+ // GaussDB may take time to release slots after connection disruption.
147+ // Retry with exponential backoff to handle transient "Database connection
148+ // failed" errors.
149+ // This is based on GaussDB documentation recommendations for slot management.
150+ final int maxRetries = 3 ;
151+ final long initialDelayMs = 2000 ; // 2 seconds initial delay
152+
153+ SQLException lastException = null ;
154+ for (int attempt = 1 ; attempt <= maxRetries ; attempt ++) {
155+ try {
156+ // Ensure we have a fresh connection for retries
157+ if (attempt > 1 ) {
158+ LOG .info ("Retry attempt {} of {} for starting replication stream (slot: {})" ,
159+ attempt , maxRetries , slotName );
160+ // Force reconnect on retry
161+ closeConnectionQuietly ();
162+ connectIfNeeded ();
163+ }
164+
165+ final PGReplicationStream stream = startPgReplicationStream (startLsn );
166+ if (attempt > 1 ) {
167+ LOG .info ("Successfully started replication stream on retry attempt {}" , attempt );
168+ }
169+ return createReplicationStreamWrapper (stream , startLsn , walPosition );
170+ } catch (SQLException e ) {
171+ lastException = e ;
172+ String message = e .getMessage ();
173+ boolean isRetryable = message != null && (message .contains ("Database connection failed" ) ||
174+ message .contains ("EOF Exception" ) ||
175+ message .contains ("Connection reset" ) ||
176+ message .contains ("starting copy" ));
177+
178+ if (!isRetryable || attempt >= maxRetries ) {
179+ LOG .error ("Failed to start replication stream after {} attempts: {}" ,
180+ attempt , e .getMessage ());
181+ throw e ;
182+ }
183+
184+ long delayMs = initialDelayMs * (1L << (attempt - 1 )); // Exponential backoff
185+ LOG .warn ("Failed to start replication stream (attempt {}/{}): {}. Retrying in {}ms..." ,
186+ attempt , maxRetries , e .getMessage (), delayMs );
187+ try {
188+ Thread .sleep (delayMs );
189+ } catch (InterruptedException ie ) {
190+ Thread .currentThread ().interrupt ();
191+ throw new SQLException ("Interrupted while waiting to retry replication stream" , ie );
192+ }
193+ }
194+ }
195+
196+ // Should not reach here, but throw the last exception if we do
197+ throw lastException != null ? lastException
198+ : new SQLException ("Failed to start replication stream after max retries" );
199+ }
200+
201+ private void closeConnectionQuietly () {
202+ try {
203+ if (connection != null && !connection .isClosed ()) {
204+ connection .close ();
205+ }
206+ } catch (Exception e ) {
207+ LOG .debug ("Error closing connection during retry: {}" , e .getMessage ());
208+ } finally {
209+ connection = null ;
210+ }
211+ }
212+
213+ private ReplicationStream createReplicationStreamWrapper (PGReplicationStream stream , Lsn startLsn ,
214+ WalPositionLocator walPosition ) {
126215 return new ReplicationStream () {
127216 private static final int CHECK_WARNINGS_AFTER_COUNT = 100 ;
128217
129218 private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT ;
130219 private ExecutorService keepAliveExecutor = null ;
131220 private AtomicBoolean keepAliveRunning ;
132- private final Metronome metronome =
133- Metronome .sleeper (
134- statusUpdateInterval != null
135- ? statusUpdateInterval
136- : Duration .ofSeconds (10 ),
137- Clock .SYSTEM );
221+ private final Metronome metronome = Metronome .sleeper (
222+ statusUpdateInterval != null
223+ ? statusUpdateInterval
224+ : Duration .ofSeconds (10 ),
225+ Clock .SYSTEM );
138226
139227 private volatile Lsn lastReceivedLsn ;
140228
@@ -144,6 +232,16 @@ public void read(ReplicationMessageProcessor processor)
144232 processWarnings (false );
145233 ByteBuffer read = stream .read ();
146234 final Lsn lastReceiveLsn = convertToLsn (stream .getLastReceiveLSN ());
235+
236+ // Check if we've reached the ending position for bounded reads
237+ if (reachEnd (lastReceiveLsn )) {
238+ lastReceivedLsn = lastReceiveLsn ;
239+ LOG .trace ("Reached ending position at LSN {}, returning NoopMessage" , lastReceivedLsn );
240+ // Process a no-op message to indicate we've reached the end
241+ processor .process (null );
242+ return ;
243+ }
244+
147245 if (messageDecoder .shouldMessageBeSkipped (
148246 read , lastReceiveLsn , startLsn , walPosition )) {
149247 return ;
@@ -157,6 +255,16 @@ public boolean readPending(ReplicationMessageProcessor processor)
157255 processWarnings (false );
158256 ByteBuffer read = stream .readPending ();
159257 final Lsn lastReceiveLsn = convertToLsn (stream .getLastReceiveLSN ());
258+
259+ // Check if we've reached the ending position for bounded reads
260+ if (reachEnd (lastReceiveLsn )) {
261+ lastReceivedLsn = lastReceiveLsn ;
262+ LOG .trace ("Reached ending position at LSN {}, returning NoopMessage" , lastReceivedLsn );
263+ // Process a no-op message to indicate we've reached the end
264+ processor .process (null );
265+ return true ;
266+ }
267+
160268 if (read == null ) {
161269 return false ;
162270 }
@@ -168,6 +276,18 @@ public boolean readPending(ReplicationMessageProcessor processor)
168276 return true ;
169277 }
170278
279+ /**
280+ * Checks if the current LSN has reached or exceeded the ending position.
281+ * Used for bounded reads to stop streaming at a specific LSN.
282+ */
283+ private boolean reachEnd (Lsn receivedLsn ) {
284+ if (receivedLsn == null ) {
285+ return false ;
286+ }
287+ Lsn ending = endingPos ;
288+ return ending != null && ending .compareTo (receivedLsn ) <= 0 ;
289+ }
290+
171291 private void deserializeMessages (
172292 ByteBuffer buffer , ReplicationMessageProcessor processor )
173293 throws SQLException , InterruptedException {
@@ -225,7 +345,19 @@ public void stopKeepAlive() {
225345 @ Override
226346 public void close () throws Exception {
227347 processWarnings (true );
228- stream .close ();
348+ // GaussDB doesn't properly handle the PostgreSQL CopyBothResponse close
349+ // protocol.
350+ // When closing the replication stream, GaussDB may reset the connection,
351+ // causing EOFException or SocketException. We catch and log these errors
352+ // rather than propagating them, since the stream is already being closed.
353+ try {
354+ stream .close ();
355+ } catch (Exception e ) {
356+ LOG .warn (
357+ "Error closing replication stream (expected with GaussDB): {}" ,
358+ e .getMessage ());
359+ LOG .debug ("Full exception during stream close" , e );
360+ }
229361 }
230362
231363 private void processWarnings (final boolean forced ) throws SQLException {
@@ -249,8 +381,8 @@ private void processWarnings(final boolean forced) throws SQLException {
249381 }
250382
251383 @ Override
252- public synchronized java .util .Optional <io .debezium .connector .postgresql .spi .SlotCreationResult >
253- createReplicationSlot () throws SQLException {
384+ public synchronized java .util .Optional <io .debezium .connector .postgresql .spi .SlotCreationResult > createReplicationSlot ()
385+ throws SQLException {
254386 try {
255387 connectIfNeeded ();
256388 } catch (InterruptedException e ) {
@@ -263,21 +395,19 @@ private void processWarnings(final boolean forced) throws SQLException {
263395 }
264396 try (Statement statement = conn .createStatement ()) {
265397 try {
266- final String createCommand =
267- String .format (
268- "CREATE_REPLICATION_SLOT \" %s\" LOGICAL %s" , slotName , pluginName );
398+ final String createCommand = String .format (
399+ "CREATE_REPLICATION_SLOT \" %s\" LOGICAL %s" , slotName , pluginName );
269400 LOG .info ("Creating replication slot with command {}" , createCommand );
270401 statement .execute (createCommand );
271402 } catch (SQLException first ) {
272403 if (isSlotAlreadyExists (first )) {
273404 throw first ;
274405 }
275406 // Fallback to PostgreSQL-compatible function if supported.
276- try (ResultSet rs =
277- statement .executeQuery (
278- String .format (
279- "SELECT * FROM pg_create_logical_replication_slot('%s', '%s')" ,
280- slotName , pluginName ))) {
407+ try (ResultSet rs = statement .executeQuery (
408+ String .format (
409+ "SELECT * FROM pg_create_logical_replication_slot('%s', '%s')" ,
410+ slotName , pluginName ))) {
281411 if (!rs .next ()) {
282412 throw first ;
283413 }
@@ -303,8 +433,7 @@ public synchronized void reconnect() throws SQLException {
303433 connectIfNeeded ();
304434 } catch (InterruptedException e ) {
305435 Thread .currentThread ().interrupt ();
306- SQLException interrupted =
307- new SQLException ("Interrupted while reconnecting replication connection" , e );
436+ SQLException interrupted = new SQLException ("Interrupted while reconnecting replication connection" , e );
308437 throw interrupted ;
309438 }
310439 }
@@ -391,15 +520,14 @@ private PGReplicationStream startPgReplicationStream(Lsn startLsn) throws SQLExc
391520 try {
392521 final PGConnection pgConnection = c .unwrap (PGConnection .class );
393522
394- ChainedLogicalStreamBuilder builder =
395- pgConnection
396- .getReplicationAPI ()
397- .replicationStream ()
398- .logical ()
399- .withSlotName (slotName ) // GaussDB may not need extra quotes
400- // GaussDB-specific options from official documentation
401- .withSlotOption ("include-xids" , false )
402- .withSlotOption ("skip-empty-xacts" , true );
523+ ChainedLogicalStreamBuilder builder = pgConnection
524+ .getReplicationAPI ()
525+ .replicationStream ()
526+ .logical ()
527+ .withSlotName (slotName ) // GaussDB may not need extra quotes
528+ // GaussDB-specific options from official documentation
529+ .withSlotOption ("include-xids" , false )
530+ .withSlotOption ("skip-empty-xacts" , true );
403531 if (startLsn != null ) {
404532 builder = builder .withStartPosition (convertToGaussDBLsn (startLsn ));
405533 LOG .info ("Replication stream will start from LSN: {}" , startLsn );
@@ -413,12 +541,21 @@ private PGReplicationStream startPgReplicationStream(Lsn startLsn) throws SQLExc
413541 Math .toIntExact (statusUpdateInterval .toMillis ()), TimeUnit .MILLISECONDS );
414542 }
415543
416- LOG .info ("Calling builder.start() to create replication stream..." );
544+ LOG .info (\ "Calling builder .start () to create replication stream ...\ " );
417545 final PGReplicationStream stream = builder .start ();
418- LOG .info ("Replication stream started successfully" );
546+ LOG .info (\" Replication stream started successfully \" );
547+
548+ // Small delay to stabilize connection when connections are opened/closed in fast sequence
549+ // See reference implementation in GaussDB-For-Apache-Flink project
550+ try {
551+ Thread .sleep (10 );
552+ } catch (InterruptedException ie ) {
553+ Thread .currentThread ().interrupt ();
554+ }
555+
419556 // ensure server sees feedback quickly
420557 stream .forceUpdateStatus ();
421- LOG .info ("Initial status update sent to server" );
558+ LOG .info (\ "Initial status update sent to server \ " );
422559 return stream ;
423560 } catch (SQLException e ) {
424561 LOG .error (
@@ -462,13 +599,11 @@ private static Connection openReplicationConnection(GaussDBConnectorConfig conne
462599 // Set connection timeout
463600 props .setProperty ("connectTimeout" , "60" );
464601
465- final Duration timeout =
466- connectorConfig .connectionTimeout () != null
467- ? connectorConfig .connectionTimeout ()
468- : Duration .ofSeconds (30 );
602+ final Duration timeout = connectorConfig .connectionTimeout () != null
603+ ? connectorConfig .connectionTimeout ()
604+ : Duration .ofSeconds (30 );
469605 final int previousLoginTimeoutSeconds = DriverManager .getLoginTimeout ();
470- final int loginTimeoutSeconds =
471- (int ) Math .min (Integer .MAX_VALUE , Math .max (0L , timeout .toSeconds ()));
606+ final int loginTimeoutSeconds = (int ) Math .min (Integer .MAX_VALUE , Math .max (0L , timeout .toSeconds ()));
472607
473608 LOG .info (
474609 "Opening replication connection to {}:{}/{} with user={}, replication={}, timeout={}s" ,
0 commit comments