Commit b922bb9

[Improve][CDC] Extract duplicate code (#8906)
1 parent 2657626 commit b922bb9

5 files changed: +57 -159 lines changed

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java

+53
@@ -22,26 +22,35 @@
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
 import org.apache.seatunnel.connectors.cdc.debezium.ConnectTableChangeSerializer;
+import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;

+import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.source.SourceRecord;

 import io.debezium.config.CommonConnectorConfig;
 import io.debezium.data.Envelope;
+import io.debezium.jdbc.JdbcConnection;
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.pipeline.spi.OffsetContext;
 import io.debezium.pipeline.spi.Partition;
 import io.debezium.relational.RelationalDatabaseSchema;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.util.SchemaNameAdjuster;

 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -155,6 +164,50 @@ record -> {
                 .collect(Collectors.toList());
     }

+    protected void registerDatabaseHistory(
+            SourceSplitBase sourceSplitBase, JdbcConnection connection) {
+        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
+        // TODO: support save table schema
+        if (sourceSplitBase instanceof SnapshotSplit) {
+            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
+            engineHistory.add(
+                    dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId()));
+        } else {
+            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
+            Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges();
+            for (TableId tableId : incrementalSplit.getTableIds()) {
+                if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) {
+                    SchemaAndValue schemaAndValue =
+                            jsonConverter.toConnectData("topic", historyTableChanges.get(tableId));
+                    Struct deserializedStruct = (Struct) schemaAndValue.value();
+
+                    TableChanges tableChanges =
+                            tableChangeSerializer.deserialize(
+                                    Collections.singletonList(deserializedStruct), false);
+
+                    Iterator<TableChanges.TableChange> iterator = tableChanges.iterator();
+                    TableChanges.TableChange tableChange = null;
+                    while (iterator.hasNext()) {
+                        if (tableChange != null) {
+                            throw new IllegalStateException(
+                                    "The table changes should only have one element");
+                        }
+                        tableChange = iterator.next();
+                    }
+                    engineHistory.add(tableChange);
+                    continue;
+                }
+                engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
+            }
+        }
+
+        EmbeddedDatabaseHistory.registerHistory(
+                sourceConfig
+                        .getDbzConfiguration()
+                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
+                engineHistory);
+    }
+
     public SourceConfig getSourceConfig() {
         return sourceConfig;
     }
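After this refactor, each dialect-specific fetch task context only has to hand its JDBC connection to the shared base method. A minimal sketch of that call pattern follows; the class name, constructor shape, and connection field are hypothetical placeholders (only the super.registerDatabaseHistory(sourceSplitBase, connection) call comes from the diff above), so treat it as an illustration rather than code from this repository.

import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;

import io.debezium.jdbc.JdbcConnection;

// Hypothetical dialect-specific context; declared abstract so the sketch can omit the
// other members a real connector (MySQL, Oracle, Postgres, SqlServer) has to provide.
public abstract class MyDialectSourceFetchTaskContext extends JdbcSourceFetchTaskContext {

    // Assumption: the dialect opens and owns a Debezium JdbcConnection for schema queries.
    protected JdbcConnection connection;

    // Assumed constructor shape; the real base-class constructor may take different arguments.
    protected MyDialectSourceFetchTaskContext(
            SourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
        super(sourceConfig, dataSourceDialect);
    }

    @Override
    public void configure(SourceSplitBase sourceSplitBase) {
        // Shared logic from JdbcSourceFetchTaskContext: collect the TableChanges for the
        // split (snapshot or incremental) and register them with EmbeddedDatabaseHistory.
        super.registerDatabaseHistory(sourceSplitBase, connection);

        // ... dialect-specific initialization (connector config, offset context, dispatcher) ...
    }
}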

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java

+1 -52
@@ -24,16 +24,12 @@
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;

-import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;

@@ -66,7 +62,6 @@
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
-import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Collect;
@@ -75,9 +70,6 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -118,7 +110,7 @@ public MySqlSourceFetchTaskContext(

     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
-        registerDatabaseHistory(sourceSplitBase);
+        super.registerDatabaseHistory(sourceSplitBase, connection);

         // initial stateful objects
         final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -385,49 +377,6 @@ private void validateAndLoadDatabaseHistory(
         schema.recover(Offsets.of(mySqlPartition, offset));
     }

-    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
-        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            engineHistory.add(
-                    dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
-            Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges();
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) {
-                    SchemaAndValue schemaAndValue =
-                            jsonConverter.toConnectData("topic", historyTableChanges.get(tableId));
-                    Struct deserializedStruct = (Struct) schemaAndValue.value();
-
-                    TableChanges tableChanges =
-                            tableChangeSerializer.deserialize(
-                                    Collections.singletonList(deserializedStruct), false);
-
-                    Iterator<TableChanges.TableChange> iterator = tableChanges.iterator();
-                    TableChanges.TableChange tableChange = null;
-                    while (iterator.hasNext()) {
-                        if (tableChange != null) {
-                            throw new IllegalStateException(
-                                    "The table changes should only have one element");
-                        }
-                        tableChange = iterator.next();
-                    }
-                    engineHistory.add(tableChange);
-                    continue;
-                }
-                engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
-            }
-        }
-
-        EmbeddedDatabaseHistory.registerHistory(
-                sourceConfig
-                        .getDbzConfiguration()
-                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-                engineHistory);
-    }
-
     /** A subclass implementation of {@link MySqlTaskContext} which reuses one BinaryLogClient. */
     public class MySqlTaskContextImpl extends MySqlTaskContext {
Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java

+1 -52
@@ -23,15 +23,11 @@
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils;

-import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;

@@ -66,11 +62,7 @@

 import java.sql.SQLException;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;

 import static org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection;
@@ -105,7 +97,7 @@ public OracleSourceFetchTaskContext(
     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
         // Initializes the table schema
-        registerDatabaseHistory(sourceSplitBase);
+        super.registerDatabaseHistory(sourceSplitBase, connection);

         // initial stateful objects
         final OracleConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -258,49 +250,6 @@ private OracleOffsetContext loadStartingOffsetState(
         return oracleOffsetContext;
     }

-    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
-        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            engineHistory.add(
-                    dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
-            Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges();
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) {
-                    SchemaAndValue schemaAndValue =
-                            jsonConverter.toConnectData("topic", historyTableChanges.get(tableId));
-                    Struct deserializedStruct = (Struct) schemaAndValue.value();
-
-                    TableChanges tableChanges =
-                            tableChangeSerializer.deserialize(
-                                    Collections.singletonList(deserializedStruct), false);
-
-                    Iterator<TableChanges.TableChange> iterator = tableChanges.iterator();
-                    TableChanges.TableChange tableChange = null;
-                    while (iterator.hasNext()) {
-                        if (tableChange != null) {
-                            throw new IllegalStateException(
-                                    "The table changes should only have one element");
-                        }
-                        tableChange = iterator.next();
-                    }
-                    engineHistory.add(tableChange);
-                    continue;
-                }
-                engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
-            }
-        }
-
-        EmbeddedDatabaseHistory.registerHistory(
-                sourceConfig
-                        .getDbzConfiguration()
-                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-                engineHistory);
-    }
-
     private void validateAndLoadDatabaseHistory(
             OracleOffsetContext offset, OracleDatabaseSchema schema) {
         schema.initializeStorage();

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java

+1 -27
@@ -24,10 +24,7 @@
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.exception.PostgresConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
@@ -68,10 +65,8 @@
 import lombok.extern.slf4j.Slf4j;

 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;

@@ -129,7 +124,7 @@ public PostgresSourceFetchTaskContext(

     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
-        registerDatabaseHistory(sourceSplitBase);
+        super.registerDatabaseHistory(sourceSplitBase, dataConnection);

         // initial stateful objects
         final PostgresConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -276,27 +271,6 @@ public void configure(SourceSplitBase sourceSplitBase) {
         }
     }

-    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
-        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            engineHistory.add(
-                    dataSourceDialect.queryTableSchema(dataConnection, snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId));
-            }
-        }
-
-        EmbeddedDatabaseHistory.registerHistory(
-                sourceConfig
-                        .getDbzConfiguration()
-                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-                engineHistory);
-    }
-
     @Override
     public PostgresSourceConfig getSourceConfig() {
         return (PostgresSourceConfig) sourceConfig;

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java

+1 -28
@@ -22,10 +22,7 @@
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerConnectionUtils;
@@ -55,16 +52,13 @@
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
-import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Collect;
 import lombok.extern.slf4j.Slf4j;

 import java.sql.SQLException;
 import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;

 /** The context for fetch task that fetching data of snapshot split from MySQL data source. */
@@ -99,7 +93,7 @@ public SqlServerSourceFetchTaskContext(

     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
-        registerDatabaseHistory(sourceSplitBase);
+        super.registerDatabaseHistory(sourceSplitBase, dataConnection);

         // initial stateful objects
         final SqlServerConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -282,27 +276,6 @@ private SqlServerOffsetContext loadStartingOffsetState(
         return sqlServerOffsetContext;
     }

-    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
-        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            engineHistory.add(
-                    dataSourceDialect.queryTableSchema(dataConnection, snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId));
-            }
-        }
-
-        EmbeddedDatabaseHistory.registerHistory(
-                sourceConfig
-                        .getDbzConfiguration()
-                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-                engineHistory);
-    }
-
     public static class SqlServerEventMetadataProvider implements EventMetadataProvider {

         @Override
