Skip to content

Commit 607c03e

Browse files
jcosentino11MikeDombo
authored andcommitted
fix: retry IotCoreClient subscriptions when greengrass starts offline (#75)
Allows subscriptions in IotCoreClient to be made eventually if greengrass starts offline.
1 parent c7cafd1 commit 607c03e

2 files changed

Lines changed: 33 additions & 17 deletions

File tree

src/main/java/com/aws/greengrass/mqtt/bridge/clients/IoTCoreClient.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,26 +44,26 @@ public class IoTCoreClient implements MessageClient {
4444
private Set<String> subscribedIotCoreTopics = ConcurrentHashMap.newKeySet();
4545
@Getter(AccessLevel.PROTECTED)
4646
private Set<String> toSubscribeIotCoreTopics = new HashSet<>();
47-
48-
private Consumer<Message> messageHandler;
47+
private volatile Consumer<Message> messageHandler;
4948
private Future<?> subscribeFuture;
5049
private final Object subscribeLock = new Object();
51-
5250
private final MqttClient iotMqttClient;
5351
private final ExecutorService executorService;
5452

5553
private final Consumer<MqttMessage> iotCoreCallback = (message) -> {
5654
String topic = message.getTopic();
5755
LOGGER.atTrace().kv(TOPIC, topic).log("Received IoT Core message");
5856

59-
if (messageHandler == null) {
57+
Consumer<Message> handler = messageHandler;
58+
if (handler == null) {
6059
LOGGER.atWarn().kv(TOPIC, topic).log("IoT Core message received but message handler not set");
6160
} else {
6261
Message msg = new Message(topic, message.getPayload());
63-
messageHandler.accept(msg);
62+
handler.accept(msg);
6463
}
6564
};
6665

66+
@Getter(AccessLevel.PACKAGE) // for unit testing
6767
private final MqttClientConnectionEvents connectionCallbacks = new MqttClientConnectionEvents() {
6868
@Override
6969
public void onConnectionInterrupted(int errorCode) {
@@ -93,7 +93,8 @@ public void onConnectionResumed(boolean sessionPresent) {
9393
public IoTCoreClient(MqttClient iotMqttClient, ExecutorService executorService) {
9494
this.iotMqttClient = iotMqttClient;
9595
this.executorService = executorService;
96-
iotMqttClient.addToCallbackEvents(connectionCallbacks);
96+
// onConnect handler required to handle case when bridge starts offline
97+
iotMqttClient.addToCallbackEvents(connectionCallbacks::onConnectionResumed, connectionCallbacks);
9798
}
9899

99100
/**
@@ -184,15 +185,16 @@ public boolean supportsTopicFilters() {
184185

185186
@SuppressWarnings({"PMD.AvoidCatchingGenericException", "PMD.PreserveStackTrace", "PMD.ExceptionAsFlowControl"})
186187
private void subscribeToTopicsWithRetry(Set<String> topics) {
187-
// retry only if client is connected; skip if offline.
188-
// topics left here should be subscribed when the client is back online (onConnectionResumed event)
189-
topics.forEach(s -> {
188+
for (String topic : topics) {
190189
try {
191190
RetryUtils.runWithRetry(subscribeRetryConfig, () -> {
192191
try {
192+
// retry only if client is connected; skip if offline.
193+
// topics left here should be subscribed when the client
194+
// is back online (onConnectionResumed event)
193195
if (iotMqttClient.connected()) {
194-
subscribeToIotCore(s);
195-
subscribedIotCoreTopics.add(s);
196+
subscribeToIotCore(topic);
197+
subscribedIotCoreTopics.add(topic);
196198
}
197199
// useless return
198200
return null;
@@ -207,10 +209,11 @@ private void subscribeToTopicsWithRetry(Set<String> topics) {
207209
}, "subscribe-iotcore-topic", LOGGER);
208210
} catch (InterruptedException e) {
209211
Thread.currentThread().interrupt();
212+
return;
210213
} catch (Exception e) {
211-
LOGGER.atError().kv(TOPIC, s).setCause(e).log("Failed to subscribe to IoTCore topic");
214+
LOGGER.atError().kv(TOPIC, topic).setCause(e).log("Failed to subscribe to IoTCore topic");
212215
}
213-
});
216+
}
214217
}
215218

216219
private void subscribeToIotCore(String topic) throws InterruptedException, ExecutionException, TimeoutException {

src/test/java/com/aws/greengrass/mqtt/bridge/clients/IoTCoreClientTest.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import java.util.stream.Collectors;
3030

3131
import static org.hamcrest.MatcherAssert.assertThat;
32-
import static org.mockito.ArgumentMatchers.any;
3332
import static org.hamcrest.Matchers.is;
3433
import static org.junit.jupiter.api.Assertions.assertThrows;
34+
import static org.mockito.ArgumentMatchers.any;
3535
import static org.mockito.Mockito.lenient;
3636
import static org.mockito.Mockito.never;
3737
import static org.mockito.Mockito.reset;
@@ -79,19 +79,32 @@ void GIVEN_iotcore_client_started_WHEN_update_subscriptions_THEN_topics_subscrib
7979
}
8080

8181
@Test
82-
void GIVEN_disconnected_iotcore_client_WHEN_update_subscriptions_THEN_topics_not_subscribed() throws Exception {
82+
void GIVEN_offline_iotcore_client_WHEN_update_subscriptions_THEN_subscribe_once_online() throws Exception {
8383
reset(mockIotMqttClient);
84-
when(mockIotMqttClient.connected()).thenReturn(false);
84+
8585
IoTCoreClient iotCoreClient = new IoTCoreClient(mockIotMqttClient, executorService);
86+
8687
Set<String> topics = new HashSet<>();
8788
topics.add("iotcore/topic");
8889
topics.add("iotcore/topic2");
90+
91+
// attempt to update subscriptions, this is expected to fail since bridge is offline
92+
when(mockIotMqttClient.connected()).thenReturn(false);
8993
iotCoreClient.updateSubscriptions(topics, message -> {
9094
});
9195

96+
// verify no subscriptions were made
9297
verify(mockIotMqttClient, never()).subscribe(any(SubscribeRequest.class));
9398
assertThat(iotCoreClient.getSubscribedIotCoreTopics().size(), is(0));
94-
// topics left to be subscribed
99+
assertThat(iotCoreClient.getToSubscribeIotCoreTopics(),
100+
Matchers.containsInAnyOrder("iotcore/topic", "iotcore/topic2"));
101+
102+
// simulate mqtt connection resume
103+
when(mockIotMqttClient.connected()).thenReturn(true);
104+
iotCoreClient.getConnectionCallbacks().onConnectionResumed(false);
105+
106+
// verify subscriptions were made
107+
assertThat(iotCoreClient.getSubscribedIotCoreTopics().size(), is(2));
95108
assertThat(iotCoreClient.getToSubscribeIotCoreTopics(),
96109
Matchers.containsInAnyOrder("iotcore/topic", "iotcore/topic2"));
97110
}

0 commit comments

Comments
 (0)