
Commit 93d38d5

[Improve][Core] Add protobuf transform test case (#7914)
1 parent 10c37ac commit 93d38d5

File tree

2 files changed: +195 -22 lines changed

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java

Lines changed: 86 additions & 22 deletions
```diff
@@ -37,6 +37,7 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
 import org.apache.seatunnel.e2e.common.TestResource;
@@ -64,6 +65,7 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;

+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -693,30 +695,11 @@ public void testKafkaProtobufToAssert(TestContainer container)
         ProtobufDeserializationSchema deserializationSchema =
                 new ProtobufDeserializationSchema(catalogTable);

-        // Create serializer
         DefaultSeaTunnelRowSerializer serializer =
-                DefaultSeaTunnelRowSerializer.create(
-                        "test_protobuf_topic_fake_source",
-                        seaTunnelRowType,
-                        MessageFormat.PROTOBUF,
-                        DEFAULT_FIELD_DELIMITER,
-                        readonlyConfig);
-
-        // Produce records to Kafka
-        IntStream.range(0, 20)
-                .forEach(
-                        i -> {
-                            try {
-                                SeaTunnelRow originalRow = buildSeaTunnelRow();
-                                ProducerRecord<byte[], byte[]> producerRecord =
-                                        serializer.serializeRow(originalRow);
-                                producer.send(producerRecord).get();
-                            } catch (InterruptedException | ExecutionException e) {
-                                throw new RuntimeException("Error sending Kafka message", e);
-                            }
-                        });
+                getDefaultSeaTunnelRowSerializer(
+                        "test_protobuf_topic_fake_source", seaTunnelRowType, readonlyConfig);

-        producer.flush();
+        sendData(serializer);

         // Execute the job and validate
         Container.ExecResult execResult = container.executeJob(confFile);
@@ -769,6 +752,87 @@ public void testKafkaProtobufToAssert(TestContainer container)
                         });
     }

+    private @NotNull DefaultSeaTunnelRowSerializer getDefaultSeaTunnelRowSerializer(
+            String topic, SeaTunnelRowType seaTunnelRowType, ReadonlyConfig readonlyConfig) {
+        // Create serializer
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        topic,
+                        seaTunnelRowType,
+                        MessageFormat.PROTOBUF,
+                        DEFAULT_FIELD_DELIMITER,
+                        readonlyConfig);
+        return serializer;
+    }
+
+    private void sendData(DefaultSeaTunnelRowSerializer serializer) {
+        // Produce records to Kafka
+        IntStream.range(0, 20)
+                .forEach(
+                        i -> {
+                            try {
+                                SeaTunnelRow originalRow = buildSeaTunnelRow();
+                                ProducerRecord<byte[], byte[]> producerRecord =
+                                        serializer.serializeRow(originalRow);
+                                producer.send(producerRecord).get();
+                            } catch (InterruptedException | ExecutionException e) {
+                                throw new RuntimeException("Error sending Kafka message", e);
+                            }
+                        });
+
+        producer.flush();
+    }
+
+    @TestTemplate
+    public void testKafkaProtobufForTransformToAssert(TestContainer container)
+            throws IOException, InterruptedException, URISyntaxException {
+
+        String confFile = "/protobuf/kafka_protobuf_transform_to_assert.conf";
+        String path = getTestConfigFile(confFile);
+        Config config = ConfigFactory.parseFile(new File(path));
+        Config sinkConfig = config.getConfigList("source").get(0);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
+        SeaTunnelRowType seaTunnelRowType = buildSeaTunnelRowType();
+
+        // Create serializer
+        DefaultSeaTunnelRowSerializer serializer =
+                getDefaultSeaTunnelRowSerializer(
+                        "test_protobuf_topic_transform_fake_source",
+                        seaTunnelRowType,
+                        readonlyConfig);
+
+        // Produce records to Kafka
+        sendData(serializer);
+
+        // Execute the job and validate
+        Container.ExecResult execResult = container.executeJob(confFile);
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        try (KafkaConsumer<byte[], byte[]> consumer =
+                new KafkaConsumer<>(kafkaByteConsumerConfig())) {
+            consumer.subscribe(Arrays.asList("verify_protobuf_transform"));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(
+                            Arrays.asList(new TopicPartition("verify_protobuf_transform", 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        String data = new String(record.value(), "UTF-8");
+                        ObjectNode jsonNodes = JsonUtils.parseObject(data);
+                        Assertions.assertEquals(jsonNodes.size(), 2);
+                        Assertions.assertEquals(jsonNodes.get("city").asText(), "city_value");
+                        Assertions.assertEquals(jsonNodes.get("c_string").asText(), "test data");
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+    }
+
     public static String getTestConfigFile(String configFile)
             throws FileNotFoundException, URISyntaxException {
         URL resource = KafkaIT.class.getResource(configFile);
```
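The refactored test path above leans on two pre-existing KafkaIT helpers that this diff does not show: buildSeaTunnelRowType() and buildSeaTunnelRow(). For orientation only, here is a minimal hypothetical sketch of such a row builder, assuming the positional fields mirror the Person message defined in the new config file below; only the "test data" and "city_value" literals are fixed by the assertions above, every other value is illustrative:

```java
// Hypothetical sketch, not the actual helper from KafkaIT. Builds one SeaTunnelRow
// whose positions mirror the Person message (c_int32 .. phone_numbers).
// Assumes java.util.Map/HashMap are imported alongside the SeaTunnel types above.
private SeaTunnelRow buildSeaTunnelRow() {
    SeaTunnelRow row = new SeaTunnelRow(10); // one slot per Person field
    row.setField(0, 123); // c_int32
    row.setField(1, 123456789L); // c_int64
    row.setField(2, 1.23f); // c_float
    row.setField(3, 1.23d); // c_double
    row.setField(4, true); // c_bool
    row.setField(5, "test data"); // c_string, asserted by the test
    row.setField(6, "test_bytes".getBytes()); // c_bytes
    // Nested Address row: street, city, state, zip; city is asserted by the test
    row.setField(
            7,
            new SeaTunnelRow(
                    new Object[] {"street_value", "city_value", "state_value", "zip_value"}));
    Map<String, Float> attributes = new HashMap<>();
    attributes.put("k1", 0.1f);
    row.setField(8, attributes); // attributes, map<string, float>
    row.setField(9, new String[] {"1", "2"}); // phone_numbers, repeated string
    return row;
}
```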
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf

Lines changed: 109 additions & 0 deletions (new file)
```hocon
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
  parallelism = 1
  job.mode = "BATCH"
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  Kafka {
    topic = "test_protobuf_topic_transform_fake_source"
    format = protobuf
    protobuf_message_name = Person
    protobuf_schema = """
      syntax = "proto3";

      package org.apache.seatunnel.format.protobuf;

      option java_outer_classname = "ProtobufE2E";

      message Person {
        int32 c_int32 = 1;
        int64 c_int64 = 2;
        float c_float = 3;
        double c_double = 4;
        bool c_bool = 5;
        string c_string = 6;
        bytes c_bytes = 7;

        message Address {
          string street = 1;
          string city = 2;
          string state = 3;
          string zip = 4;
        }

        Address address = 8;

        map<string, float> attributes = 9;

        repeated string phone_numbers = 10;
      }
    """
    schema = {
      fields {
        c_int32 = int
        c_int64 = long
        c_float = float
        c_double = double
        c_bool = boolean
        c_string = string
        c_bytes = bytes

        Address {
          city = string
          state = string
          street = string
        }
        attributes = "map<string,float>"
        phone_numbers = "array<string>"
      }
    }
    bootstrap.servers = "kafkaCluster:9092"
    start_mode = "earliest"
    result_table_name = "kafka_table"
  }
}

transform {
  Sql {
    source_table_name = "kafka_table"
    result_table_name = "kafka_table_transform"
    query = "select Address.city,c_string from kafka_table"
  }
}

sink {
  kafka {
    topic = "verify_protobuf_transform"
    source_table_name = "kafka_table_transform"
    bootstrap.servers = "kafkaCluster:9092"
    kafka.request.timeout.ms = 60000
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}
```
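Taken together, the Sql transform projects just Address.city and c_string, so each record landing on verify_protobuf_transform should carry exactly the two JSON fields the new test asserts on. A minimal sketch of that end state, assuming the sink writes the projected row as a JSON object (the "city_value" and "test data" literals are the ones asserted in the test; field order within the object is not guaranteed):

```java
// Expected value of a record on "verify_protobuf_transform" after the projection.
// JsonUtils is the same helper class the test imports.
String data = "{\"city\":\"city_value\",\"c_string\":\"test data\"}";
ObjectNode jsonNodes = JsonUtils.parseObject(data);
Assertions.assertEquals(2, jsonNodes.size()); // only the two projected columns survive
Assertions.assertEquals("city_value", jsonNodes.get("city").asText());
Assertions.assertEquals("test data", jsonNodes.get("c_string").asText());
```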
