|
| 1 | +/* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
| 2 | + * |
| 3 | + * Licensed under the Apache License, Version 2.0 (the "License"). |
| 4 | + * You may not use this file except in compliance with the License. |
| 5 | + * A copy of the License is located at |
| 6 | + * |
| 7 | + * http://aws.amazon.com/apache2.0 |
| 8 | + * |
| 9 | + * or in the "license" file accompanying this file. This file is distributed |
| 10 | + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either |
| 11 | + * express or implied. See the License for the specific language governing |
| 12 | + * permissions and limitations under the License. |
| 13 | + */ |
| 14 | + |
| 15 | +package greengrass; |
| 16 | + |
| 17 | +import software.amazon.awssdk.crt.CRT; |
| 18 | +import software.amazon.awssdk.crt.CrtResource; |
| 19 | +import software.amazon.awssdk.crt.CrtRuntimeException; |
| 20 | +import software.amazon.awssdk.crt.Log; |
| 21 | +import software.amazon.awssdk.crt.http.HttpProxyOptions; |
| 22 | +import software.amazon.awssdk.crt.io.*; |
| 23 | +import software.amazon.awssdk.crt.mqtt.*; |
| 24 | +import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder; |
| 25 | +import software.amazon.awssdk.iot.discovery.DiscoveryClient; |
| 26 | +import software.amazon.awssdk.iot.discovery.DiscoveryClientConfig; |
| 27 | +import software.amazon.awssdk.iot.discovery.model.ConnectivityInfo; |
| 28 | +import software.amazon.awssdk.iot.discovery.model.DiscoverResponse; |
| 29 | +import software.amazon.awssdk.iot.discovery.model.GGCore; |
| 30 | +import software.amazon.awssdk.iot.discovery.model.GGGroup; |
| 31 | + |
| 32 | +import java.io.File; |
| 33 | +import java.nio.charset.StandardCharsets; |
| 34 | +import java.util.*; |
| 35 | +import java.util.concurrent.CompletableFuture; |
| 36 | +import java.util.concurrent.ExecutionException; |
| 37 | +import java.util.regex.Pattern; |
| 38 | + |
| 39 | +import static software.amazon.awssdk.iot.discovery.DiscoveryClient.TLS_EXT_ALPN; |
| 40 | + |
| 41 | +class BasicDiscovery { |
| 42 | + static String thingName; |
| 43 | + static String rootCaPath; |
| 44 | + static String certPath; |
| 45 | + static String keyPath; |
| 46 | + static String region = "us-east-1"; |
| 47 | + static String topic = "/samples/test"; |
| 48 | + static String mode = "both"; |
| 49 | + static boolean showHelp = false; |
| 50 | + |
| 51 | + static String proxyHost; |
| 52 | + static int proxyPort; |
| 53 | + |
| 54 | + static void printUsage() { |
| 55 | + System.out.println( |
| 56 | + "Usage:\n"+ |
| 57 | + " --help This message\n" + |
| 58 | + " --thing_name Thing name to use\n" + |
| 59 | + " -r|--region AWS IoT service region\n" + |
| 60 | + " -a|--rootca Path to the root certificate\n" + |
| 61 | + " -c|--cert Path to the IoT thing certificate\n" + |
| 62 | + " -k|--key Path to the IoT thing private key\n" + |
| 63 | + " -t|--topic Topic to subscribe/publish to (optional)\n" + |
| 64 | + " -m|--mode Message to publish (optional)\n" + |
| 65 | + " --proxyhost Websocket proxy host to use\n" + |
| 66 | + " --proxyport Websocket proxy port to use\n"); |
| 67 | + } |
| 68 | + |
| 69 | + static void parseCommandLine(String[] args) { |
| 70 | + for (int idx = 0; idx < args.length; ++idx) { |
| 71 | + switch (args[idx]) { |
| 72 | + case "--help": |
| 73 | + showHelp = true; |
| 74 | + break; |
| 75 | + case "--thingName": |
| 76 | + if (idx + 1 < args.length) { |
| 77 | + thingName = args[++idx]; |
| 78 | + } |
| 79 | + break; |
| 80 | + case "-r": |
| 81 | + case "--region": |
| 82 | + if (idx + 1 < args.length) { |
| 83 | + region = args[++idx]; |
| 84 | + } |
| 85 | + break; |
| 86 | + case "-a": |
| 87 | + case "--rootca": |
| 88 | + if (idx + 1 < args.length) { |
| 89 | + rootCaPath = args[++idx]; |
| 90 | + final File rootCaFile = new File(rootCaPath); |
| 91 | + if (!rootCaFile.isFile()) { |
| 92 | + throw new RuntimeException("Cannot load root CA from path: " + rootCaFile.getAbsolutePath()); |
| 93 | + } |
| 94 | + rootCaPath = rootCaFile.getAbsolutePath(); |
| 95 | + } |
| 96 | + break; |
| 97 | + case "-c": |
| 98 | + case "--cert": |
| 99 | + if (idx + 1 < args.length) { |
| 100 | + certPath = args[++idx]; |
| 101 | + final File certFile = new File(certPath); |
| 102 | + if (!certFile.isFile()) { |
| 103 | + throw new RuntimeException("Cannot load certificate from path: " + certFile.getAbsolutePath()); |
| 104 | + } |
| 105 | + certPath = certFile.getAbsolutePath(); |
| 106 | + } |
| 107 | + break; |
| 108 | + case "-k": |
| 109 | + case "--key": |
| 110 | + if (idx + 1 < args.length) { |
| 111 | + keyPath = args[++idx]; |
| 112 | + final File keyFile = new File(keyPath); |
| 113 | + if (!keyFile.isFile()) { |
| 114 | + throw new RuntimeException("Cannot load private key from path: " + keyFile.getAbsolutePath()); |
| 115 | + } |
| 116 | + keyPath = keyFile.getAbsolutePath(); |
| 117 | + } |
| 118 | + break; |
| 119 | + case "-t": |
| 120 | + case "--topic": |
| 121 | + if (idx + 1 < args.length) { |
| 122 | + topic = args[++idx]; |
| 123 | + } |
| 124 | + break; |
| 125 | + case "-m": |
| 126 | + case "--mode": |
| 127 | + if (idx + 1 < args.length) { |
| 128 | + mode = args[++idx]; |
| 129 | + } |
| 130 | + break; |
| 131 | + case "--proxyhost": |
| 132 | + if (idx + 1 < args.length) { |
| 133 | + proxyHost = args[++idx]; |
| 134 | + } |
| 135 | + break; |
| 136 | + case "--proxyport": |
| 137 | + if (idx + 1 < args.length) { |
| 138 | + proxyPort = Integer.parseInt(args[++idx]); |
| 139 | + } |
| 140 | + break; |
| 141 | + default: |
| 142 | + System.out.println("Unrecognized argument: " + args[idx]); |
| 143 | + } |
| 144 | + } |
| 145 | + } |
| 146 | + |
| 147 | + public static void main(String[] args) { |
| 148 | + Log.initLoggingFromSystemProperties(); |
| 149 | + |
| 150 | + parseCommandLine(args); |
| 151 | + if (showHelp || thingName == null || |
| 152 | + certPath == null || keyPath == null) { |
| 153 | + printUsage(); |
| 154 | + return; |
| 155 | + } |
| 156 | + |
| 157 | + try(final EventLoopGroup eventLoopGroup = new EventLoopGroup(1); |
| 158 | + final HostResolver resolver = new HostResolver(eventLoopGroup); |
| 159 | + final ClientBootstrap clientBootstrap = new ClientBootstrap(eventLoopGroup, resolver); |
| 160 | + final TlsContextOptions tlsCtxOptions = TlsContextOptions.createWithMtlsFromPath(certPath, keyPath)) { |
| 161 | + if(TlsContextOptions.isAlpnSupported()) { |
| 162 | + tlsCtxOptions.withAlpnList(TLS_EXT_ALPN); |
| 163 | + } |
| 164 | + if(rootCaPath != null) { |
| 165 | + tlsCtxOptions.overrideDefaultTrustStoreFromPath(null, rootCaPath); |
| 166 | + } |
| 167 | + HttpProxyOptions proxyOptions = null; |
| 168 | + if (proxyHost != null && proxyPort > 0) { |
| 169 | + proxyOptions = new HttpProxyOptions(); |
| 170 | + proxyOptions.setHost(proxyHost); |
| 171 | + proxyOptions.setPort(proxyPort); |
| 172 | + } |
| 173 | + |
| 174 | + try(final DiscoveryClientConfig discoveryClientConfig = |
| 175 | + new DiscoveryClientConfig(clientBootstrap, tlsCtxOptions, |
| 176 | + new SocketOptions(), region, 1, proxyOptions); |
| 177 | + final DiscoveryClient discoveryClient = new DiscoveryClient(discoveryClientConfig); |
| 178 | + final MqttClientConnection connection = getClientFromDiscovery(discoveryClient, clientBootstrap)) { |
| 179 | + if (connection.connect().get()) { |
| 180 | + System.out.println("Session resumed"); |
| 181 | + } else { |
| 182 | + System.out.println("Started a clean session"); |
| 183 | + } |
| 184 | + |
| 185 | + if ("subscribe".equals(mode) || "both".equals(mode)) { |
| 186 | + final CompletableFuture<Integer> subFuture = connection.subscribe(topic, QualityOfService.AT_MOST_ONCE, message -> { |
| 187 | + System.out.println(String.format("Message received on topic %s: %s", |
| 188 | + message.getTopic(), new String(message.getPayload(), StandardCharsets.UTF_8))); |
| 189 | + }); |
| 190 | + } |
| 191 | + final Scanner scanner = new Scanner(System.in); |
| 192 | + while (true) { |
| 193 | + String input = null; |
| 194 | + if ("publish".equals(mode) || "both".equals(mode)) { |
| 195 | + System.out.println("Enter the message you want to publish to topic " + topic + " and press Enter. " + |
| 196 | + "Type 'exit' or 'quit' to exit this program: "); |
| 197 | + input = scanner.nextLine(); |
| 198 | + } |
| 199 | + |
| 200 | + if ("exit".equals(input) || "quit".equals(input)) { |
| 201 | + System.out.println("Terminating..."); |
| 202 | + break; |
| 203 | + } |
| 204 | + |
| 205 | + if ("publish".equals(mode) || "both".equals(mode)) { |
| 206 | + final CompletableFuture<Integer> publishResult = connection.publish(new MqttMessage(topic, |
| 207 | + input.getBytes(StandardCharsets.UTF_8)), QualityOfService.AT_MOST_ONCE, false); |
| 208 | + Integer result = publishResult.get(); |
| 209 | + } |
| 210 | + } |
| 211 | + } |
| 212 | + } catch (CrtRuntimeException | InterruptedException | ExecutionException ex) { |
| 213 | + System.out.println("Exception thrown: " + ex.toString()); |
| 214 | + ex.printStackTrace(); |
| 215 | + } |
| 216 | + CrtResource.waitForNoResources(); |
| 217 | + System.out.println("Complete!"); |
| 218 | + } |
| 219 | + |
| 220 | + private static Pattern PATTERN_IS_PRIVATE_IP = Pattern.compile("/(^127\\.)|(^192\\.168\\.)|(^10\\.)|(^172\\.1[6-9]\\.)|(^172\\.2[0-9]\\.)|(^172\\.3[0-1]\\.)|(^::1$)|(^[fF][cCdD])/"); |
| 221 | + |
| 222 | + private static MqttClientConnection getClientFromDiscovery(final DiscoveryClient discoveryClient, |
| 223 | + final ClientBootstrap bootstrap) throws ExecutionException, InterruptedException { |
| 224 | + final CompletableFuture<DiscoverResponse> futureResponse = discoveryClient.discover(thingName); |
| 225 | + final DiscoverResponse response = futureResponse.get(); |
| 226 | + if(response.getGGGroups() != null) { |
| 227 | + final Optional<GGGroup> groupOpt = response.getGGGroups().stream().findFirst(); |
| 228 | + if(groupOpt.isPresent()) { |
| 229 | + final GGGroup group = groupOpt.get(); |
| 230 | + final GGCore core = group.getCores().stream().findFirst().get(); |
| 231 | + final SortedSet<ConnectivityInfo> prioritizedConnectivity = new TreeSet(new Comparator<ConnectivityInfo>() { |
| 232 | + @Override |
| 233 | + public int compare(ConnectivityInfo lhs, ConnectivityInfo rhs) { |
| 234 | + return ordinalValue(lhs) - ordinalValue(rhs); |
| 235 | + } |
| 236 | + private int ordinalValue(ConnectivityInfo info) { |
| 237 | + if(info.getHostAddress().equals("127.0.0.1") || info.getHostAddress().equals("::1")) { |
| 238 | + return 0; |
| 239 | + } |
| 240 | + if(PATTERN_IS_PRIVATE_IP.matcher(info.getHostAddress()).matches()) { |
| 241 | + return 1; |
| 242 | + } |
| 243 | + if(info.getHostAddress().startsWith("AUTOIP_")) { |
| 244 | + return 10; |
| 245 | + } |
| 246 | + return 2; |
| 247 | + } |
| 248 | + }); |
| 249 | + prioritizedConnectivity.addAll(core.getConnectivity()); |
| 250 | + final ConnectivityInfo selectedConnectivity = prioritizedConnectivity.first(); |
| 251 | + final String dnsOrIp = selectedConnectivity.getHostAddress(); |
| 252 | + final Integer port = selectedConnectivity.getPortNumber(); |
| 253 | + |
| 254 | + System.out.println(String.format("Connecting to group ID %s, with thing arn %s, using endpoint %s:%d", |
| 255 | + group.getGGGroupId(), core.getThingArn(), dnsOrIp, port)); |
| 256 | + |
| 257 | + final AwsIotMqttConnectionBuilder connectionBuilder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(certPath, keyPath) |
| 258 | + .withClientId("RaspberryPi") |
| 259 | + .withPort(port.shortValue()) |
| 260 | + .withEndpoint(dnsOrIp) |
| 261 | + .withBootstrap(bootstrap) |
| 262 | + .withConnectionEventCallbacks(new MqttClientConnectionEvents() { |
| 263 | + @Override |
| 264 | + public void onConnectionInterrupted(int errorCode) { System.out.println("Connection interrupted: " + errorCode); } |
| 265 | + @Override |
| 266 | + public void onConnectionResumed(boolean sessionPresent) { |
| 267 | + System.out.println("Connection resumed!"); |
| 268 | + } |
| 269 | + }); |
| 270 | + if(group.getCAs() != null) { |
| 271 | + connectionBuilder.withCertificateAuthority(group.getCAs().get(0)); |
| 272 | + } |
| 273 | + return connectionBuilder.build(); |
| 274 | + } |
| 275 | + } |
| 276 | + throw new RuntimeException("ThingName " + thingName + " does not have a Greengrass group/core configuration"); |
| 277 | + } |
| 278 | +} |
0 commit comments