Skip to content

Commit e2ad1fc

Browse files
authored
feat: reset mqtt3 client on config change (#158)
1 parent 72d4d5c commit e2ad1fc

12 files changed

Lines changed: 241 additions & 191 deletions

File tree

src/integrationtests/java/com/aws/greengrass/integrationtests/BridgeTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ void GIVEN_mqtt3_and_mapping_between_local_and_iotcore_WHEN_iotcore_message_rece
101101

102102
@BridgeIntegrationTest(
103103
withConfig = "mqtt3_local_and_iotcore.yaml",
104-
withBrokers = {Broker.MQTT5, Broker.MQTT3})
104+
withBrokers = {Broker.MQTT5})
105105
void GIVEN_mqtt3_and_mapping_between_local_and_iotcore_WHEN_local_message_received_THEN_message_bridged_to_iotcore(BridgeIntegrationTestContext context) throws Exception {
106106
MqttMessage expectedMessage = MqttMessage.builder()
107107
.topic("topic/toIotCore")
@@ -129,7 +129,7 @@ void GIVEN_mqtt3_and_mapping_between_local_and_iotcore_WHEN_local_message_receiv
129129
.contentType("contentType")
130130
.build());
131131

132-
subscribeCallback.getLeft().get(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
132+
subscribeCallback.getLeft().get(5L, TimeUnit.SECONDS);
133133
}
134134

135135
@BridgeIntegrationTest(

src/integrationtests/java/com/aws/greengrass/integrationtests/ConfigTest.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package com.aws.greengrass.integrationtests;
77

88
import com.aws.greengrass.config.Topics;
9-
import com.aws.greengrass.config.UpdateBehaviorTree;
109
import com.aws.greengrass.dependency.State;
1110
import com.aws.greengrass.integrationtests.extensions.BridgeIntegrationTest;
1211
import com.aws.greengrass.integrationtests.extensions.BridgeIntegrationTestContext;
@@ -38,12 +37,10 @@
3837
import java.util.Set;
3938
import java.util.concurrent.CompletableFuture;
4039
import java.util.concurrent.CountDownLatch;
41-
import java.util.concurrent.Semaphore;
4240
import java.util.concurrent.TimeUnit;
4341
import java.util.concurrent.TimeoutException;
4442
import java.util.concurrent.atomic.AtomicInteger;
4543
import java.util.function.Consumer;
46-
import java.util.function.Supplier;
4744

4845
import static com.github.grantwest.eventually.EventuallyLambdaMatcher.eventuallyEval;
4946
import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
@@ -61,8 +58,6 @@
6158
public class ConfigTest {
6259
private static final long AWAIT_TIMEOUT_SECONDS = 30L;
6360
private static final long RECEIVE_PUBLISH_SECONDS = 2L;
64-
private static final Supplier<UpdateBehaviorTree> MERGE_UPDATE_BEHAVIOR =
65-
() -> new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.MERGE, System.currentTimeMillis());
6661

6762
BridgeIntegrationTestContext testContext;
6863

@@ -180,50 +175,6 @@ void GIVEN_Greengrass_with_mqtt_bridge_WHEN_connect_options_set_in_config_THEN_l
180175
() -> largeMessageHandler.getLeft().get(RECEIVE_PUBLISH_SECONDS, TimeUnit.SECONDS));
181176
}
182177

183-
@BridgeIntegrationTest(
184-
withConfig = "config.yaml",
185-
withBrokers = Broker.MQTT3)
186-
void GIVEN_Greengrass_with_mqtt_bridge_WHEN_multiple_serialized_config_changes_occur_THEN_bridge_reinstalls_multiple_times(ExtensionContext context) throws Exception {
187-
ignoreExceptionOfType(context, InterruptedException.class);
188-
189-
Semaphore bridgeRestarted = new Semaphore(1);
190-
bridgeRestarted.acquire();
191-
192-
testContext.getKernel().getContext().addGlobalStateChangeListener((GreengrassService service, State was, State newState) -> {
193-
if (service.getName().equals(MQTTBridge.SERVICE_NAME) && newState.equals(State.RUNNING)) {
194-
bridgeRestarted.release();
195-
}
196-
});
197-
198-
Topics config = testContext.getKernel().locate(MQTTBridge.SERVICE_NAME).getConfig()
199-
.lookupTopics(CONFIGURATION_CONFIG_KEY);
200-
201-
int numRestarts = 5;
202-
for (int i = 0; i < numRestarts; i++) {
203-
// change the configuration and wait for bridge to restart
204-
config.updateFromMap(Utils.immutableMap(BridgeConfig.KEY_CLIENT_ID, String.format("clientId%d", i)), MERGE_UPDATE_BEHAVIOR.get());
205-
assertTrue(bridgeRestarted.tryAcquire(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
206-
}
207-
}
208-
209-
@BridgeIntegrationTest(
210-
withConfig = "config.yaml",
211-
withBrokers = Broker.MQTT3)
212-
void GIVEN_Greengrass_with_mqtt_bridge_WHEN_clientId_config_changes_THEN_bridge_reinstalls() throws Exception {
213-
CountDownLatch bridgeRestarted = new CountDownLatch(1);
214-
testContext.getKernel().getContext().addGlobalStateChangeListener((GreengrassService service, State was, State newState) -> {
215-
if (service.getName().equals(MQTTBridge.SERVICE_NAME) && newState.equals(State.NEW)) {
216-
bridgeRestarted.countDown();
217-
}
218-
});
219-
220-
Topics config = testContext.getKernel().locate(MQTTBridge.SERVICE_NAME).getConfig()
221-
.lookupTopics(CONFIGURATION_CONFIG_KEY);
222-
config.updateFromMap(Utils.immutableMap(BridgeConfig.KEY_CLIENT_ID, "new_client_id"), MERGE_UPDATE_BEHAVIOR.get());
223-
224-
assertTrue(bridgeRestarted.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
225-
}
226-
227178
@BridgeIntegrationTest(
228179
withConfig = "config.yaml",
229180
withBrokers = Broker.MQTT3)

src/integrationtests/java/com/aws/greengrass/integrationtests/KeystoreTest.java

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,9 @@
99
import com.aws.greengrass.clientdevices.auth.certificate.CertificateHelper;
1010
import com.aws.greengrass.clientdevices.auth.certificate.CertificateStore;
1111
import com.aws.greengrass.config.Topic;
12-
import com.aws.greengrass.dependency.State;
1312
import com.aws.greengrass.integrationtests.extensions.BridgeIntegrationTest;
1413
import com.aws.greengrass.integrationtests.extensions.BridgeIntegrationTestContext;
1514
import com.aws.greengrass.integrationtests.extensions.Broker;
16-
import com.aws.greengrass.lifecyclemanager.GlobalStateChangeListener;
17-
import com.aws.greengrass.lifecyclemanager.GreengrassService;
18-
import com.aws.greengrass.mqtt.bridge.BridgeConfig;
1915
import com.aws.greengrass.mqtt.bridge.MQTTBridge;
2016
import com.aws.greengrass.mqtt.bridge.auth.MQTTClientKeyStore;
2117
import com.aws.greengrass.mqtt.bridge.model.MqttVersion;
@@ -41,7 +37,6 @@
4137
import java.util.function.Consumer;
4238
import java.util.stream.IntStream;
4339

44-
import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
4540
import static com.aws.greengrass.lifecyclemanager.GreengrassService.RUNTIME_STORE_NAMESPACE_TOPIC;
4641
import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICES_NAMESPACE_TOPIC;
4742
import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType;
@@ -154,22 +149,8 @@ void GIVEN_mqtt_bridge_WHEN_cda_ca_conf_changed_after_shutdown_THEN_bridge_keyst
154149
ignoreExceptionOfType(context, IllegalArgumentException.class);
155150
ignoreExceptionOfType(context, NullPointerException.class);
156151

157-
// break bridge
158-
CountDownLatch bridgeIsBroken = new CountDownLatch(1);
159-
GlobalStateChangeListener listener = (GreengrassService service, State was, State newState) -> {
160-
if (service.getName().equals(MQTTBridge.SERVICE_NAME) && service.getState().equals(State.BROKEN)) {
161-
bridgeIsBroken.countDown();
162-
}
163-
};
164-
Topic brokerUriTopic = testContext.getKernel().getConfig().lookup(
165-
SERVICES_NAMESPACE_TOPIC,
166-
MQTTBridge.SERVICE_NAME,
167-
CONFIGURATION_CONFIG_KEY,
168-
BridgeConfig.KEY_BROKER_URI
169-
);
170-
brokerUriTopic.withValue("garbage");
171-
testContext.getKernel().getContext().addGlobalStateChangeListener(listener);
172-
assertTrue(bridgeIsBroken.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
152+
// shutdown the bridge
153+
testContext.getFromContext(MQTTBridge.class).shutdown();
173154

174155
CountDownLatch keyStoreUpdated = new CountDownLatch(1);
175156
MQTTClientKeyStore keyStore = testContext.getKernel().getContext().get(MQTTClientKeyStore.class);
@@ -191,8 +172,8 @@ void GIVEN_mqtt_bridge_WHEN_cda_ca_conf_changed_after_shutdown_THEN_bridge_keyst
191172
Date.from(Instant.now().plusSeconds(100)),
192173
"CA"))));
193174

194-
// shouldn't update
195-
assertFalse(keyStoreUpdated.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
175+
testContext.getKernel().getContext().waitForPublishQueueToClear();
176+
assertFalse(keyStoreUpdated.await(5L, TimeUnit.SECONDS));
196177
}
197178

198179
private CompletableFuture<Void> asyncAssertNumConnects(Integer numConnects) throws InterruptedException {

src/main/java/com/aws/greengrass/mqtt/bridge/MQTTBridge.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
import com.aws.greengrass.lifecyclemanager.PluginService;
1818
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
1919
import com.aws.greengrass.mqtt.bridge.auth.MQTTClientKeyStore;
20+
import com.aws.greengrass.mqtt.bridge.clients.Configurable;
2021
import com.aws.greengrass.mqtt.bridge.clients.IoTCoreClient;
21-
import com.aws.greengrass.mqtt.bridge.clients.LocalMqtt5Client;
2222
import com.aws.greengrass.mqtt.bridge.clients.LocalMqttClientFactory;
23-
import com.aws.greengrass.mqtt.bridge.clients.MQTTClient;
2423
import com.aws.greengrass.mqtt.bridge.clients.MessageClient;
2524
import com.aws.greengrass.mqtt.bridge.clients.MessageClientException;
2625
import com.aws.greengrass.mqtt.bridge.clients.PubSubClient;
@@ -295,17 +294,14 @@ public class ConfigurationChangeHandler {
295294
return;
296295
}
297296

298-
// TODO same for MQTT3 client
299-
if (localMqttClient instanceof LocalMqtt5Client) {
300-
((LocalMqtt5Client) localMqttClient).applyConfig(LocalMqtt5Client.Config.fromBridgeConfig(newConfig));
297+
if (localMqttClient instanceof Configurable) {
298+
((Configurable) localMqttClient).applyConfig(newConfig);
301299
}
302300
});
303301

304302
private boolean reinstallRequired(BridgeConfig prevConfig, BridgeConfig newConfig) {
305-
return !Objects.equals(prevConfig.getMqttVersion(), newConfig.getMqttVersion()) // to switch between clients
306-
|| localMqttClient instanceof MQTTClient
307-
&& (!Objects.equals(prevConfig.getBrokerUri(), newConfig.getBrokerUri())
308-
|| !Objects.equals(prevConfig.getClientId(), newConfig.getClientId()));
303+
// to switch between clients
304+
return !Objects.equals(prevConfig.getMqttVersion(), newConfig.getMqttVersion());
309305
}
310306

311307
/**
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package com.aws.greengrass.mqtt.bridge.clients;
7+
8+
import com.aws.greengrass.mqtt.bridge.BridgeConfig;
9+
10+
@FunctionalInterface
11+
public interface Configurable {
12+
void applyConfig(BridgeConfig config);
13+
}

src/main/java/com/aws/greengrass/mqtt/bridge/clients/LocalMqtt5Client.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
import static com.aws.greengrass.mqtt.bridge.model.Mqtt5RouteOptions.DEFAULT_NO_LOCAL;
6969

7070
@SuppressWarnings("PMD.CloseResource")
71-
public class LocalMqtt5Client implements MessageClient<MqttMessage> {
71+
public class LocalMqtt5Client implements MessageClient<MqttMessage>, Configurable {
7272

7373
private static final Logger LOGGER = LogManager.getLogger(LocalMqtt5Client.class);
7474

@@ -334,11 +334,16 @@ public static boolean resetRequired(Config prevConfig, Config newConfig) {
334334
* Apply new configuration to this client. Depending on what configurations changed, a
335335
* {@link LocalMqtt5Client#reset()} may occur to apply them.
336336
*
337-
* @param config new configuration
337+
* @param bridgeConfig new bridge configuration
338338
*/
339-
public void applyConfig(@NonNull Config config) {
339+
@Override
340+
public void applyConfig(@NonNull BridgeConfig bridgeConfig) {
341+
applyConfig(Config.fromBridgeConfig(bridgeConfig));
342+
}
343+
344+
void applyConfig(Config newConfig) {
340345
Config previousConfig = this.config;
341-
this.config = config;
346+
this.config = newConfig;
342347
if (Config.resetRequired(previousConfig, config)) {
343348
scheduleResetTask();
344349
}

src/main/java/com/aws/greengrass/mqtt/bridge/clients/LocalMqttClientFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ public MessageClient<MqttMessage> createLocalMqttClient() throws MessageClientEx
6363
case MQTT3: // fall-through
6464
default:
6565
return new MQTTClient(
66-
config.getBrokerUri(),
67-
config.getClientId(),
66+
config,
6867
mqttClientKeyStore,
6968
executorService
7069
);

0 commit comments

Comments
 (0)