Commit db6e299
[improve] update kafka source default schema from content<ROW<content STRING>> to content<STRING> (#8642)
1 parent e0c99ac commit db6e299

File tree

3 files changed: +84 −11 lines changed

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java

+1 −11
@@ -27,7 +27,6 @@
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser;
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -218,16 +217,7 @@ private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
                     TableSchema.builder()
                             .column(
                                     PhysicalColumn.of(
-                                            "content",
-                                            new SeaTunnelRowType(
-                                                    new String[] {"content"},
-                                                    new SeaTunnelDataType<?>[] {
-                                                        BasicType.STRING_TYPE
-                                                    }),
-                                            0,
-                                            false,
-                                            null,
-                                            null))
+                                            "content", BasicType.STRING_TYPE, 0, false, null, null))
                             .build();
         }
         return CatalogTable.of(
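Why the change matters: previously the schemaless Kafka source wrapped its single "content" column in a nested ROW type, so records surfaced as content<ROW<content STRING>>; after this commit the column is a flat STRING. Below is a minimal sketch contrasting the two type declarations, built only from the SeaTunnelRowType and BasicType calls visible in the diff above (the class name and println lines are illustrative assumptions, not part of the commit):

    import org.apache.seatunnel.api.table.type.BasicType;
    import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
    import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

    public class DefaultSchemaSketch {
        public static void main(String[] args) {
            // Old default: the "content" column was itself a ROW with one
            // STRING field, i.e. content<ROW<content STRING>>.
            SeaTunnelDataType<?> oldType =
                    new SeaTunnelRowType(
                            new String[] {"content"},
                            new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE});

            // New default: the "content" column is a plain STRING,
            // i.e. content<STRING>.
            SeaTunnelDataType<?> newType = BasicType.STRING_TYPE;

            System.out.println(oldType.getSqlType()); // ROW
            System.out.println(newType.getSqlType()); // STRING
        }
    }

With the flat type passed straight into PhysicalColumn.of, the nested SeaTunnelRowType wrapper (and the SeaTunnelDataType import it needed) disappears from the default-schema path.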

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java

+20 −0
@@ -231,6 +231,26 @@ public void testSourceKafkaTextToConsole(TestContainer container)
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
+    @TestTemplate
+    public void testTextFormatWithNoSchema(TestContainer container)
+            throws IOException, InterruptedException {
+        try {
+            for (int i = 0; i < 100; i++) {
+                ProducerRecord<byte[], byte[]> producerRecord =
+                        new ProducerRecord<>(
+                                "test_topic_text_no_schema", null, "abcdef".getBytes());
+                producer.send(producerRecord).get();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            producer.flush();
+        }
+        Container.ExecResult execResult =
+                container.executeJob("/textFormatIT/kafka_source_text_with_no_schema.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer container)
             throws IOException, InterruptedException {
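The new test reuses a shared producer field that KafkaIT defines elsewhere in the class. Below is a minimal sketch, assuming only the standard Kafka client API, of how a producer matching ProducerRecord<byte[], byte[]> is typically constructed (the helper name and its wiring are assumptions, not taken from this commit):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class ProducerSetupSketch {
        // Hypothetical helper; KafkaIT does its own setup in the test class.
        static Producer<byte[], byte[]> newByteArrayProducer(String bootstrapServers) {
            Properties props = new Properties();
            // Key and value are raw byte[], matching ProducerRecord<byte[], byte[]>.
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            return new KafkaProducer<>(props);
        }
    }

Note the test calls producer.send(...).get() inside the loop, so each of the 100 sends is confirmed synchronously before the job config below is executed against the topic.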
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_no_schema.conf (new file)

+63 −0

@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_text_no_schema"
+    plugin_output = "kafka_table"
+    start_mode = "earliest"
+    format_error_handle_way = fail
+    format = text
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "kafka_table"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = "content"
+            field_type = "string"
+            field_value = [
+              {equals_to = "abcdef"}
+            ]
+          }
+        ]
+      }
+  }
+}
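End to end, this conf exercises the new default: with format = text and no user-supplied schema, each Kafka record value becomes the single "content" STRING field of an emitted row, which the Assert sink's field_rules then check against "abcdef". A minimal sketch of that expected row shape (the literal construction is illustrative; real rows come from the Kafka source, not hand-built):

    import org.apache.seatunnel.api.table.type.SeaTunnelRow;

    public class RowShapeSketch {
        public static void main(String[] args) {
            // Under the new content<STRING> default, the "abcdef" payload sent
            // by the test arrives as a flat String field, not a nested row.
            SeaTunnelRow row = new SeaTunnelRow(new Object[] {"abcdef"});
            System.out.println(row.getField(0)); // abcdef
        }
    }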
