Skip to content

Commit 5913e8c

Browse files
authored
[Improve][Connector-V2] RocketMQ Source add message tag config (#8825)
1 parent afc990d commit 5913e8c

File tree

9 files changed

+360
-5
lines changed

9 files changed

+360
-5
lines changed

Diff for: docs/en/connector-v2/source/RocketMQ.md

+61-1
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ Source connector for Apache RocketMQ.
2727

2828
## Source Options
2929

30-
| Name | Type | Required | Default | Description |
30+
| Name | Type | Required | Default | Description |
3131
|-------------------------------------|---------|----------|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
3232
| topics | String | yes | - | `RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`. |
3333
| name.srv.addr | String | yes | - | `RocketMQ` name server cluster address. |
34+
| tags | String | no | - | `RocketMQ tag` name. If there are multiple `tags`, use `,` to split, for example: `"tag1,tag2"`. |
3435
| acl.enabled | Boolean | no | false | If true, access control is enabled, and access key and secret key need to be configured. |
3536
| access.key | String | no | | |
3637
| secret.key | String | no | | When ACL_ENABLED is true, secret key cannot be empty. |
@@ -218,3 +219,62 @@ sink {
218219
}
219220
```
220221

222+
### Specified tag example:
223+
224+
> Here you can specify a tag to consume data. If there are multiple tags, use `,` to separate them, for example: "tag1,tag2"
225+
226+
```hocon
227+
env {
228+
parallelism = 1
229+
job.mode = "BATCH"
230+
231+
# You can set spark configuration here
232+
spark.app.name = "SeaTunnel"
233+
spark.executor.instances = 2
234+
spark.executor.cores = 1
235+
spark.executor.memory = "1g"
236+
spark.master = local
237+
}
238+
239+
source {
240+
Rocketmq {
241+
plugin_output = "rocketmq_table"
242+
name.srv.addr = "localhost:9876"
243+
topics = "test_topic"
244+
format = text
245+
# The default field delimiter is ","
246+
field_delimiter = ","
247+
tags = "test_tag"
248+
schema = {
249+
fields {
250+
id = bigint
251+
c_map = "map<string, smallint>"
252+
c_array = "array<tinyint>"
253+
c_string = string
254+
c_boolean = boolean
255+
c_tinyint = tinyint
256+
c_smallint = smallint
257+
c_int = int
258+
c_bigint = bigint
259+
c_float = float
260+
c_double = double
261+
c_decimal = "decimal(2, 1)"
262+
c_bytes = bytes
263+
c_date = date
264+
c_timestamp = timestamp
265+
}
266+
}
267+
}
268+
}
269+
270+
transform {
271+
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
272+
# please go to https://seatunnel.apache.org/docs/category/transform
273+
}
274+
275+
sink {
276+
Console {
277+
plugin_input = "rocketmq_table"
278+
}
279+
}
280+
```

Diff for: seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,17 @@ public class ConsumerConfig extends Config {
2929
.stringType()
3030
.noDefaultValue()
3131
.withDescription(
32-
"RocketMq topic name. If there are multiple topics, use , to split, for example: "
32+
"RocketMq topic name. If there are multiple topics, use `,` to split, for example: "
3333
+ "\"tpc1,tpc2\".");
34+
35+
public static final Option<String> TAGS =
36+
Options.key("tags")
37+
.stringType()
38+
.noDefaultValue()
39+
.withDescription(
40+
"RocketMq tag name. If there are multiple tags, use `,` to split, for example: "
41+
+ "\"tag1,tag2\".");
42+
3443
public static final Option<String> CONSUMER_GROUP =
3544
Options.key("consumer.group")
3645
.stringType()

Diff for: seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/ConsumerMetadata.java

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
public class ConsumerMetadata implements Serializable {
3434
private RocketMqBaseConfiguration baseConfig = RocketMqBaseConfiguration.newBuilder().build();
3535
private List<String> topics;
36+
private List<String> tags;
3637
private boolean enabledCommitCheckpoint = false;
3738
private StartMode startMode;
3839
private Map<MessageQueue, Long> specificStartOffsets;

Diff for: seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java

+16
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@
5555
import com.google.auto.service.AutoService;
5656

5757
import java.util.Arrays;
58+
import java.util.Collections;
5859
import java.util.HashMap;
5960
import java.util.Map;
61+
import java.util.stream.Collectors;
6062

6163
import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY;
6264
import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED;
@@ -112,6 +114,20 @@ public void prepare(Config config) throws PrepareFailException {
112114
config.getString(ConsumerConfig.TOPICS.key())
113115
.split(DEFAULT_FIELD_DELIMITER)));
114116

117+
if (config.hasPath(ConsumerConfig.TAGS.key())) {
118+
String tags = config.getString(ConsumerConfig.TAGS.key());
119+
if (tags != null && !tags.trim().isEmpty()) {
120+
this.metadata.setTags(
121+
Arrays.stream(tags.split(DEFAULT_FIELD_DELIMITER))
122+
.map(String::trim)
123+
.filter(tag -> !tag.isEmpty())
124+
.distinct()
125+
.collect(Collectors.toList()));
126+
} else {
127+
this.metadata.setTags(Collections.emptyList());
128+
}
129+
}
130+
115131
RocketMqBaseConfiguration.Builder baseConfigBuilder =
116132
RocketMqBaseConfiguration.newBuilder()
117133
.consumer()

Diff for: seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public OptionRule optionRule() {
4141
.required(ConsumerConfig.TOPICS, Config.NAME_SRV_ADDR)
4242
.optional(
4343
Config.FORMAT,
44+
ConsumerConfig.TAGS,
4445
ConsumerConfig.START_MODE,
4546
ConsumerConfig.CONSUMER_GROUP,
4647
ConsumerConfig.COMMIT_ON_CHECKPOINT,

Diff for: seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,20 @@ record ->
146146
.collect(Collectors.toList());
147147
long lastOffset = -1;
148148
for (MessageExt record : messages) {
149-
deserializationSchema.deserialize(
150-
record.getBody(), output);
151-
lastOffset = record.getQueueOffset();
149+
// Check if the tags are specified and match the
150+
// record's tag
151+
boolean shouldProcess =
152+
metadata.getTags() == null
153+
|| metadata.getTags().isEmpty()
154+
|| metadata.getTags()
155+
.contains(
156+
record
157+
.getTags());
158+
if (shouldProcess) {
159+
deserializationSchema.deserialize(
160+
record.getBody(), output);
161+
lastOffset = record.getQueueOffset();
162+
}
152163
if (Boundedness.BOUNDED.equals(
153164
context.getBoundedness())
154165
&& record.getQueueOffset()

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java

+68
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@
4949
import org.apache.rocketmq.common.message.Message;
5050
import org.apache.rocketmq.common.message.MessageExt;
5151
import org.apache.rocketmq.common.message.MessageQueue;
52+
import org.apache.rocketmq.common.protocol.route.QueueData;
53+
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
5254
import org.apache.rocketmq.remoting.protocol.LanguageCode;
55+
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
5356

5457
import org.junit.jupiter.api.AfterAll;
5558
import org.junit.jupiter.api.Assertions;
@@ -77,6 +80,7 @@
7780
import java.util.Map;
7881
import java.util.Set;
7982
import java.util.UUID;
83+
import java.util.stream.Collectors;
8084

8185
import static org.apache.seatunnel.e2e.connector.rocketmq.RocketMqContainer.NAMESRV_PORT;
8286

@@ -204,6 +208,42 @@ public void testTextFormatSinkRocketMq(TestContainer container)
204208
Assertions.assertEquals(10, data.size());
205209
}
206210

211+
@TestTemplate
212+
public void testSourceRocketMqTextTagToConsole(TestContainer container)
213+
throws IOException, InterruptedException {
214+
String topic = "test_topic_text_tag";
215+
String tag = "tag_test";
216+
217+
// delete topic if exist
218+
deleteTopicIfExist(topic);
219+
220+
DefaultSeaTunnelRowSerializer serializer =
221+
new DefaultSeaTunnelRowSerializer(
222+
topic, tag, SEATUNNEL_ROW_TYPE, SchemaFormat.TEXT, DEFAULT_FIELD_DELIMITER);
223+
generateTestData(serializer::serializeRow, topic, 0, 32);
224+
Container.ExecResult execResult =
225+
container.executeJob("/rocketmq-source_text_tag_to_console.conf");
226+
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
227+
}
228+
229+
@TestTemplate
230+
public void testSourceRocketMqTextErrorTagToConsole(TestContainer container)
231+
throws IOException, InterruptedException {
232+
String topic = "test_topic_text_error_tag";
233+
String tag = "test_error_tag";
234+
235+
// delete topic if exist
236+
deleteTopicIfExist(topic);
237+
238+
DefaultSeaTunnelRowSerializer serializer =
239+
new DefaultSeaTunnelRowSerializer(
240+
topic, tag, SEATUNNEL_ROW_TYPE, SchemaFormat.TEXT, DEFAULT_FIELD_DELIMITER);
241+
generateTestData(serializer::serializeRow, topic, 0, 32);
242+
Container.ExecResult execResult =
243+
container.executeJob("/rocketmq-source_text_error_tag_to_console.conf");
244+
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
245+
}
246+
207247
@TestTemplate
208248
public void testSourceRocketMqTextToConsole(TestContainer container)
209249
throws IOException, InterruptedException {
@@ -458,4 +498,32 @@ public RocketMqBaseConfiguration newConfiguration() {
458498
interface ProducerRecordConverter {
459499
Message convert(SeaTunnelRow row);
460500
}
501+
502+
private void deleteTopicIfExist(String topicName) {
503+
DefaultMQAdminExt admin = new DefaultMQAdminExt();
504+
admin.setInstanceName(UUID.randomUUID().toString());
505+
try {
506+
admin.start();
507+
TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topicName);
508+
if (topicRouteData != null
509+
&& topicRouteData.getQueueDatas() != null
510+
&& !topicRouteData.getQueueDatas().isEmpty()) {
511+
Set<String> brokerNames =
512+
topicRouteData.getQueueDatas().stream()
513+
.map(QueueData::getBrokerName)
514+
.collect(Collectors.toSet());
515+
admin.deleteTopicInBroker(brokerNames, topicName);
516+
admin.deleteTopicInNameServer(brokerNames, topicName, "delete_topic");
517+
log.info("Deleted topic: {}", topicName);
518+
} else {
519+
log.info("Topic {} does not exist", topicName);
520+
}
521+
} catch (Exception e) {
522+
log.warn("Failed to delete topic {}: {}", topicName, e.getMessage());
523+
} finally {
524+
if (admin != null) {
525+
admin.shutdown();
526+
}
527+
}
528+
}
461529
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
parallelism = 1
23+
job.mode = "BATCH"
24+
25+
# You can set spark configuration here
26+
spark.app.name = "SeaTunnel"
27+
spark.executor.instances = 1
28+
spark.executor.cores = 1
29+
spark.executor.memory = "1g"
30+
spark.master = local
31+
}
32+
33+
source {
34+
Rocketmq {
35+
plugin_output = "rocketmq_table"
36+
name.srv.addr = "rocketmq-e2e:9876"
37+
topics = "test_topic_text_error_tag"
38+
format = text
39+
# The default field delimiter is ","
40+
field_delimiter = ","
41+
tags = "error_tag_test"
42+
schema = {
43+
fields {
44+
id = bigint
45+
c_map = "map<string, smallint>"
46+
c_array = "array<tinyint>"
47+
c_string = string
48+
c_boolean = boolean
49+
c_tinyint = tinyint
50+
c_smallint = smallint
51+
c_int = int
52+
c_bigint = bigint
53+
c_float = float
54+
c_double = double
55+
c_decimal = "decimal(2, 1)"
56+
c_bytes = bytes
57+
c_date = date
58+
c_timestamp = timestamp
59+
}
60+
}
61+
}
62+
}
63+
64+
transform {
65+
}
66+
67+
sink {
68+
Assert {
69+
plugin_input = "rocketmq_table"
70+
rules = {
71+
row_rules = [
72+
{
73+
rule_type = MIN_ROW
74+
rule_value = 0
75+
},
76+
{
77+
rule_type = MAX_ROW
78+
rule_value = 0
79+
}
80+
],
81+
field_rules = [
82+
{
83+
field_name = c_string
84+
field_type = string
85+
field_value = [
86+
{
87+
rule_type = NULL
88+
}
89+
]
90+
}
91+
]
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)