Skip to content

Commit 402b594

Browse files
committed
feat: resolve GaussDB CDC EOFException and support SERIAL types
- Add SERIAL/BIGSERIAL support in GaussDBTypeUtils
- Bundle PostgreSQL driver in flink-sql-connector-gaussdb-cdc to avoid NoClassDefFoundError
- Implement GaussDbE2eITCase with robust replication slot cleanup
- Improve replication connection stability with KeepAlive and socket timeout settings
- Add diagnostic logging for GaussDB replication stream
1 parent bd6084c commit 402b594

12 files changed

Lines changed: 1325 additions & 38 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc/docker/QUICK_START.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ CREATE TABLE orders_source (
252252
'schema-name' = 'public',
253253
'table-name' = 'orders',
254254
'slot.name' = 'flink_cdc_orders',
255-
'decoding.plugin.name' = 'mppdb_decoding'
255+
'decoding.plugin.name' = 'mppdb_decoding',
256+
'scan.incremental.snapshot.enabled' = 'true'
256257
);
257258
```
258259

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc/src/main/java/io/debezium/connector/gaussdb/connection/GaussDBReplicationConnection.java

Lines changed: 78 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,10 @@ public GaussDBReplicationConnection(
8686
this.messageDecoder =
8787
Objects.requireNonNull(messageDecoder, "messageDecoder must not be null");
8888
this.typeRegistry = Objects.requireNonNull(typeRegistry, "typeRegistry must not be null");
89-
// Do not use the passed-in connection: it may be created by GaussDB JDBC driver which
90-
// does not support unwrap(), while this class requires PostgreSQL JDBC driver's replication
89+
// Do not use the passed-in connection: it may be created by GaussDB JDBC driver
90+
// which
91+
// does not support unwrap(), while this class requires PostgreSQL JDBC driver's
92+
// replication
9193
// API.
9294
// Let connectIfNeeded() create a PostgreSQL JDBC connection via
9395
// openReplicationConnection().
@@ -379,31 +381,53 @@ private PGReplicationStream startPgReplicationStream(Lsn startLsn) throws SQLExc
379381
if (c == null) {
380382
throw new SQLException("Replication connection is not initialized");
381383
}
382-
final PGConnection pgConnection = c.unwrap(PGConnection.class);
383-
384-
ChainedLogicalStreamBuilder builder =
385-
pgConnection
386-
.getReplicationAPI()
387-
.replicationStream()
388-
.logical()
389-
.withSlotName("\"" + slotName + "\"")
390-
// GaussDB-specific options from official documentation
391-
.withSlotOption("include-xids", false)
392-
.withSlotOption("skip-empty-xacts", true);
393-
if (startLsn != null) {
394-
builder = builder.withStartPosition(convertToGaussDBLsn(startLsn));
395-
}
396-
messageDecoder.setContainsMetadata(false);
397384

398-
if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
399-
builder.withStatusInterval(
400-
Math.toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
401-
}
385+
LOG.info(
386+
"Starting PG replication stream with slot '{}', startLsn={}, plugin={}",
387+
slotName,
388+
startLsn,
389+
pluginName);
390+
391+
try {
392+
final PGConnection pgConnection = c.unwrap(PGConnection.class);
393+
394+
ChainedLogicalStreamBuilder builder =
395+
pgConnection
396+
.getReplicationAPI()
397+
.replicationStream()
398+
.logical()
399+
.withSlotName(slotName) // GaussDB may not need extra quotes
400+
// GaussDB-specific options from official documentation
401+
.withSlotOption("include-xids", false)
402+
.withSlotOption("skip-empty-xacts", true);
403+
if (startLsn != null) {
404+
builder = builder.withStartPosition(convertToGaussDBLsn(startLsn));
405+
LOG.info("Replication stream will start from LSN: {}", startLsn);
406+
} else {
407+
LOG.info("Replication stream will start from current WAL position");
408+
}
409+
messageDecoder.setContainsMetadata(false);
402410

403-
final PGReplicationStream stream = builder.start();
404-
// ensure server sees feedback quickly
405-
stream.forceUpdateStatus();
406-
return stream;
411+
if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
412+
builder.withStatusInterval(
413+
Math.toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
414+
}
415+
416+
LOG.info("Calling builder.start() to create replication stream...");
417+
final PGReplicationStream stream = builder.start();
418+
LOG.info("Replication stream started successfully");
419+
// ensure server sees feedback quickly
420+
stream.forceUpdateStatus();
421+
LOG.info("Initial status update sent to server");
422+
return stream;
423+
} catch (SQLException e) {
424+
LOG.error(
425+
"Failed to start replication stream for slot '{}': {}",
426+
slotName,
427+
e.getMessage(),
428+
e);
429+
throw e;
430+
}
407431
}
408432

409433
private static boolean isSlotAlreadyExists(SQLException e) {
@@ -429,6 +453,14 @@ private static Connection openReplicationConnection(GaussDBConnectorConfig conne
429453
props.setProperty("assumeMinServerVersion", "9.4");
430454
props.setProperty("replication", "database");
431455
props.setProperty("preferQueryMode", "simple");
456+
// Disable SSL for replication connection to avoid protocol mismatch
457+
props.setProperty("sslmode", "disable");
458+
// Add TCP keepalive to prevent connection drops
459+
props.setProperty("tcpKeepAlive", "true");
460+
// Disable the socket read timeout: a replication stream may legitimately stay idle for long periods
461+
props.setProperty("socketTimeout", "0"); // 0 means infinite for replication
462+
// Set connection timeout
463+
props.setProperty("connectTimeout", "60");
432464

433465
final Duration timeout =
434466
connectorConfig.connectionTimeout() != null
@@ -438,9 +470,29 @@ private static Connection openReplicationConnection(GaussDBConnectorConfig conne
438470
final int loginTimeoutSeconds =
439471
(int) Math.min(Integer.MAX_VALUE, Math.max(0L, timeout.toSeconds()));
440472

473+
LOG.info(
474+
"Opening replication connection to {}:{}/{} with user={}, replication={}, timeout={}s",
475+
host,
476+
port,
477+
database,
478+
props.getProperty("user"),
479+
props.getProperty("replication"),
480+
loginTimeoutSeconds);
481+
441482
try {
442483
DriverManager.setLoginTimeout(loginTimeoutSeconds);
443-
return DriverManager.getConnection(url, props);
484+
Connection conn = DriverManager.getConnection(url, props);
485+
LOG.info("Replication connection established successfully");
486+
return conn;
487+
} catch (SQLException e) {
488+
LOG.error(
489+
"Failed to establish replication connection to {}:{}/{}. Error: {}",
490+
host,
491+
port,
492+
database,
493+
e.getMessage(),
494+
e);
495+
throw e;
444496
} finally {
445497
DriverManager.setLoginTimeout(previousLoginTimeoutSeconds);
446498
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc/src/main/java/org/apache/flink/cdc/connectors/gaussdb/source/GaussDBDialect.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import io.debezium.relational.TableId;
3939
import io.debezium.relational.Tables;
4040
import io.debezium.relational.history.TableChanges.TableChange;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
4143

4244
import java.sql.SQLException;
4345
import java.util.List;
@@ -46,6 +48,7 @@
4648
/** The dialect for GaussDB. */
4749
public class GaussDBDialect implements JdbcDataSourceDialect {
4850

51+
private static final Logger LOG = LoggerFactory.getLogger(GaussDBDialect.class);
4952
private static final long serialVersionUID = 1L;
5053
private static final String CONNECTION_NAME = "gaussdb-cdc-connector";
5154

@@ -65,17 +68,25 @@ public String getName() {
6568

6669
@Override
6770
public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
71+
LOG.info("=== GaussDBDialect.openJdbcConnection STARTED ===");
6872
try {
6973
GaussDBSourceConfig gaussDBSourceConfig = (GaussDBSourceConfig) sourceConfig;
74+
LOG.info(
75+
"Creating JDBC connection to GaussDB: hostname={}, port={}, database={}",
76+
gaussDBSourceConfig.getHostname(),
77+
gaussDBSourceConfig.getPort(),
78+
gaussDBSourceConfig.getDatabaseList());
7079
JdbcConnection jdbcConnection =
7180
new JdbcConnection(
7281
gaussDBSourceConfig.getDbzConnectorConfig().getJdbcConfig(),
7382
new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()),
7483
"\"",
7584
"\"");
7685
jdbcConnection.connect();
86+
LOG.info("=== JDBC connection established successfully ===");
7787
return jdbcConnection;
7888
} catch (Exception e) {
89+
LOG.error("=== Failed to open JDBC connection ===", e);
7990
throw new FlinkRuntimeException(e);
8091
}
8192
}
@@ -87,9 +98,15 @@ public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
8798

8899
@Override
89100
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
101+
LOG.info("=== GaussDBDialect.discoverDataCollections STARTED ===");
90102
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
91103
GaussDBSourceConfig gaussDBSourceConfig = (GaussDBSourceConfig) sourceConfig;
92104
List<String> schemaList = gaussDBSourceConfig.getSchemaList();
105+
LOG.info(
106+
"Discovering tables in database: {}, schemas: {}, table filters: {}",
107+
sourceConfig.getDatabaseList().get(0),
108+
schemaList,
109+
sourceConfig.getTableFilters());
93110

94111
List<TableId> tables =
95112
TableDiscoveryUtils.listTables(
@@ -98,21 +115,27 @@ public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
98115
jdbc,
99116
sourceConfig.getTableFilters(),
100117
schemaList);
118+
LOG.info("=== Discovered {} tables: {} ===", tables.size(), tables);
101119
return tables;
102120
} catch (SQLException e) {
121+
LOG.error("=== Failed to discover tables ===", e);
103122
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
104123
}
105124
}
106125

107126
@Override
108127
public Map<TableId, TableChange> discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig) {
128+
LOG.info("=== GaussDBDialect.discoverDataCollectionSchemas STARTED ===");
109129
final List<TableId> capturedTableIds = discoverDataCollections(sourceConfig);
130+
LOG.info("Fetching schemas for {} tables", capturedTableIds.size());
110131

111132
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
112133
// fetch table schemas
113134
Map<TableId, TableChange> schemas = queryTableSchema(jdbc, capturedTableIds);
135+
LOG.info("=== Successfully fetched {} table schemas ===", schemas.size());
114136
return schemas;
115137
} catch (Exception e) {
138+
LOG.error("=== Failed to discover table schemas ===", e);
116139
throw new FlinkRuntimeException(
117140
"Error to discover table schemas: " + e.getMessage(), e);
118141
}
@@ -179,9 +202,18 @@ public ChunkSplitter createChunkSplitter(
179202

180203
@Override
181204
public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
205+
LOG.info(
206+
"=== GaussDBDialect.createFetchTask STARTED for split: {} ===",
207+
sourceSplitBase.splitId());
182208
if (sourceSplitBase.isSnapshotSplit()) {
209+
LOG.info(
210+
"Creating GaussDBScanFetchTask for snapshot split: {}",
211+
sourceSplitBase.splitId());
183212
return new GaussDBScanFetchTask(sourceSplitBase.asSnapshotSplit());
184213
} else {
214+
LOG.info(
215+
"Creating GaussDBStreamFetchTask for stream split: {}",
216+
sourceSplitBase.splitId());
185217
this.streamFetchTask = new GaussDBStreamFetchTask(sourceSplitBase.asStreamSplit());
186218
return this.streamFetchTask;
187219
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc/src/main/java/org/apache/flink/cdc/connectors/gaussdb/source/GaussDBSourceBuilder.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.flink.cdc.connectors.gaussdb.source.offset.GaussDBOffsetFactory;
2525
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
2626

27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
2730
import java.time.Duration;
2831
import java.util.Properties;
2932

@@ -33,6 +36,7 @@
3336
@Experimental
3437
public class GaussDBSourceBuilder<T> {
3538

39+
private static final Logger LOG = LoggerFactory.getLogger(GaussDBSourceBuilder.class);
3640
private final GaussDBSourceConfigFactory configFactory = new GaussDBSourceConfigFactory();
3741
private DebeziumDeserializationSchema<T> deserializer;
3842

@@ -272,10 +276,17 @@ public GaussDBSourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnbounded
272276
* @return a GaussDBIncrementalSource with the settings made for this builder.
273277
*/
274278
public GaussDBIncrementalSource<T> build() {
279+
LOG.info("=== GaussDBSourceBuilder.build() STARTED ===");
280+
LOG.info("Building GaussDBIncrementalSource with config factory: {}", configFactory);
275281
GaussDBOffsetFactory offsetFactory = new GaussDBOffsetFactory();
282+
LOG.info("Created GaussDBOffsetFactory");
276283
GaussDBDialect dialect = new GaussDBDialect(configFactory.create(0));
277-
return new GaussDBIncrementalSource<>(
278-
configFactory, checkNotNull(deserializer), offsetFactory, dialect);
284+
LOG.info("Created GaussDBDialect");
285+
GaussDBIncrementalSource<T> source =
286+
new GaussDBIncrementalSource<>(
287+
configFactory, checkNotNull(deserializer), offsetFactory, dialect);
288+
LOG.info("=== GaussDBIncrementalSource created successfully ===");
289+
return source;
279290
}
280291

281292
public GaussDBSourceConfigFactory getConfigFactory() {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc/src/main/java/org/apache/flink/cdc/connectors/gaussdb/source/utils/GaussDBTypeUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,11 @@ public static DataType convertGaussDBType(String gaussdbTypeName) {
8080
case "integer":
8181
case "int":
8282
case "int4":
83+
case "serial":
8384
return DataTypes.INT();
8485
case "bigint":
8586
case "int8":
87+
case "bigserial":
8688
return DataTypes.BIGINT();
8789
case "smallint":
8890
case "int2":

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc/src/test/java/org/apache/flink/cdc/connectors/gaussdb/GaussDBTestBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ protected Connection getJdbcConnection() throws SQLException {
9292
props.setProperty("password", PASSWORD);
9393
// Extended timeouts for high-latency network (in seconds)
9494
props.setProperty("connectTimeout", "60");
95-
props.setProperty("socketTimeout", "60");
95+
// Set socketTimeout to 0 (infinite) to prevent timeout during long-running CDC operations
96+
props.setProperty("socketTimeout", "0");
9697
props.setProperty("loginTimeout", "60");
9798
props.setProperty("tcpKeepAlive", "true");
9899

0 commit comments

Comments
 (0)