|
21 | 21 | import software.amazon.awssdk.crt.io.TlsContext; |
22 | 22 | import software.amazon.awssdk.crt.io.TlsContextOptions; |
23 | 23 | import software.amazon.awssdk.crt.mqtt.MqttClient; |
24 | | -import software.amazon.awssdk.crt.mqtt.MqttConnection; |
25 | | -import software.amazon.awssdk.crt.mqtt.MqttConnectionEvents; |
| 24 | +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; |
| 25 | +import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents; |
26 | 26 | import software.amazon.awssdk.crt.mqtt.MqttMessage; |
27 | 27 | import software.amazon.awssdk.crt.mqtt.QualityOfService; |
28 | 28 | import software.amazon.awssdk.iot.iotjobs.model.RejectedError; |
@@ -136,60 +136,60 @@ public static void main(String[] args) { |
136 | 136 | return; |
137 | 137 | } |
138 | 138 |
|
139 | | - try { |
140 | | - ClientBootstrap clientBootstrap = new ClientBootstrap(1); |
141 | | - TlsContextOptions tlsContextOptions = TlsContextOptions.createWithMTLS(certPath, keyPath); |
| 139 | + try(ClientBootstrap clientBootstrap = new ClientBootstrap(1); |
| 140 | + TlsContextOptions tlsContextOptions = TlsContextOptions.createWithMTLS(certPath, keyPath)) { |
142 | 141 | tlsContextOptions.overrideDefaultTrustStore(null, rootCaPath); |
143 | | - TlsContext tlsContext = new TlsContext(tlsContextOptions); |
144 | | - MqttClient client = new MqttClient(clientBootstrap, tlsContext); |
145 | | - |
146 | | - MqttConnection connection = new MqttConnection(client, new MqttConnectionEvents() { |
147 | | - @Override |
148 | | - public void onConnectionInterrupted(int errorCode) { |
149 | | - if (errorCode != 0) { |
150 | | - System.out.println("Connection interrupted: " + errorCode + ": " + CRT.awsErrorString(errorCode)); |
| 142 | + |
| 143 | + try(TlsContext tlsContext = new TlsContext(tlsContextOptions); |
| 144 | + MqttClient client = new MqttClient(clientBootstrap, tlsContext); |
| 145 | + MqttClientConnection connection = new MqttClientConnection(client, new MqttClientConnectionEvents() { |
| 146 | + @Override |
| 147 | + public void onConnectionInterrupted(int errorCode) { |
| 148 | + if (errorCode != 0) { |
| 149 | + System.out.println("Connection interrupted: " + errorCode + ": " + CRT.awsErrorString(errorCode)); |
| 150 | + } |
151 | 151 | } |
152 | | - } |
153 | 152 |
|
154 | | - @Override |
155 | | - public void onConnectionResumed(boolean sessionPresent) { |
156 | | - System.out.println("Connection resumed: " + (sessionPresent ? "existing session" : "clean session")); |
157 | | - } |
158 | | - }); |
159 | | - |
160 | | - CompletableFuture<Boolean> connected = connection.connect( |
161 | | - clientId, |
162 | | - endpoint, port, |
163 | | - null, tlsContext, true, 0, 0) |
164 | | - .exceptionally((ex) -> { |
165 | | - System.out.println("Exception occurred during connect: " + ex.toString()); |
166 | | - return null; |
| 153 | + @Override |
| 154 | + public void onConnectionResumed(boolean sessionPresent) { |
| 155 | + System.out.println("Connection resumed: " + (sessionPresent ? "existing session" : "clean session")); |
| 156 | + } |
| 157 | + })) { |
| 158 | + |
| 159 | + CompletableFuture<Boolean> connected = connection.connect( |
| 160 | + clientId, |
| 161 | + endpoint, port, |
| 162 | + null, true, 0, 0) |
| 163 | + .exceptionally((ex) -> { |
| 164 | + System.out.println("Exception occurred during connect: " + ex.toString()); |
| 165 | + return null; |
| 166 | + }); |
| 167 | + boolean sessionPresent = connected.get(); |
| 168 | + System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!"); |
| 169 | + |
| 170 | + CompletableFuture<Integer> subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE, (message) -> { |
| 171 | + try { |
| 172 | + String payload = new String(message.getPayload().array(), "UTF-8"); |
| 173 | + System.out.println("MESSAGE: " + payload); |
| 174 | + } catch (UnsupportedEncodingException ex) { |
| 175 | + System.out.println("Unable to decode payload: " + ex.getMessage()); |
| 176 | + } |
167 | 177 | }); |
168 | | - boolean sessionPresent = connected.get(); |
169 | | - System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!"); |
170 | | - |
171 | | - CompletableFuture<Integer> subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE, (message) -> { |
172 | | - try { |
173 | | - String payload = new String(message.getPayload().array(), "UTF-8"); |
174 | | - System.out.println("MESSAGE: " + payload); |
175 | | - } catch (UnsupportedEncodingException ex) { |
176 | | - System.out.println("Unable to decode payload: " + ex.getMessage()); |
177 | | - } |
178 | | - }); |
179 | 178 |
|
180 | | - subscribed.get(); |
| 179 | + subscribed.get(); |
181 | 180 |
|
182 | | - int count = 0; |
183 | | - while (count++ < messagesToPublish) { |
184 | | - ByteBuffer payload = ByteBuffer.allocateDirect(message.length()); |
185 | | - payload.put(message.getBytes()); |
186 | | - CompletableFuture<Integer> published = connection.publish(new MqttMessage(topic, payload), QualityOfService.AT_LEAST_ONCE, false); |
187 | | - published.get(); |
188 | | - Thread.sleep(1000); |
189 | | - } |
| 181 | + int count = 0; |
| 182 | + while (count++ < messagesToPublish) { |
| 183 | + ByteBuffer payload = ByteBuffer.allocateDirect(message.length()); |
| 184 | + payload.put(message.getBytes()); |
| 185 | + CompletableFuture<Integer> published = connection.publish(new MqttMessage(topic, payload), QualityOfService.AT_LEAST_ONCE, false); |
| 186 | + published.get(); |
| 187 | + Thread.sleep(1000); |
| 188 | + } |
190 | 189 |
|
191 | | - CompletableFuture<Void> disconnected = connection.disconnect(); |
192 | | - disconnected.get(); |
| 190 | + CompletableFuture<Void> disconnected = connection.disconnect(); |
| 191 | + disconnected.get(); |
| 192 | + } |
193 | 193 | } catch (CrtRuntimeException | InterruptedException | ExecutionException ex) { |
194 | 194 | System.out.println("Exception encountered: " + ex.toString()); |
195 | 195 | } |
|
0 commit comments