Skip to content

Commit 792a792

Browse files
vjuranekNaros
authored andcommitted
DBZ-8782 Allow key and value format of type Connect
1 parent 79f5893 commit 792a792

File tree

3 files changed

+119
-0
lines changed

3 files changed

+119
-0
lines changed

debezium-server-core/src/main/java/io/debezium/server/DebeziumServer.java

+8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import io.debezium.DebeziumException;
3434
import io.debezium.embedded.ClientProvided;
35+
import io.debezium.embedded.Connect;
3536
import io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory;
3637
import io.debezium.engine.ChangeEvent;
3738
import io.debezium.engine.DebeziumEngine;
@@ -95,6 +96,7 @@ public class DebeziumServer {
9596
private static final String FORMAT_PROTOBUF = Protobuf.class.getSimpleName().toLowerCase();
9697
private static final String FORMAT_BINARY = Binary.class.getSimpleName().toLowerCase();
9798
private static final String FORMAT_STRING = SimpleString.class.getSimpleName().toLowerCase();
99+
private static final String FORMAT_CONNECT = Connect.class.getSimpleName().toLowerCase();
98100
private static final String FORMAT_CLIENT_PROVIDED = ClientProvided.class.getSimpleName().toLowerCase();
99101

100102
private static final Pattern SHELL_PROPERTY_NAME_PATTERN = Pattern.compile("^[a-zA-Z0-9_]+_+[a-zA-Z0-9_]+$");
@@ -230,6 +232,9 @@ else if (FORMAT_BINARY.equals(formatName)) {
230232
else if (FORMAT_STRING.equals(formatName)) {
231233
return SimpleString.class;
232234
}
235+
else if (FORMAT_CONNECT.equalsIgnoreCase(formatName)) {
236+
return Connect.class;
237+
}
233238
else if (FORMAT_CLIENT_PROVIDED.equals(formatName)) {
234239
return ClientProvided.class;
235240
}
@@ -244,6 +249,9 @@ private Class<?> getHeaderFormat(Config config) {
244249
else if (FORMAT_JSON_BYTE_ARRAY.equals(formatName)) {
245250
return JsonByteArray.class;
246251
}
252+
else if (FORMAT_CONNECT.equals(formatName)) {
253+
return Connect.class;
254+
}
247255
else if (FORMAT_CLIENT_PROVIDED.equals(formatName)) {
248256
return ClientProvided.class;
249257
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
package io.debezium.server;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import java.time.Duration;
11+
12+
import jakarta.enterprise.event.Observes;
13+
import jakarta.inject.Inject;
14+
15+
import org.apache.kafka.connect.data.Struct;
16+
import org.apache.kafka.connect.header.ConnectHeaders;
17+
import org.apache.kafka.connect.source.SourceRecord;
18+
import org.awaitility.Awaitility;
19+
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
21+
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
22+
23+
import io.debezium.server.events.ConnectorCompletedEvent;
24+
import io.debezium.server.events.ConnectorStartedEvent;
25+
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
26+
import io.debezium.util.Testing;
27+
import io.quarkus.test.common.QuarkusTestResource;
28+
import io.quarkus.test.junit.QuarkusTest;
29+
import io.quarkus.test.junit.TestProfile;
30+
31+
/**
32+
* Integration test that verifies Debezium server is able to deliver records in Kafka Connect format.
33+
*
34+
* @author vjuranek
35+
*/
36+
@QuarkusTest
37+
@TestProfile(DebeziumServerConnectFormatProfile.class)
38+
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
39+
@EnabledIfSystemProperty(named = "test.apicurio", matches = "false", disabledReason = "DebeziumServerConfigProvidersIT doesn't run with apicurio profile.")
40+
@DisabledIfSystemProperty(named = "debezium.format.key", matches = "protobuf")
41+
@DisabledIfSystemProperty(named = "debezium.format.value", matches = "protobuf")
42+
public class DebeziumServerConnectFormatIT {
43+
44+
private static final int MESSAGE_COUNT = 4;
45+
@Inject
46+
DebeziumServer server;
47+
48+
{
49+
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
50+
}
51+
52+
void setupDependencies(@Observes ConnectorStartedEvent event) {
53+
if (!TestConfigSource.isItTest()) {
54+
return;
55+
}
56+
57+
}
58+
59+
void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
60+
if (!event.isSuccess()) {
61+
throw (Exception) event.getError().get();
62+
}
63+
}
64+
65+
@Test
66+
public void testPostgresWithSourceRecord() throws Exception {
67+
Testing.Print.enable();
68+
final TestConsumer testConsumer = (TestConsumer) server.getConsumer();
69+
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds()))
70+
.until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT));
71+
assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);
72+
73+
SourceRecord record = (SourceRecord) testConsumer.getValues().get(MESSAGE_COUNT - 1);
74+
Struct key = (Struct) record.key();
75+
assertThat(key.getInt32("id")).isEqualTo(1004);
76+
77+
Struct after = ((Struct) record.value()).getStruct("after");
78+
assertThat(after.getInt32("id")).isEqualTo(1004);
79+
assertThat(after.getString("first_name")).isEqualTo("Anne");
80+
assertThat(after.getString("last_name")).isEqualTo("Kretchmar");
81+
assertThat(after.getString("email")).isEqualTo("[email protected]");
82+
83+
ConnectHeaders headers = (ConnectHeaders) record.headers();
84+
assertThat(headers.size()).isEqualTo(0);
85+
}
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
package io.debezium.server;
7+
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
11+
import io.debezium.embedded.Connect;
12+
import io.quarkus.test.junit.QuarkusTestProfile;
13+
14+
public class DebeziumServerConnectFormatProfile implements QuarkusTestProfile {
15+
16+
@Override
17+
public Map<String, String> getConfigOverrides() {
18+
Map<String, String> config = new HashMap<String, String>();
19+
config.put("debezium.format.key", Connect.class.getSimpleName().toLowerCase());
20+
config.put("debezium.format.value", Connect.class.getSimpleName().toLowerCase());
21+
config.put("debezium.format.header", Connect.class.getSimpleName().toLowerCase());
22+
return config;
23+
}
24+
25+
}

0 commit comments

Comments
 (0)