Skip to content

Commit 744740e

Browse files
committed
send messages for DLE directly to DLE queue
not through topic exchange because DLE does not connect to topic exchange
1 parent 2d1703c commit 744740e

File tree

4 files changed

+137
-91
lines changed

4 files changed

+137
-91
lines changed

context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/util/MessageBrokerUtil.java

+85-53
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@
1414
* #L%
1515
*/
1616

17-
import java.io.IOException;
18-
import java.nio.charset.StandardCharsets;
19-
import java.util.UUID;
20-
import java.util.concurrent.TimeoutException;
21-
2217
import com.google.gson.Gson;
2318
import com.rabbitmq.client.BuiltinExchangeType;
2419
import com.rabbitmq.client.CancelCallback;
@@ -31,6 +26,11 @@
3126
import org.slf4j.Logger;
3227
import org.slf4j.LoggerFactory;
3328

29+
import java.io.IOException;
30+
import java.nio.charset.StandardCharsets;
31+
import java.util.UUID;
32+
import java.util.concurrent.TimeoutException;
33+
3434
/**
3535
* Helper class wrapping methods for interacting with message broker.
3636
*/
@@ -58,22 +58,7 @@ public static Channel connectToTopicExchange(final String host,
5858
final String userName,
5959
final String password,
6060
final String exchange) throws IOException, TimeoutException {
61-
LOGGER.info("Connecting to messagebroker {}:{} with user {}", host, port, userName != null ? userName : "<null>");
62-
final ConnectionFactory factory = new ConnectionFactory();
63-
64-
factory.setHost(host);
65-
factory.setPort(port);
66-
67-
if (StringUtils.isNotBlank(userName)) {
68-
factory.setUsername(userName);
69-
}
70-
if (StringUtils.isNotBlank(password)) {
71-
factory.setPassword(password);
72-
}
73-
74-
factory.setAutomaticRecoveryEnabled(true);
75-
76-
final Connection connection = factory.newConnection();
61+
final Connection connection = getConnection(host, port, userName, password);
7762

7863
final Channel channel = connection.createChannel();
7964

@@ -84,40 +69,53 @@ public static Channel connectToTopicExchange(final String host,
8469
}
8570

8671
/**
87-
* Connect to the message broker specified by {@code host} and {@code port}.
72+
* Connect to message broker. Message broker details and credentials are specified in the given {@code dataSource}.
8873
* Create the given {@code exchange} if it does not exist yet.
8974
*
90-
* @param host the host where the message broker is running
91-
* @param port the port where the message broker is listening
92-
* @param exchange the topic exchange's name
75+
* @param dataSource the {@link MessageBrokerDataSource} containing the message broker connection details
9376
* @return a {@link Channel} object representing the established connection to the message broker
9477
* @throws IOException in case of error
9578
* @throws TimeoutException in case of error
9679
* @see MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)
9780
*/
98-
public static Channel connectToTopicExchange(final String host,
99-
final int port,
100-
final String exchange) throws IOException, TimeoutException {
101-
return connectToTopicExchange(host, port, null, null, exchange);
81+
public static Channel connectToTopicExchange(final MessageBrokerDataSource dataSource)
82+
throws IOException, TimeoutException {
83+
return connectToTopicExchange(
84+
dataSource.getMessageBrokerServer(),
85+
dataSource.getMessageBrokerPort(),
86+
dataSource.getUserName(),
87+
dataSource.getPassword(),
88+
dataSource.getExchange()
89+
);
10290
}
10391

10492
/**
105-
* Connect to message broker. Message broker details and credentials are specified in the given {@code dataSource}.
106-
* Create the given {@code exchange} if it does not exist yet.
93+
* Connect to the message broker specified by {@code host} and {@code port}
94+
* with the credentials specified by {@code userName} and {@code password}.
95+
* Create the given {@code queue} if it does not exist yet.
10796
*
108-
* @param dataSource the {@link MessageBrokerDataSource} containing the message broker connection details
97+
* @param host the host where the message broker is running
98+
* @param port the port where the message broker is listening
99+
* @param userName the username to use when connecting to message broker - optional
100+
* @param password the password to use when connecting to message broker - optional
101+
* @param queue the queue's name
109102
* @return a {@link Channel} object representing the established connection to the message broker
110103
* @throws IOException in case of error
111104
* @throws TimeoutException in case of error
112-
* @see MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)
113105
*/
114-
public static Channel connectToTopicExchange(final MessageBrokerDataSource dataSource)
115-
throws IOException, TimeoutException {
116-
return connectToTopicExchange(dataSource.getMessageBrokerServer(),
117-
dataSource.getMessageBrokerPort(),
118-
dataSource.getUserName(),
119-
dataSource.getPassword(),
120-
dataSource.getExchange());
106+
public static Channel connectToQueue(final String host,
107+
final int port,
108+
final String userName,
109+
final String password,
110+
final String queue) throws IOException, TimeoutException {
111+
final Connection connection = getConnection(host, port, userName, password);
112+
113+
final Channel channel = connection.createChannel();
114+
115+
LOGGER.info("Creating queue {}", queue);
116+
channel.queueDeclare(queue, true, false, false, null);
117+
118+
return channel;
121119
}
122120

123121
/**
@@ -165,12 +163,14 @@ public static void registerListenerOnTopic(final Channel channel,
165163
final MessageBrokerDataSource dataSource,
166164
final DeliverCallback deliverCallback,
167165
final CancelCallback cancelCallback) throws IOException {
168-
registerListenerOnTopic(channel,
169-
dataSource.getExchange(),
170-
dataSource.getTopic(),
171-
dataSource.getId(),
172-
deliverCallback,
173-
cancelCallback);
166+
registerListenerOnTopic(
167+
channel,
168+
dataSource.getExchange(),
169+
dataSource.getTopic(),
170+
dataSource.getId(),
171+
deliverCallback,
172+
cancelCallback
173+
);
174174
}
175175

176176
/**
@@ -191,14 +191,46 @@ public static void convertAndSendToTopic(final Channel channel,
191191
final String topic,
192192
final Object payload) throws IOException {
193193
final String jsonMessage = GSON.toJson(payload);
194-
sendToTopic(channel, exchange, topic, jsonMessage);
195-
}
196-
197-
private static void sendToTopic(final Channel channel,
198-
final String exchange,
199-
final String topic,
200-
final String jsonMessage) throws IOException {
201194
LOGGER.info("Publishing message to topic {}/{}: {}", exchange, topic, jsonMessage);
202195
channel.basicPublish(exchange, topic, null, jsonMessage.getBytes(StandardCharsets.UTF_8));
203196
}
197+
198+
/**
199+
* Converts the given {@code payload} object to a JSON string and sends it to the given {@code queue}.
200+
* <p>
201+
* Use {@link MessageBrokerUtil#connectToQueue(String, int, String, String, String)} to create {@link Channel}.
202+
*
203+
* @param channel the {@link Channel} object representing the established connection to the message broker
204+
* @param queue the queue's name
205+
* @param payload the object to send
206+
* @throws IOException in case of error
207+
*/
208+
public static void convertAndSendToQueue(final Channel channel, final String queue, final Object payload)
209+
throws IOException {
210+
final String jsonMessage = GSON.toJson(payload);
211+
LOGGER.info("Publishing message to queue {}: {}", queue, jsonMessage);
212+
channel.basicPublish("", queue, null, jsonMessage.getBytes(StandardCharsets.UTF_8));
213+
}
214+
215+
private static Connection getConnection(final String host,
216+
final int port,
217+
final String userName,
218+
final String password) throws IOException, TimeoutException {
219+
LOGGER.info("Connecting to messagebroker {}:{} with user {}", host, port, userName != null ? userName : "<null>");
220+
final ConnectionFactory factory = new ConnectionFactory();
221+
222+
factory.setHost(host);
223+
factory.setPort(port);
224+
225+
if (StringUtils.isNotBlank(userName)) {
226+
factory.setUsername(userName);
227+
}
228+
if (StringUtils.isNotBlank(password)) {
229+
factory.setPassword(password);
230+
}
231+
232+
factory.setAutomaticRecoveryEnabled(true);
233+
234+
return factory.newConnection();
235+
}
204236
}

smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/listener/DleGitMonitorProgressListener.java

+19-15
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@
1414
* #L%
1515
*/
1616

17-
import java.io.IOException;
18-
import java.util.List;
19-
import java.util.concurrent.TimeoutException;
20-
2117
import com.rabbitmq.client.Channel;
2218
import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSource;
2319
import de.atb.context.monitoring.events.MonitoringProgressListener;
@@ -31,19 +27,27 @@
3127
import org.slf4j.Logger;
3228
import org.slf4j.LoggerFactory;
3329

30+
import java.io.IOException;
31+
import java.util.List;
32+
import java.util.concurrent.TimeoutException;
33+
3434
public class DleGitMonitorProgressListener implements MonitoringProgressListener<String, IMonitoringDataModel<?, ?>> {
3535

3636
private static final Logger LOGGER = LoggerFactory.getLogger(DleGitMonitorProgressListener.class);
3737

38-
private final String topic;
39-
private final String exchange;
38+
private final String dleQueue;
4039
private final Channel channel;
4140

4241
public DleGitMonitorProgressListener(final MessageBrokerDataSource messageBrokerDataSource)
4342
throws IOException, TimeoutException {
44-
exchange = messageBrokerDataSource.getExchange();
45-
topic = messageBrokerDataSource.getDleTopic();
46-
channel = MessageBrokerUtil.connectToTopicExchange(messageBrokerDataSource);
43+
dleQueue = messageBrokerDataSource.getDleTopic();
44+
channel = MessageBrokerUtil.connectToQueue(
45+
messageBrokerDataSource.getMessageBrokerServer(),
46+
messageBrokerDataSource.getMessageBrokerPort(),
47+
messageBrokerDataSource.getUserName(),
48+
messageBrokerDataSource.getPassword(),
49+
dleQueue
50+
);
4751
}
4852

4953
@Override
@@ -71,20 +75,20 @@ public void documentAnalysed(final List<IMonitoringDataModel<?, ?>> analysed,
7175
private DleMessage convertToDleMessage(final GitMessage gitMessage) {
7276
return DleMessage.builder()
7377
.monitor(CommitMessage.builder()
74-
.user(gitMessage.getUser())
75-
.branch(gitMessage.getBranch())
76-
.files(gitMessage.getNoOfModifiedFiles())
77-
.build())
78+
.user(gitMessage.getUser())
79+
.branch(gitMessage.getBranch())
80+
.files(gitMessage.getNoOfModifiedFiles())
81+
.build())
7882
.build();
7983
}
8084

8185
private void send(final DleMessage dleMessage) {
8286
try {
8387
// simulate that actual context-extraction will take some time
8488
Thread.sleep(1000);
85-
MessageBrokerUtil.convertAndSendToTopic(channel, exchange, topic, dleMessage);
89+
MessageBrokerUtil.convertAndSendToQueue(channel, dleQueue, dleMessage);
8690
} catch (Exception e) {
87-
LOGGER.error("Failed to send {} to {}/{}", dleMessage, exchange, topic, e);
91+
LOGGER.error("Failed to send {} to {}/{}", dleMessage, "", dleQueue, e);
8892
}
8993
}
9094
}

smartclide-monitoring/src/test/java/de/atb/context/monitoring/TestDataRetrieval.java

+32-22
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,5 @@
11
package de.atb.context.monitoring;
22

3-
import java.io.File;
4-
import java.io.IOException;
5-
import java.nio.charset.StandardCharsets;
6-
import java.nio.file.Path;
7-
import java.time.ZonedDateTime;
8-
import java.time.format.DateTimeFormatter;
9-
import java.util.List;
10-
import java.util.Map;
11-
import java.util.concurrent.TimeoutException;
12-
133
import com.rabbitmq.client.Channel;
144
import de.atb.context.common.ContextPathUtils;
155
import de.atb.context.common.util.ApplicationScenario;
@@ -31,6 +21,16 @@
3121
import org.slf4j.LoggerFactory;
3222
import org.testcontainers.containers.RabbitMQContainer;
3323

24+
import java.io.File;
25+
import java.io.IOException;
26+
import java.nio.charset.StandardCharsets;
27+
import java.nio.file.Path;
28+
import java.time.ZonedDateTime;
29+
import java.time.format.DateTimeFormatter;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.concurrent.TimeoutException;
33+
3434
import static org.junit.Assert.assertEquals;
3535

3636
/**
@@ -44,10 +44,9 @@ public class TestDataRetrieval {
4444
private static final Logger logger = LoggerFactory.getLogger(TestDataRetrieval.class);
4545

4646
private static final String RABBITMQ_3_ALPINE = "rabbitmq:3-alpine";
47-
private static final String EXCHANGE_NAME = "smartclide-monitoring";
47+
private static final String EXCHANGE_NAME = "mom";
4848
private static final String ROUTING_KEY_MONITORING = "monitoring.git.commits";
49-
private static final String ROUTING_KEY_DLE = "dle.git.commits";
50-
private static final String QUEUE_PREFIX_DLE = "Fake-DLE";
49+
private static final String QUEUE_NAME_DLE = "code_repo_recommendation_queue";
5150
private static final String DATASOURCE_GIT = "datasource-git";
5251
private static final String MONITORING_CONFIG_FILE_NAME = "monitoring-config.xml";
5352
private static final String AMI_REPOSITORY_ID = "AmI-repository";
@@ -66,10 +65,16 @@ public void setup() throws Exception {
6665
// setup message broker
6766
final String rabbitMQContainerHost = container.getHost();
6867
final Integer rabbitMQContainerAmqpPort = container.getAmqpPort();
69-
channel = MessageBrokerUtil.connectToTopicExchange(rabbitMQContainerHost, rabbitMQContainerAmqpPort, EXCHANGE_NAME);
68+
channel = MessageBrokerUtil.connectToTopicExchange(
69+
rabbitMQContainerHost,
70+
rabbitMQContainerAmqpPort,
71+
null,
72+
null,
73+
EXCHANGE_NAME
74+
);
7075

7176
// setup fake DLE
72-
createFakeDleListener();
77+
createFakeDleListener(rabbitMQContainerHost, rabbitMQContainerAmqpPort);
7378

7479
// write dynamically allocated message broker host and port to monitoring config file
7580
final Path monitoringConfigFilePath = ContextPathUtils.getConfigDirPath().resolve(MONITORING_CONFIG_FILE_NAME);
@@ -145,15 +150,20 @@ private void updateMessageBrokerDataSource(final Path monitoringConfig, final St
145150
persister.write(config, new File(monitoringConfig.toString()));
146151
}
147152

148-
private void createFakeDleListener() throws IOException {
149-
MessageBrokerUtil.registerListenerOnTopic(
150-
channel,
151-
EXCHANGE_NAME,
152-
ROUTING_KEY_DLE,
153-
QUEUE_PREFIX_DLE,
153+
private void createFakeDleListener(final String rabbitMQContainerHost, final Integer rabbitMQContainerAmqpPort)
154+
throws IOException, TimeoutException {
155+
final Channel channel = MessageBrokerUtil.connectToQueue(
156+
rabbitMQContainerHost,
157+
rabbitMQContainerAmqpPort,
158+
null,
159+
null,
160+
QUEUE_NAME_DLE
161+
);
162+
channel.basicConsume(
163+
QUEUE_NAME_DLE,
164+
true,
154165
(t, m) -> logger.info("DLE received message: {}", new String(m.getBody(), StandardCharsets.UTF_8)),
155166
(t) -> logger.info("cancelled!")
156167
);
157168
}
158-
159169
}

smartclide-monitoring/src/test/resources/config/monitoring-config.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<datasource id="datasource-git" type="messagebroker"
1212
monitor="de.atb.context.monitoring.monitors.GitMonitor"
1313
uri=""
14-
options="server=localhost&amp;port=5672&amp;exchange=smartclide-monitoring&amp;topic=monitoring.git.*&amp;dle-topic=dle.git.commits"
14+
options="server=localhost&amp;port=5672&amp;exchange=mom&amp;topic=monitoring.git.*&amp;dle-topic=code_repo_recommendation_queue"
1515
class="de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSource"/>
1616
</datasources>
1717

0 commit comments

Comments
 (0)