Skip to content

Commit ec925cf

Browse files
author
luops
committed
[feature][connectors-v2][ActiveMQ] add blocking queue and thread pools. blocking until message consume and concurrent processing.
1 parent 1e1ab10 commit ec925cf

File tree

4 files changed

+147
-24
lines changed

4 files changed

+147
-24
lines changed

Diff for: seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/source/ActivemqSourceReader.java

+46-24
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,12 @@
2727
import org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConnectorErrorCode;
2828
import org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConnectorException;
2929
import org.apache.seatunnel.connectors.seatunnel.activemq.split.ActivemqSplit;
30-
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
3130

3231
import lombok.extern.slf4j.Slf4j;
3332

3433
import javax.jms.JMSException;
3534
import javax.jms.Message;
3635
import javax.jms.MessageConsumer;
37-
import javax.jms.MessageListener;
3836
import javax.jms.TextMessage;
3937

4038
import java.io.IOException;
@@ -45,14 +43,20 @@
4543
import java.util.Set;
4644
import java.util.SortedMap;
4745
import java.util.TreeMap;
46+
import java.util.concurrent.CompletableFuture;
47+
import java.util.concurrent.ExecutorService;
48+
import java.util.concurrent.Executors;
49+
import java.util.concurrent.LinkedBlockingQueue;
50+
import java.util.concurrent.TimeUnit;
4851

4952
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqOptions.QUEUE_NAME;
5053
import static org.apache.seatunnel.connectors.seatunnel.activemq.config.ActivemqSourceOptions.USE_CORRELATION_ID;
5154
import static org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConnectorErrorCode.HANDLE_SHUTDOWN_SIGNAL_FAILED;
52-
import static org.apache.seatunnel.connectors.seatunnel.activemq.exception.ActivemqConnectorErrorCode.MESSAGE_ACK_FAILED;
5355

5456
@Slf4j
5557
public class ActivemqSourceReader<T> implements SourceReader<T, ActivemqSplit> {
58+
private static final long POLL_TIMEOUT_MILLIS = 1000L;
59+
5660
protected final Context context;
5761
protected final MessageConsumer consumer;
5862
protected transient Set<String> correlationIdsProcessedButNotAcknowledged;
@@ -62,6 +66,8 @@ public class ActivemqSourceReader<T> implements SourceReader<T, ActivemqSplit> {
6266
private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
6367
private ActivemqClient activemqClient;
6468
private final ReadonlyConfig config;
69+
private final ExecutorService executorService;
70+
private final LinkedBlockingQueue<Message> messageQueue;
6571

6672
public ActivemqSourceReader(
6773
DeserializationSchema<SeaTunnelRow> deserializationSchema,
@@ -74,12 +80,26 @@ public ActivemqSourceReader(
7480
this.config = config;
7581
this.activemqClient = new ActivemqClient(config);
7682
this.consumer = activemqClient.getConsumer();
83+
this.executorService =
84+
Executors.newCachedThreadPool(r -> new Thread(r, "ActiveMQ Source Data Consumer"));
85+
this.messageQueue = new LinkedBlockingQueue<>();
7786
}
7887

7988
@Override
8089
public void open() throws Exception {
8190
this.correlationIdsProcessedButNotAcknowledged = new HashSet<>();
8291
this.massageIdsProcessedForCurrentSnapshot = new ArrayList<>();
92+
// start consumer listening and put messages in a queue
93+
consumer.setMessageListener(
94+
message -> {
95+
try {
96+
messageQueue.put(message);
97+
} catch (InterruptedException e) {
98+
Thread.currentThread().interrupt();
99+
throw new ActivemqConnectorException(
100+
ActivemqConnectorErrorCode.HANDLE_SHUTDOWN_SIGNAL_FAILED, e);
101+
}
102+
});
83103
}
84104

85105
@Override
@@ -97,16 +117,26 @@ public void close() throws IOException {
97117
if (activemqClient != null) {
98118
activemqClient.close();
99119
}
120+
if (executorService != null) {
121+
executorService.shutdownNow();
122+
}
100123
}
101124

102125
@Override
103126
public void pollNext(Collector output) throws Exception {
104-
consumer.setMessageListener(
105-
new MessageListener() {
106-
@Override
107-
public void onMessage(Message message) {
127+
while (true) {
128+
Message message = messageQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
129+
if (message == null) {
130+
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
131+
break;
132+
}
133+
continue;
134+
}
135+
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
136+
executorService.submit(
137+
() -> {
108138
try {
109-
if (message != null && message instanceof TextMessage) {
139+
if (message instanceof TextMessage) {
110140
TextMessage textMessage = (TextMessage) message;
111141
String correlationId = textMessage.getJMSCorrelationID();
112142
byte[] body = textMessage.getText().getBytes();
@@ -117,29 +147,21 @@ public void onMessage(Message message) {
117147
}
118148
massageIdsProcessedForCurrentSnapshot.add(
119149
message.getJMSMessageID());
120-
try {
121-
if (deserializationSchema
122-
instanceof JsonDeserializationSchema) {
123-
((JsonDeserializationSchema) deserializationSchema)
124-
.collect(body, output);
125-
} else {
126-
deserializationSchema.deserialize(body, output);
127-
}
128-
} catch (IOException e) {
129-
log.error("Failed to deserialize message", e);
130-
}
150+
deserializationSchema.deserialize(body, output);
151+
131152
// verify that message processing is complete
132153
textMessage.acknowledge();
133154
}
134155
}
135-
} catch (JMSException e) {
136-
throw new ActivemqConnectorException(MESSAGE_ACK_FAILED, e);
137156
} catch (Exception e) {
157+
completableFuture.join();
138158
throw new ActivemqConnectorException(HANDLE_SHUTDOWN_SIGNAL_FAILED, e);
139159
}
140-
}
141-
});
142-
// source support streaming mode, this is for test
160+
completableFuture.complete(null);
161+
});
162+
completableFuture.join();
163+
}
164+
143165
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
144166
// signal to the source that we have reached the end of the data.
145167
context.signalNoMoreElement();

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-activemq-e2e/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,12 @@
5656
<artifactId>connector-activemq</artifactId>
5757
<version>${project.version}</version>
5858
</dependency>
59+
60+
<dependency>
61+
<groupId>org.apache.seatunnel</groupId>
62+
<artifactId>connector-assert</artifactId>
63+
<version>${project.version}</version>
64+
<scope>test</scope>
65+
</dependency>
5966
</dependencies>
6067
</project>

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-activemq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/activemq/ActivemqIT.java

+7
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ public void testActiveMQ(TestContainer container) throws Exception {
190190
}
191191
}
192192

193+
@TestTemplate
194+
public void testSourceActiveMQJsonToConsole(TestContainer container) throws Exception {
195+
initSourceData();
196+
Container.ExecResult execResult = container.executeJob("/activemq-source_to_console.conf");
197+
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
198+
}
199+
193200
private static Pair<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet() {
194201

195202
SeaTunnelRowType rowType =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
######
19+
###### This config file is a demonstration of batch processing in SeaTunnel config
20+
######
21+
22+
env {
23+
parallelism = 1
24+
job.mode = "BATCH"
25+
}
26+
27+
source {
28+
ActiveMQ {
29+
uri="tcp://activemq-host:61616"
30+
username = "admin"
31+
password = "admin"
32+
queue_name = "sourceQueue"
33+
format = json
34+
plugin_output = "activemq_table"
35+
schema = {
36+
fields {
37+
id = bigint
38+
c_map = "map<string, smallint>"
39+
c_array = "array<tinyint>"
40+
c_string = string
41+
c_boolean = boolean
42+
c_tinyint = tinyint
43+
c_smallint = smallint
44+
c_int = int
45+
c_bigint = bigint
46+
c_float = float
47+
c_double = double
48+
c_decimal = "decimal(2, 1)"
49+
c_bytes = bytes
50+
c_date = date
51+
c_timestamp = timestamp
52+
}
53+
}
54+
}
55+
}
56+
57+
transform {
58+
}
59+
60+
sink {
61+
Assert {
62+
plugin_input = "activemq_table"
63+
rules =
64+
{
65+
field_rules = [
66+
{
67+
field_name = id
68+
field_type = bigint
69+
field_value = [
70+
{
71+
rule_type = NOT_NULL
72+
},
73+
{
74+
rule_type = MIN
75+
rule_value = 0
76+
},
77+
{
78+
rule_type = MAX
79+
rule_value = 9
80+
}
81+
]
82+
}
83+
]
84+
}
85+
86+
}
87+
}

0 commit comments

Comments
 (0)