@@ -167,18 +167,16 @@ public static void main(String[] args) {
167167 new SocketOptions (), region , 1 , proxyOptions );
168168 final DiscoveryClient discoveryClient = new DiscoveryClient (discoveryClientConfig );
169169 final MqttClientConnection connection = getClientFromDiscovery (discoveryClient , clientBootstrap )) {
170- if (connection .connect ().get ()) {
171- System .out .println ("Session resumed" );
172- } else {
173- System .out .println ("Started a clean session" );
174- }
175170
176171 if ("subscribe" .equals (mode ) || "both" .equals (mode )) {
177172 final CompletableFuture <Integer > subFuture = connection .subscribe (topic , QualityOfService .AT_MOST_ONCE , message -> {
178173 System .out .println (String .format ("Message received on topic %s: %s" ,
179174 message .getTopic (), new String (message .getPayload (), StandardCharsets .UTF_8 )));
180175 });
176+
177+ subFuture .get ();
181178 }
179+
182180 final Scanner scanner = new Scanner (System .in );
183181 while (true ) {
184182 String input = null ;
@@ -208,8 +206,6 @@ public static void main(String[] args) {
208206 System .out .println ("Complete!" );
209207 }
210208
211- private static Pattern PATTERN_IS_PRIVATE_IP = Pattern .compile ("/(^127\\ .)|(^192\\ .168\\ .)|(^10\\ .)|(^172\\ .1[6-9]\\ .)|(^172\\ .2[0-9]\\ .)|(^172\\ .3[0-1]\\ .)|(^::1$)|(^[fF][cCdD])/" );
212-
213209 private static MqttClientConnection getClientFromDiscovery (final DiscoveryClient discoveryClient ,
214210 final ClientBootstrap bootstrap ) throws ExecutionException , InterruptedException {
215211 final CompletableFuture <DiscoverResponse > futureResponse = discoveryClient .discover (thingName );
@@ -219,49 +215,51 @@ private static MqttClientConnection getClientFromDiscovery(final DiscoveryClient
219215 if (groupOpt .isPresent ()) {
220216 final GGGroup group = groupOpt .get ();
221217 final GGCore core = group .getCores ().stream ().findFirst ().get ();
222- final SortedSet <ConnectivityInfo > prioritizedConnectivity = new TreeSet (new Comparator <ConnectivityInfo >() {
223- @ Override
224- public int compare (ConnectivityInfo lhs , ConnectivityInfo rhs ) {
225- return ordinalValue (lhs ) - ordinalValue (rhs );
218+
219+ for (ConnectivityInfo connInfo : core .getConnectivity ()) {
220+ final String dnsOrIp = connInfo .getHostAddress ();
221+ final Integer port = connInfo .getPortNumber ();
222+
223+ System .out .println (String .format ("Connecting to group ID %s, with thing arn %s, using endpoint %s:%d" ,
224+ group .getGGGroupId (), core .getThingArn (), dnsOrIp , port ));
225+
226+ final AwsIotMqttConnectionBuilder connectionBuilder = AwsIotMqttConnectionBuilder .newMtlsBuilderFromPath (certPath , keyPath )
227+ .withClientId (thingName )
228+ .withPort (port .shortValue ())
229+ .withEndpoint (dnsOrIp )
230+ .withBootstrap (bootstrap )
231+ .withConnectionEventCallbacks (new MqttClientConnectionEvents () {
232+ @ Override
233+ public void onConnectionInterrupted (int errorCode ) {
234+ System .out .println ("Connection interrupted: " + errorCode );
235+ }
236+
237+ @ Override
238+ public void onConnectionResumed (boolean sessionPresent ) {
239+ System .out .println ("Connection resumed!" );
240+ }
241+ });
242+ if (group .getCAs () != null ) {
243+ connectionBuilder .withCertificateAuthority (group .getCAs ().get (0 ));
226244 }
227- private int ordinalValue (ConnectivityInfo info ) {
228- if (info .getHostAddress ().equals ("127.0.0.1" ) || info .getHostAddress ().equals ("::1" )) {
229- return 0 ;
230- }
231- if (PATTERN_IS_PRIVATE_IP .matcher (info .getHostAddress ()).matches ()) {
232- return 1 ;
233- }
234- if (info .getHostAddress ().startsWith ("AUTOIP_" )) {
235- return 10 ;
245+
246+ try (MqttClientConnection connection = connectionBuilder .build ()) {
247+ if (connection .connect ().get ()) {
248+ System .out .println ("Session resumed" );
249+ } else {
250+ System .out .println ("Started a clean session" );
236251 }
237- return 2 ;
238- }
239- });
240- prioritizedConnectivity .addAll (core .getConnectivity ());
241- final ConnectivityInfo selectedConnectivity = prioritizedConnectivity .first ();
242- final String dnsOrIp = selectedConnectivity .getHostAddress ();
243- final Integer port = selectedConnectivity .getPortNumber ();
244252
245- System . out . println ( String . format ( "Connecting to group ID %s, with thing arn %s, using endpoint %s:%d" ,
246- group . getGGGroupId (), core . getThingArn (), dnsOrIp , port ) );
253+ /* This lets the connection escape the try block without getting cleaned up */
254+ connection . addRef ( );
247255
248- final AwsIotMqttConnectionBuilder connectionBuilder = AwsIotMqttConnectionBuilder .newMtlsBuilderFromPath (certPath , keyPath )
249- .withClientId ("test-" + UUID .randomUUID ().toString ())
250- .withPort (port .shortValue ())
251- .withEndpoint (dnsOrIp )
252- .withBootstrap (bootstrap )
253- .withConnectionEventCallbacks (new MqttClientConnectionEvents () {
254- @ Override
255- public void onConnectionInterrupted (int errorCode ) { System .out .println ("Connection interrupted: " + errorCode ); }
256- @ Override
257- public void onConnectionResumed (boolean sessionPresent ) {
258- System .out .println ("Connection resumed!" );
259- }
260- });
261- if (group .getCAs () != null ) {
262- connectionBuilder .withCertificateAuthority (group .getCAs ().get (0 ));
256+ return connection ;
257+ } catch (Exception e ) {
258+ System .out .println (String .format ("Connection failed with exception %s" , e .toString ()));
259+ }
263260 }
264- return connectionBuilder .build ();
261+
262+ throw new RuntimeException ("ThingName " + thingName + " could not connect to the green grass core using any of the endpoint connectivity options" );
265263 }
266264 }
267265 throw new RuntimeException ("ThingName " + thingName + " does not have a Greengrass group/core configuration" );
0 commit comments