Skip to content

Commit 95a6b99

Browse files
authored
fix: ensure connect loop stops on client close (#154)
1 parent 522e7e0 commit 95a6b99

3 files changed

Lines changed: 94 additions & 49 deletions

File tree

src/integrationtests/java/com/aws/greengrass/integrationtests/ConfigTest.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.aws.greengrass.mqtt.bridge.BridgeConfig;
2121
import com.aws.greengrass.mqtt.bridge.MQTTBridge;
2222
import com.aws.greengrass.mqtt.bridge.TopicMapping;
23+
import com.aws.greengrass.mqtt.bridge.clients.MQTTClient;
2324
import com.aws.greengrass.mqtt.bridge.model.BridgeConfigReference;
2425
import com.aws.greengrass.mqtt.bridge.model.Mqtt5RouteOptions;
2526
import com.aws.greengrass.mqtt.bridge.model.MqttMessage;
@@ -59,6 +60,7 @@
5960
import static org.junit.jupiter.api.Assertions.assertFalse;
6061
import static org.junit.jupiter.api.Assertions.assertThrows;
6162
import static org.junit.jupiter.api.Assertions.assertTrue;
63+
import static org.junit.jupiter.api.Assertions.fail;
6264

6365
@BridgeIntegrationTest
6466
public class ConfigTest {
@@ -187,7 +189,7 @@ void GIVEN_Greengrass_with_mqtt_bridge_WHEN_connect_options_set_in_config_THEN_l
187189

188190
Topics routeOptions =
189191
testContext.getKernel().locate(MQTTBridge.SERVICE_NAME).getConfig().lookupTopics(
190-
CONFIGURATION_CONFIG_KEY, BridgeConfig.KEY_MQTT_5_ROUTE_OPTIONS)
192+
CONFIGURATION_CONFIG_KEY, BridgeConfig.KEY_MQTT_5_ROUTE_OPTIONS)
191193
.lookupTopics("toIotCore");
192194
Topic retainAsPublished = routeOptions.lookup("retainAsPublished");
193195
assertTrue((boolean) retainAsPublished.getOnce());
@@ -201,7 +203,7 @@ void GIVEN_Greengrass_with_mqtt_bridge_WHEN_connect_options_set_in_config_THEN_l
201203
});
202204

203205
testContext.getKernel().locate(MQTTBridge.SERVICE_NAME).getConfig().lookupTopics(CONFIGURATION_CONFIG_KEY,
204-
BridgeConfig.KEY_MQTT_5_ROUTE_OPTIONS).lookupTopics("toIotCore").lookup("retainAsPublished")
206+
BridgeConfig.KEY_MQTT_5_ROUTE_OPTIONS).lookupTopics("toIotCore").lookup("retainAsPublished")
205207
.withValue(false);
206208
testContext.getKernel().getContext().waitForPublishQueueToClear();
207209
retainAsPublished = routeOptions.lookup("retainAsPublished");
@@ -386,4 +388,39 @@ void GIVEN_Greengrass_with_mqtt_bridge_WHEN_invalid_mqttTopicMapping_updated_THE
386388

387389
assertTrue(bridgeErrored.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
388390
}
391+
392+
@SuppressWarnings("PMD.AvoidCatchingGenericException")
393+
@TestWithMqtt5Broker
394+
@WithKernel("config.yaml")
395+
void GIVEN_bridge_WHEN_mqtt_version_toggled_THEN_clients_switched(Broker broker, ExtensionContext context) throws Exception {
396+
MQTTClient v3Client = testContext.getLocalV3Client();
397+
assertTrue(v3Client.getMqttClientInternal().isConnected());
398+
399+
// switch mqtt version from mqtt3 to mqtt5
400+
testContext.getKernel().locate(MQTTBridge.SERVICE_NAME)
401+
.getConfig()
402+
.lookupTopics(CONFIGURATION_CONFIG_KEY)
403+
.lookup(BridgeConfig.KEY_MQTT, BridgeConfig.KEY_VERSION)
404+
.withValue("mqtt5");
405+
406+
// wait for clients to switch
407+
assertThat("mqtt5 client active", () -> {
408+
try {
409+
testContext.getLocalV5Client();
410+
return true;
411+
} catch (Exception e) {
412+
return false;
413+
}
414+
}, eventuallyEval(is(true)));
415+
416+
assertFalse(v3Client.getMqttClientInternal().isConnected());
417+
try {
418+
v3Client.getMqttClientInternal().connect();
419+
fail("v3 client expected to be closed");
420+
} catch (MqttException e) {
421+
assertEquals(MqttException.REASON_CODE_CLIENT_CLOSED, e.getReasonCode());
422+
}
423+
424+
assertThat("mqtt5 client connected", () -> testContext.getLocalV5Client().getClient().getIsConnected(), eventuallyEval(is(true)));
425+
}
389426
}

src/main/java/com/aws/greengrass/mqtt/bridge/BridgeConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ public final class BridgeConfig {
5757
public static final String KEY_MAXIMUM_PACKET_SIZE = "maximumPacketSize";
5858
public static final String KEY_SESSION_EXPIRY_INTERVAL = "sessionExpiryInterval";
5959
public static final String KEY_MQTT_5_ROUTE_OPTIONS = "mqtt5RouteOptions";
60-
static final String KEY_MQTT = "mqtt";
61-
static final String KEY_VERSION = "version";
60+
public static final String KEY_MQTT = "mqtt";
61+
public static final String KEY_VERSION = "version";
6262

6363

6464
private static final long MIN_TIMEOUT = 0L;

src/main/java/com/aws/greengrass/mqtt/bridge/clients/MQTTClient.java

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,7 @@ protected MQTTClient(@NonNull URI brokerUri, @NonNull String clientId, MQTTClien
144144
}
145145

146146
void reset() {
147-
if (mqttClientInternal.isConnected()) {
148-
try {
149-
mqttClientInternal.disconnect();
150-
} catch (MqttException e) {
151-
LOGGER.atError().setCause(e).log("Failed to disconnect MQTT client");
152-
return;
153-
}
154-
}
147+
disconnect(30_000L); // paho default
155148
connectAndSubscribe();
156149
}
157150

@@ -169,6 +162,32 @@ public void start() throws MessageClientException {
169162
connectAndSubscribe();
170163
}
171164

165+
private void disconnect() {
166+
// 0ms quiescence time, just send the disconnect packet immediately
167+
disconnect(0);
168+
}
169+
170+
@SuppressWarnings("PMD.CloseResource")
171+
private void disconnect(long quiesceTimeout) {
172+
IMqttClient client = mqttClientInternal;
173+
if (client == null) {
174+
return;
175+
}
176+
try {
177+
LOGGER.debug("Disconnecting MQTT client");
178+
client.disconnect(quiesceTimeout);
179+
} catch (MqttException e) {
180+
if (MqttException.REASON_CODE_CLIENT_ALREADY_DISCONNECTED != e.getReasonCode()
181+
&& MqttException.REASON_CODE_CLIENT_CLOSED != e.getReasonCode()) {
182+
LOGGER.atError().setCause(e).log("Failed to disconnect MQTT client");
183+
return;
184+
}
185+
}
186+
// no need to unsubscribe because we connect with cleanSession=true
187+
subscribedLocalMqttTopics.clear();
188+
LOGGER.debug("MQTT client disconnected");
189+
}
190+
172191
/**
173192
* Stop the {@link MQTTClient}.
174193
*/
@@ -177,29 +196,15 @@ public void start() throws MessageClientException {
177196
public void stop() {
178197
mqttClientKeyStore.unsubscribeFromUpdates(onKeyStoreUpdate);
179198
cancelConnectTask();
180-
181-
IMqttClient client = mqttClientInternal;
182-
try {
183-
if (client != null && client.isConnected()) {
184-
LOGGER.debug("Disconnecting MQTT client");
185-
// 0ms quiescence time, just send the disconnect packet immediately
186-
client.disconnect(0);
187-
LOGGER.debug("MQTT client disconnected");
188-
}
189-
} catch (MqttException e) {
190-
LOGGER.atError().setCause(e).log("Failed to disconnect MQTT client");
191-
} finally {
192-
// no need to unsubscribe because we connect with cleanSession=true
193-
subscribedLocalMqttTopics.clear();
194-
}
195-
199+
disconnect();
196200
try {
197201
dataStore.close();
198202
} catch (MqttPersistenceException e) {
199203
LOGGER.atDebug().setCause(e).log("Unable to close mqtt client datastore");
200204
}
201205

202206
try {
207+
IMqttClient client = mqttClientInternal;
203208
if (client != null) {
204209
client.close();
205210
}
@@ -297,44 +302,47 @@ private void cancelConnectTask() {
297302
}
298303
}
299304

300-
private synchronized void doConnect() throws MqttException, KeyStoreException {
301-
if (!mqttClientInternal.isConnected()) {
302-
mqttClientInternal.connect(getConnectionOptions());
303-
LOGGER.atInfo()
304-
.kv(BridgeConfig.KEY_BROKER_URI, brokerUri)
305-
.kv(BridgeConfig.KEY_CLIENT_ID, clientId)
306-
.log("Connected to broker");
307-
}
308-
}
309-
310305
private void reconnectAndResubscribe() {
311306
int waitBeforeRetry = MIN_WAIT_RETRY_IN_SECONDS;
312307

313308
while (!mqttClientInternal.isConnected() && !Thread.currentThread().isInterrupted()) {
309+
Exception error;
314310
try {
315311
// TODO: Clean up this loop
316-
doConnect();
317-
} catch (MqttException | KeyStoreException e) {
312+
mqttClientInternal.connect(getConnectionOptions());
313+
break;
314+
} catch (MqttException e) {
318315
if (Utils.getUltimateCause(e) instanceof InterruptedException) {
319316
// paho doesn't reset the interrupt flag
320317
LOGGER.atDebug().log("Interrupted during reconnect");
321318
Thread.currentThread().interrupt();
322319
return;
323320
}
324-
325-
LOGGER.atDebug().setCause(e)
326-
.log("Unable to connect. Will be retried after {} seconds", waitBeforeRetry);
327-
try {
328-
Thread.sleep(waitBeforeRetry * 1000);
329-
} catch (InterruptedException er) {
330-
Thread.currentThread().interrupt();
331-
LOGGER.atDebug().log("Interrupted during reconnect");
321+
if (MqttException.REASON_CODE_CLIENT_CLOSED == e.getReasonCode()) {
332322
return;
333323
}
334-
waitBeforeRetry = Math.min(2 * waitBeforeRetry, MAX_WAIT_RETRY_IN_SECONDS);
324+
error = e;
325+
} catch (KeyStoreException e) {
326+
error = e;
335327
}
328+
329+
LOGGER.atDebug().setCause(error)
330+
.log("Unable to connect. Will be retried after {} seconds", waitBeforeRetry);
331+
try {
332+
Thread.sleep(waitBeforeRetry * 1000);
333+
} catch (InterruptedException er) {
334+
Thread.currentThread().interrupt();
335+
LOGGER.atDebug().log("Interrupted during reconnect");
336+
return;
337+
}
338+
waitBeforeRetry = Math.min(2 * waitBeforeRetry, MAX_WAIT_RETRY_IN_SECONDS);
336339
}
337340

341+
LOGGER.atInfo()
342+
.kv(BridgeConfig.KEY_BROKER_URI, brokerUri)
343+
.kv(BridgeConfig.KEY_CLIENT_ID, clientId)
344+
.log("Connected to broker");
345+
338346
resubscribe();
339347
}
340348

0 commit comments

Comments
 (0)