Skip to content

Commit 27c4a23

Browse files
authored
Merge pull request #31 from eclipse-researchlabs/register-monitoring-progress-listener
implement DleGitMonitorProgressListener
2 parents 9646d75 + 1b14053 commit 27c4a23

File tree

11 files changed

+344
-108
lines changed

11 files changed

+344
-108
lines changed

context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSource.java

+4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ public final String getTopic() {
7777
return this.getOptionValue(MessageBrokerDataSourceOptions.Topic, true);
7878
}
7979

80+
public final String getDleTopic() {
81+
return this.getOptionValue(MessageBrokerDataSourceOptions.DleTopic, true);
82+
}
83+
8084
public final Credentials getCredentials() {
8185
String userName = this.getUserName();
8286
String password = this.getPassword();

context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSourceOptions.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ public enum MessageBrokerDataSourceOptions implements IDataSourceOptionValue {
3838

3939
Exchange("exchange", String.class),
4040

41-
Topic("topic", String.class);
41+
Topic("topic", String.class),
42+
43+
DleTopic("dle-topic", String.class);
4244

4345
private final String key;
4446
private final Class<? extends Serializable> valueType;

context-monitoring/src/main/java/de/atb/context/monitoring/monitors/AbstractMonitor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727

28-
public class AbstractMonitor<P> implements MonitoringProgressListener<P, IMonitoringDataModel<?, ?>> {
28+
public abstract class AbstractMonitor<P> implements MonitoringProgressListener<P, IMonitoringDataModel<?, ?>> {
2929

3030
protected final Logger logger = LoggerFactory.getLogger(AbstractMonitor.class);
3131

context-monitoring/src/main/java/de/atb/context/monitoring/monitors/ThreadedMonitor.java

+3-17
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public abstract class ThreadedMonitor<P, A extends IMonitoringDataModel<?, ?>> e
4545
protected Interpreter interpreter;
4646
protected Monitor monitor;
4747
protected AmIMonitoringConfiguration amiConfiguration;
48-
protected List<MonitoringProgressListener<P, A>> progressListeners = new ArrayList<>();
48+
protected final List<MonitoringProgressListener<P, A>> progressListeners = new ArrayList<>();
4949

5050
private Thread thread;
5151

@@ -152,28 +152,14 @@ protected void parseAndAnalyse(final P objectToParse,
152152
final IndexingAnalyser<A, P> analyser) {
153153
if (parser.parse(objectToParse)) {
154154
final Document document = parser.getDocument();
155-
this.indexer.addDocumentToIndex(document);
156155
this.raiseParsedEvent(objectToParse, document);
156+
this.indexer.addDocumentToIndex(document);
157+
this.raiseIndexedEvent(document);
157158
final List<A> analysedModels = analyser.analyse(objectToParse);
158159
this.raiseAnalysedEvent(analysedModels, objectToParse, analyser.getDocument());
159160
}
160161
}
161162

162-
/**
163-
* Adds the given Document to the Index underlying this indexer and informs
164-
* potential listeners about the indexed document.
165-
* <p>
166-
* Please note that calling this method will not check if the given document
167-
* already exists in the index. Therefore checking has to be implemented by
168-
* other classes (see {@link IndexingParser#isIndexUpToDate(String, long)}.
169-
*
170-
* @param document the Document to be indexed.
171-
*/
172-
protected final void addDocumentToIndex(final Document document) {
173-
this.indexer.addDocumentToIndex(document);
174-
raiseIndexedEvent(document);
175-
}
176-
177163
/**
178164
* Notifies all registered MonitoringProgressListeners about the document
179165
* that was indexed recently.

context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/MessageBrokerMonitor.java

+4-33
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,10 @@
1414
* #L%
1515
*/
1616

17-
import java.io.IOException;
1817
import java.nio.charset.StandardCharsets;
19-
import java.util.concurrent.TimeoutException;
2018

21-
import com.rabbitmq.client.BuiltinExchangeType;
2219
import com.rabbitmq.client.CancelCallback;
2320
import com.rabbitmq.client.Channel;
24-
import com.rabbitmq.client.Connection;
25-
import com.rabbitmq.client.ConnectionFactory;
2621
import com.rabbitmq.client.DeliverCallback;
2722
import com.rabbitmq.client.Envelope;
2823
import de.atb.context.monitoring.analyser.messagebroker.MessageBrokerAnalyser;
@@ -35,9 +30,9 @@
3530
import de.atb.context.monitoring.index.Indexer;
3631
import de.atb.context.monitoring.models.IMonitoringDataModel;
3732
import de.atb.context.monitoring.monitors.ScheduledExecutorThreadedMonitor;
33+
import de.atb.context.monitoring.monitors.messagebroker.util.MessageBrokerUtil;
3834
import de.atb.context.monitoring.parser.messagebroker.MessageBrokerParser;
3935
import de.atb.context.tools.ontology.AmIMonitoringConfiguration;
40-
import org.apache.commons.lang3.StringUtils;
4136

4237
/**
4338
* WebServiceMonitor
@@ -56,8 +51,7 @@ public class MessageBrokerMonitor extends ScheduledExecutorThreadedMonitor<Strin
5651
handleMessage(delivery.getEnvelope(), message);
5752
};
5853

59-
private final CancelCallback cancelCallback = consumerTag -> {
60-
};
54+
private final CancelCallback cancelCallback = consumerTag -> logger.info("{} cancelled!", consumerTag);
6155

6256
public MessageBrokerMonitor(final DataSource dataSource,
6357
final Interpreter interpreter,
@@ -95,11 +89,8 @@ protected void doMonitor(final InterpreterConfiguration setting) throws Exceptio
9589
this.parser = getParser(setting);
9690
}
9791

98-
final Channel channel = createChannel();
99-
channel.exchangeDeclare(dataSource.getExchange(), BuiltinExchangeType.TOPIC, true);
100-
final String queue = channel.queueDeclare("", true, false, false, null).getQueue();
101-
channel.queueBind(queue, dataSource.getExchange(), dataSource.getTopic());
102-
channel.basicConsume(queue, true, deliverCallback, cancelCallback);
92+
final Channel channel = MessageBrokerUtil.connectToTopicExchange(dataSource);
93+
MessageBrokerUtil.registerListenerOnTopic(channel, dataSource, deliverCallback, cancelCallback);
10394
}
10495
}
10596

@@ -121,24 +112,4 @@ protected final void handleMessage(Envelope envelope, String message) {
121112
logger.error("Unknown error in MessageBrokerMonitor.handleMessageBroker()", e);
122113
}
123114
}
124-
125-
private Channel createChannel() throws IOException, TimeoutException {
126-
final ConnectionFactory factory = new ConnectionFactory();
127-
128-
factory.setHost(dataSource.getMessageBrokerServer());
129-
factory.setPort(dataSource.getMessageBrokerPort());
130-
131-
final String userName = dataSource.getUserName();
132-
if (StringUtils.isNotBlank(userName)) {
133-
factory.setUsername(userName);
134-
}
135-
final String password = dataSource.getPassword();
136-
if (StringUtils.isNotBlank(password)) {
137-
factory.setPassword(password);
138-
}
139-
140-
final Connection connection = factory.newConnection();
141-
142-
return connection.createChannel();
143-
}
144115
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package de.atb.context.monitoring.monitors.messagebroker.util;
2+
3+
/*-
4+
* #%L
5+
* ATB Context Monitoring Core Services
6+
* %%
7+
* Copyright (C) 2015 - 2021 ATB – Institut für angewandte Systemtechnik Bremen GmbH
8+
* %%
9+
* This program and the accompanying materials are made
10+
* available under the terms of the Eclipse Public License 2.0
11+
* which is available at https://www.eclipse.org/legal/epl-2.0/
12+
*
13+
* SPDX-License-Identifier: EPL-2.0
14+
* #L%
15+
*/
16+
17+
import java.io.IOException;
18+
import java.nio.charset.StandardCharsets;
19+
import java.util.UUID;
20+
import java.util.concurrent.TimeoutException;
21+
22+
import com.google.gson.Gson;
23+
import com.rabbitmq.client.BuiltinExchangeType;
24+
import com.rabbitmq.client.CancelCallback;
25+
import com.rabbitmq.client.Channel;
26+
import com.rabbitmq.client.Connection;
27+
import com.rabbitmq.client.ConnectionFactory;
28+
import com.rabbitmq.client.DeliverCallback;
29+
import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSource;
30+
import org.apache.commons.lang3.StringUtils;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
/**
35+
* Helper class wrapping methods for interacting with message broker.
36+
*/
37+
public class MessageBrokerUtil {
38+
39+
private static final Logger LOGGER = LoggerFactory.getLogger(MessageBrokerUtil.class);
40+
private static final Gson GSON = new Gson();
41+
42+
/**
43+
* Connect to the message broker specified by {@code host} and {@code port}
44+
* with the credentials specified by {@code userName} and {@code password}.
45+
* Create the given {@code exchange} if it does not exist yet.
46+
*
47+
* @param host the host where the message broker is running
48+
* @param port the port where the message broker is listening
49+
* @param userName the username to use when connecting to message broker - optional
50+
* @param password the password to use when connecting to message broker - optional
51+
* @param exchange the topic exchange's name
52+
* @return a {@link Channel} object representing the established connection to the message broker
53+
* @throws IOException in case of error
54+
* @throws TimeoutException in case of error
55+
*/
56+
public static Channel connectToTopicExchange(final String host,
57+
final int port,
58+
final String userName,
59+
final String password,
60+
final String exchange) throws IOException, TimeoutException {
61+
LOGGER.info("Connecting to messagebroker {}:{} with user {}", host, port, userName != null ? userName : "<null>");
62+
final ConnectionFactory factory = new ConnectionFactory();
63+
64+
factory.setHost(host);
65+
factory.setPort(port);
66+
67+
if (StringUtils.isNotBlank(userName)) {
68+
factory.setUsername(userName);
69+
}
70+
if (StringUtils.isNotBlank(password)) {
71+
factory.setPassword(password);
72+
}
73+
74+
factory.setAutomaticRecoveryEnabled(true);
75+
76+
final Connection connection = factory.newConnection();
77+
78+
final Channel channel = connection.createChannel();
79+
80+
LOGGER.info("Creating topic exchange {}", exchange);
81+
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
82+
83+
return channel;
84+
}
85+
86+
/**
87+
* Connect to the message broker specified by {@code host} and {@code port}.
88+
* Create the given {@code exchange} if it does not exist yet.
89+
*
90+
* @param host the host where the message broker is running
91+
* @param port the port where the message broker is listening
92+
* @param exchange the topic exchange's name
93+
* @return a {@link Channel} object representing the established connection to the message broker
94+
* @throws IOException in case of error
95+
* @throws TimeoutException in case of error
96+
* @see MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)
97+
*/
98+
public static Channel connectToTopicExchange(final String host,
99+
final int port,
100+
final String exchange) throws IOException, TimeoutException {
101+
return connectToTopicExchange(host, port, null, null, exchange);
102+
}
103+
104+
/**
105+
* Connect to message broker. Message broker details and credentials are specified in the given {@code dataSource}.
106+
* Create the given {@code exchange} if it does not exist yet.
107+
*
108+
* @param dataSource the {@link MessageBrokerDataSource} containing the message broker connection details
109+
* @return a {@link Channel} object representing the established connection to the message broker
110+
* @throws IOException in case of error
111+
* @throws TimeoutException in case of error
112+
* @see MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)
113+
*/
114+
public static Channel connectToTopicExchange(final MessageBrokerDataSource dataSource)
115+
throws IOException, TimeoutException {
116+
return connectToTopicExchange(dataSource.getMessageBrokerServer(),
117+
dataSource.getMessageBrokerPort(),
118+
dataSource.getUserName(),
119+
dataSource.getPassword(),
120+
dataSource.getExchange());
121+
}
122+
123+
/**
124+
* Register the given callback functions to consume messages on the given {@code exchange} for the given {@code topic}.
125+
* <p>
126+
* Use {@link MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)} or one of its overloads
127+
* to create {@link Channel}.
128+
*
129+
* @param channel the {@link Channel} object representing the established connection to the message broker
130+
* @param exchange the topic exchange's name
131+
* @param topic the topic's name
132+
* @param queuePrefix the prefix to attach to the queue's name
133+
* @param deliverCallback callback function to handle received messages
134+
* @param cancelCallback callback function to handle cancellation of the listener
135+
* @throws IOException in case of error
136+
*/
137+
public static void registerListenerOnTopic(final Channel channel,
138+
final String exchange,
139+
final String topic,
140+
final String queuePrefix,
141+
final DeliverCallback deliverCallback,
142+
final CancelCallback cancelCallback) throws IOException {
143+
LOGGER.info("Registering listener on topic {}/{}", exchange, topic);
144+
final String queueName = String.format("%s-%s", queuePrefix, UUID.randomUUID());
145+
final String queue = channel.queueDeclare(queueName, true, true, true, null).getQueue();
146+
LOGGER.info("Created queue {}", queue);
147+
channel.queueBind(queue, exchange, topic);
148+
channel.basicConsume(queue, true, deliverCallback, cancelCallback);
149+
}
150+
151+
/**
152+
* Register the given callback functions to consume messages.
153+
* The exchange and topic to register for are specified in the given {@code dataSource}.
154+
* <p>
155+
* Use {@link MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)} or one of its overloads
156+
* to create {@link Channel}.
157+
*
158+
* @param channel the {@link Channel} object representing the established connection to the message broker
159+
* @param dataSource the {@link MessageBrokerDataSource} containing the exchange and topic details
160+
* @param deliverCallback callback function to handle received messages
161+
* @param cancelCallback callback function to handle cancellation of the listener
162+
* @throws IOException in case of error
163+
*/
164+
public static void registerListenerOnTopic(final Channel channel,
165+
final MessageBrokerDataSource dataSource,
166+
final DeliverCallback deliverCallback,
167+
final CancelCallback cancelCallback) throws IOException {
168+
registerListenerOnTopic(channel,
169+
dataSource.getExchange(),
170+
dataSource.getTopic(),
171+
dataSource.getId(),
172+
deliverCallback,
173+
cancelCallback);
174+
}
175+
176+
/**
177+
* Converts the given {@code payload} object to a JSON string
178+
* and sends it to the given {@code topic} on the given {@code exchange}.
179+
* <p>
180+
* Use {@link MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)} or one of its overloads
181+
* to create {@link Channel}.
182+
*
183+
* @param channel the {@link Channel} object representing the established connection to the message broker
184+
* @param exchange the topic exchange's name
185+
* @param topic the topic's name
186+
* @param payload the object to send
187+
* @throws IOException in case of error
188+
*/
189+
public static void convertAndSendToTopic(final Channel channel,
190+
final String exchange,
191+
final String topic,
192+
final Object payload) throws IOException {
193+
final String jsonMessage = GSON.toJson(payload);
194+
sendToTopic(channel, exchange, topic, jsonMessage);
195+
}
196+
197+
private static void sendToTopic(final Channel channel,
198+
final String exchange,
199+
final String topic,
200+
final String jsonMessage) throws IOException {
201+
LOGGER.info("Publishing message to topic {}/{}: {}", exchange, topic, jsonMessage);
202+
channel.basicPublish(exchange, topic, null, jsonMessage.getBytes(StandardCharsets.UTF_8));
203+
}
204+
}

smartclide-monitoring/src/main/java/de/atb/context/monitoring/monitors/GitMonitor.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,27 @@
1414
* #L%
1515
*/
1616

17+
import java.io.IOException;
18+
import java.util.concurrent.TimeoutException;
19+
1720
import de.atb.context.monitoring.config.models.DataSource;
1821
import de.atb.context.monitoring.config.models.Interpreter;
1922
import de.atb.context.monitoring.config.models.Monitor;
23+
import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSource;
2024
import de.atb.context.monitoring.index.Indexer;
2125
import de.atb.context.monitoring.monitors.messagebroker.MessageBrokerMonitor;
2226
import de.atb.context.tools.ontology.AmIMonitoringConfiguration;
27+
import eu.smartclide.contexthandling.dle.listener.DleGitMonitorProgressListener;
2328

2429
public class GitMonitor extends MessageBrokerMonitor {
2530
public GitMonitor(final DataSource dataSource,
2631
final Interpreter interpreter,
2732
final Monitor monitor,
2833
final Indexer indexer,
29-
final AmIMonitoringConfiguration configuration) {
34+
final AmIMonitoringConfiguration configuration) throws IOException, TimeoutException {
3035
super(dataSource, interpreter, monitor, indexer, configuration);
36+
37+
// FIXME: this is a temporary workaround and should be removed!
38+
addProgressListener(new DleGitMonitorProgressListener((MessageBrokerDataSource) dataSource));
3139
}
3240
}

smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/ServiceMain.java

+2
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ private static void startService() {
113113
public static void main(String[] args) {
114114
initialize();
115115

116+
// TODO: add DleGitMonitorProgressListener as progress listener in GitMonitor
117+
116118
if (reposService != null) {
117119
monitoringDataRepository = new AmIMonitoringDataRepositoryServiceWrapper(reposService);
118120
logger.debug(monitoringDataRepository.ping());

0 commit comments

Comments
 (0)