
Commit 29cf3a7

[Fix][Connector-V2] Fix Postgres CDC with debezium_json format failing to parse numbers without scale (#9052)
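The root cause: a PostgreSQL NUMERIC column declared without precision and scale has no fixed scale in its schema, so Debezium ships each value as a variable-scale structure (a scale plus the unscaled bytes) rather than a plain decimal, and the debezium_json path failed on such values. A minimal sketch of the decode step, assuming the field layout of Debezium's io.debezium.data.VariableScaleDecimal (an int32 `scale` and a bytes `value` holding the two's-complement unscaled integer); this illustrates the representation, not the connector's actual code:

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Sketch: decoding a variable-scale decimal, the way Debezium represents
// NUMERIC columns that declare no scale. Field layout assumed from
// io.debezium.data.VariableScaleDecimal; not the connector's code.
final class VariableScaleDecimalSketch {

    static BigDecimal decode(int scale, byte[] unscaledValue) {
        // The scale travels with each record, so it must be read from the
        // value struct rather than from the (scale-less) column schema.
        return new BigDecimal(new BigInteger(unscaledValue), scale);
    }

    public static void main(String[] args) {
        // 88 matches the value the e2e test inserts below.
        byte[] unscaled = BigInteger.valueOf(88).toByteArray();
        System.out.println(decode(0, unscaled)); // prints 88
    }
}
```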
1 parent 5e0e376 commit 29cf3a7

File tree: 5 files changed, +308 −9 lines


seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java (+12)

```diff
@@ -32,6 +32,8 @@
 import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
+import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
 import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffsetFactory;
@@ -93,11 +95,21 @@ public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(Readonly
     @Override
     public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
             ReadonlyConfig config) {
+        Map<TableId, Struct> tableIdTableChangeMap = tableChanges();
+        if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
+                config.get(JdbcSourceOptions.FORMAT))) {
+            return (DebeziumDeserializationSchema<T>)
+                    new DebeziumJsonDeserializeSchema(
+                            config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES),
+                            tableIdTableChangeMap);
+        }
+
         String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
         return (DebeziumDeserializationSchema<T>)
                 SeaTunnelRowDebeziumDeserializeSchema.builder()
                         .setTables(catalogTables)
                         .setServerTimeZone(ZoneId.of(zoneId))
                         .setTableIdTableChangeMap(tableIdTableChangeMap)
                         .build();
     }
```
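Both return paths now receive the map produced by tableChanges(), so per-table schema metadata is available at deserialization time. A sketch of the kind of lookup this enables, reusing the Debezium TableId and Kafka Connect Struct types from the diff; the helper class itself is hypothetical:

```java
import io.debezium.relational.TableId;
import org.apache.kafka.connect.data.Struct;

import java.util.Map;

// Sketch: resolving per-table schema metadata at deserialization time.
// The map shape (TableId -> table-change Struct) mirrors the diff; the
// lookup helper below is illustrative, not connector code.
final class TableChangeLookup {

    private final Map<TableId, Struct> tableIdTableChangeMap;

    TableChangeLookup(Map<TableId, Struct> tableIdTableChangeMap) {
        this.tableIdTableChangeMap = tableIdTableChangeMap;
    }

    // Column metadata (e.g. a NUMERIC with no declared scale) can be
    // fetched for the table a change record belongs to.
    Struct tableChangeFor(String catalog, String schema, String table) {
        return tableIdTableChangeMap.get(new TableId(catalog, schema, table));
    }
}
```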

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml (+14)

```diff
@@ -74,6 +74,20 @@
             <scope>test</scope>
         </dependency>

+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <!-- fix CVE-2022-26520 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520 -->
             <groupId>org.postgresql</groupId>
```
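The two test-scoped dependencies work as a pair: the Testcontainers Kafka module supplies a disposable broker, and connector-kafka lets the e2e job publish the CDC stream in debezium_json format so the test can read it back. A minimal sketch of starting such a broker, assuming a local Docker daemon as the rest of the suite does:

```java
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Sketch: a throwaway Kafka broker for tests, as the new dependency
// enables. Assumes a running Docker daemon; the image tag matches the
// one used in PostgresCDCIT below.
final class KafkaBrokerSketch {
    public static void main(String[] args) {
        try (KafkaContainer kafka =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.9"))) {
            kafka.start();
            // Bootstrap address for AdminClient / consumers in the test JVM.
            System.out.println(kafka.getBootstrapServers());
        }
    }
}
```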

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java (+174 −6)

```diff
@@ -32,6 +32,17 @@
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -40,10 +51,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
+import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;

 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
@@ -59,19 +72,24 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

 import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertNotNull;
+import static org.awaitility.Awaitility.given;

 @Slf4j
 @DisabledOnContainer(
@@ -99,8 +117,21 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {

     private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "full_types_no_primary_key";

+    private static final String SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM =
+            "full_types_no_primary_key_with_debezium";
+
     private static final String SOURCE_SQL_TEMPLATE = "select * from %s.%s order by id";

+    // kafka container
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9";
+
+    private static final String KAFKA_HOST = "kafka_e2e";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    private static KafkaConsumer<String, String> kafkaConsumer;
+
+    private static final String DEBEZIUM_JSON_TOPIC = "debezium_json_topic";
     // use newer version of postgresql image to support pgoutput plugin
     // when testing postgres 13, only 13-alpine supports both amd64 and arm64
     protected static final DockerImageName PG_IMAGE =
@@ -122,6 +153,16 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
                     "-c",
                     "max_replication_slots=20");

+    private void createKafkaContainer() {
+        KAFKA_CONTAINER =
+                new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(KAFKA_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+    }
+
     private String driverUrl() {
         return "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.1/postgresql-42.5.1.jar";
     }
@@ -149,8 +190,136 @@ public void startUp() {
                                 PostgreSQLContainer.POSTGRESQL_PORT,
                                 PostgreSQLContainer.POSTGRESQL_PORT)));
         Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join();
+
         log.info("Postgres Containers are started");
         initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+
+        LOG.info("The third stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Kafka Containers are started");
+
+        given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::createTopic);
+        LOG.info("Kafka create topic: " + DEBEZIUM_JSON_TOPIC);
+    }
+
+    // Initialize the kafka Topic
+    private void createTopic() {
+        Properties props = new Properties();
+        props.put(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            // Create a new topic
+            NewTopic newTopic = new NewTopic(DEBEZIUM_JSON_TOPIC, 1, (short) 1);
+
+            // Create the topic (async operation)
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+
+            System.out.println("Topic " + DEBEZIUM_JSON_TOPIC + " created successfully");
+        } catch (InterruptedException | ExecutionException e) {
+            System.err.println("Error creating topic: " + e.getMessage());
+        }
+    }
+
+    // Initialize the kafka Consumer
+    private Properties kafkaConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        props.put(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        props.put(
+                ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                IsolationLevel.READ_COMMITTED.name().toLowerCase());
+        props.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        return props;
+    }
+
+    private List<String> getKafkaData() {
+        long endOffset;
+        long lastProcessedOffset = -1L;
+        List<String> data = new ArrayList<>();
+        kafkaConsumer.subscribe(Collections.singletonList(PostgresCDCIT.DEBEZIUM_JSON_TOPIC));
+        Map<TopicPartition, Long> offsets =
+                kafkaConsumer.endOffsets(
+                        Collections.singletonList(
+                                new TopicPartition(PostgresCDCIT.DEBEZIUM_JSON_TOPIC, 0)));
+        endOffset = offsets.entrySet().iterator().next().getValue();
+        log.info("End offset: {}", endOffset);
+        do {
+            ConsumerRecords<String, String> consumerRecords =
+                    kafkaConsumer.poll(Duration.ofMillis(1000));
+            for (ConsumerRecord<String, String> record : consumerRecords) {
+                data.add(record.value());
+                lastProcessedOffset = record.offset();
+            }
+            log.info("Data size: {}", data.size());
+        } while (lastProcessedOffset < endOffset - 1);
+
+        return data;
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently Only support Zeta engine")
+    public void testPostgresCdcWithDebeziumJsonFormat(TestContainer container) {
+        try {
+            log.info(
+                    "Table {} has {} rows.",
+                    SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM,
+                    query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM)));
+
+            Properties props = kafkaConsumerConfig();
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-debezium-json-format");
+            kafkaConsumer = new KafkaConsumer<>(props);
+
+            CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            container.executeJob(
+                                    "/postgrescdc_to_postgres_with_debezium_to_kafka.conf");
+                        } catch (Exception e) {
+                            log.error("Commit task exception :" + e.getMessage());
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    });
+            AtomicReference<Integer> dataSize = new AtomicReference<>(0);
+
+            await().atMost(1000 * 60 * 3, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                dataSize.updateAndGet(v -> v + getKafkaData().size());
+                                Assertions.assertEquals(1, dataSize.get());
+                            });
+            // insert update delete
+            upsertDeleteSourceTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM);
+
+            await().atMost(1000 * 60 * 3, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                dataSize.updateAndGet(v -> v + getKafkaData().size());
+                                Assertions.assertEquals(5, dataSize.get());
+                            });
+        } finally {
+            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM);
+            kafkaConsumer.close();
+        }
     }

     @TestTemplate
```
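Note that getKafkaData above hard-codes partition 0 and tracks only the last record's offset, which is safe because createTopic builds the topic with exactly one partition. A sketch of the same drain generalized to any partition count, using only standard consumer APIs and assuming nothing is producing while it runs:

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: drain a subscribed consumer to the end offsets of all its
// assigned partitions. Generalizes the single-partition loop above;
// assumes no concurrent writes while draining.
final class DrainToEnd {
    static List<String> drain(KafkaConsumer<String, String> consumer) {
        List<String> data = new ArrayList<>();
        // First poll joins the group so assignment() is populated.
        consumer.poll(Duration.ofMillis(500)).forEach(r -> data.add(r.value()));
        Map<TopicPartition, Long> end = consumer.endOffsets(consumer.assignment());
        // position() reports the next offset to fetch; once it reaches the
        // end offset for every partition, the topic has been drained.
        while (end.entrySet().stream()
                .anyMatch(e -> consumer.position(e.getKey()) < e.getValue())) {
            consumer.poll(Duration.ofSeconds(1)).forEach(r -> data.add(r.value()));
        }
        return data;
    }
}
```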
```diff
@@ -555,8 +724,7 @@ public void testPostgresCdcCheckDataWithNoPrimaryKey(TestContainer container) th
     }

     @TestTemplate
-    public void testPostgresCdcCheckDataWithCustomPrimaryKey(TestContainer container)
-            throws Exception {
+    public void testPostgresCdcCheckDataWithCustomPrimaryKey(TestContainer container) {

         try {
             CompletableFuture.supplyAsync(
@@ -639,7 +807,7 @@ private Connection getJdbcConnection() throws SQLException {
     protected void initializePostgresTable(PostgreSQLContainer container, String sqlFile) {
         final String ddlFile = String.format("ddl/%s.sql", sqlFile);
         final URL ddlTestFile = PostgresCDCIT.class.getClassLoader().getResource(ddlFile);
-        assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+        Assertions.assertNotNull(ddlTestFile, "Cannot locate " + ddlFile);
         try (Connection connection = getJdbcConnection();
                 Statement statement = connection.createStatement()) {
             final List<String> statements =
@@ -723,7 +891,7 @@ private void upsertDeleteSourceTable(String database, String tableName) {
                         + tableName
                         + " VALUES (2, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,\n"
                         + " 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n"
-                        + " '2020-07-17', '18:00:22', 500,'192.168.1.1');");
+                        + " '2020-07-17', '18:00:22', 500, 88, '192.168.1.1');");

         executeSql(
                 "INSERT INTO "
@@ -732,7 +900,7 @@ private void upsertDeleteSourceTable(String database, String tableName) {
                         + tableName
                         + " VALUES (3, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,\n"
                         + " 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n"
-                        + " '2020-07-17', '18:00:22', 500,'192.168.1.1');");
+                        + " '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');");

         executeSql("DELETE FROM " + database + "." + tableName + " where id = 2;");
```
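The new literal (88) in both INSERT statements feeds the column that reproduces the bug: a NUMERIC declared without precision or scale, presumably added in the e2e DDL, which is among the changed files this view does not expand. A quick check of why such columns need per-value scale handling:

```java
import java.math.BigDecimal;

// Sketch: values in a scale-less NUMERIC column each carry their own
// scale, so the deserializer cannot take it from the column schema.
final class ScalelessNumericCheck {
    public static void main(String[] args) {
        System.out.println(new BigDecimal("88").scale());    // 0
        System.out.println(new BigDecimal("88.50").scale()); // 2
    }
}
```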
