55
66package com .aws .greengrass .localdebugconsole ;
77
8+ import com .aws .greengrass .builtin .services .pubsub .PubSubIPCEventStreamAgent ;
9+ import com .aws .greengrass .builtin .services .pubsub .PublishEvent ;
10+ import com .aws .greengrass .builtin .services .pubsub .SubscribeRequest ;
811import com .aws .greengrass .deployment .DeviceConfiguration ;
912import com .aws .greengrass .lifecyclemanager .Kernel ;
13+ import com .aws .greengrass .localdebugconsole .messageutils .CommunicationMessage ;
1014import com .aws .greengrass .localdebugconsole .messageutils .DeviceDetails ;
1115import com .aws .greengrass .localdebugconsole .messageutils .Message ;
1216import com .aws .greengrass .localdebugconsole .messageutils .MessageType ;
1317import com .aws .greengrass .localdebugconsole .messageutils .PackedRequest ;
1418import com .aws .greengrass .localdebugconsole .messageutils .Request ;
1519import com .aws .greengrass .logging .api .Logger ;
20+ import com .aws .greengrass .mqttclient .MqttClient ;
21+ import com .aws .greengrass .mqttclient .MqttRequestException ;
22+ import com .aws .greengrass .mqttclient .v5 .Publish ;
23+ import com .aws .greengrass .mqttclient .v5 .Subscribe ;
24+ import com .aws .greengrass .mqttclient .v5 .Unsubscribe ;
1625import com .aws .greengrass .util .DefaultConcurrentHashMap ;
1726import com .aws .greengrass .util .Pair ;
1827import com .fasterxml .jackson .core .JsonProcessingException ;
28+ import com .fasterxml .jackson .databind .JsonNode ;
1929import com .fasterxml .jackson .databind .ObjectMapper ;
2030import lombok .AccessLevel ;
2131import lombok .Getter ;
2232import org .java_websocket .WebSocket ;
2333import org .java_websocket .exceptions .WebsocketNotConnectedException ;
2434import org .java_websocket .handshake .ClientHandshake ;
2535import org .java_websocket .server .WebSocketServer ;
36+ import software .amazon .awssdk .aws .greengrass .model .ReceiveMode ;
2637
2738import java .net .InetSocketAddress ;
2839import java .util .HashSet ;
2940import java .util .Map ;
3041import java .util .Set ;
3142import java .util .concurrent .CompletableFuture ;
43+ import java .util .concurrent .ConcurrentHashMap ;
3244import java .util .concurrent .CopyOnWriteArraySet ;
45+ import java .util .concurrent .ExecutionException ;
46+ import java .util .function .Consumer ;
3347import javax .inject .Provider ;
3448import javax .inject .Singleton ;
3549import javax .net .ssl .SSLEngine ;
3650
3751@ Singleton
3852public class DashboardServer extends WebSocketServer implements KernelMessagePusher {
3953 static final String SERVER_START_MESSAGE = "Server started successfully" ;
54+ private static final String IOT_CORE_SOURCE = "iotcore" ;
4055
4156 private final DashboardAPI dashboardAPI ;
4257 private final Logger logger ;
@@ -47,18 +62,29 @@ public class DashboardServer extends WebSocketServer implements KernelMessagePus
4762 new DefaultConcurrentHashMap <>(HashSet ::new );
4863 private final DefaultConcurrentHashMap <String , Set <WebSocket >> logWatchlist =
4964 new DefaultConcurrentHashMap <>(HashSet ::new );
65+ private final DefaultConcurrentHashMap <WebSocket , Map <String , SubscribeRequest >> pubSubWatchList =
66+ new DefaultConcurrentHashMap <>(ConcurrentHashMap ::new );
67+ private final DefaultConcurrentHashMap <WebSocket , Map <String , Subscribe >> mqttWatchList =
68+ new DefaultConcurrentHashMap <>(ConcurrentHashMap ::new );
5069 @ Getter (AccessLevel .PACKAGE )
5170 private final CompletableFuture <Object > started = new CompletableFuture <>();
5271 private final Authenticator authenticator ;
72+ private final MqttClient mqttClient ;
73+
74+ PubSubIPCEventStreamAgent pubSubIPCAgent ;
75+ private final String SERVICE_NAME = "LocalDebugConsole" ;
5376
5477 public DashboardServer (InetSocketAddress address , Logger logger , Kernel root , DeviceConfiguration deviceConfig ,
5578 Authenticator authenticator , Provider <SSLEngine > engineProvider ) {
56- this (address , logger , new KernelCommunicator (root , logger , deviceConfig ), authenticator , engineProvider );
79+ this (address , logger , new KernelCommunicator (root , logger , deviceConfig ), authenticator , engineProvider ,
80+ root .getContext ().get (PubSubIPCEventStreamAgent .class ),
81+ root .getContext ().get (MqttClient .class ));
5782 }
5883
5984 // constructor for unit testing
6085 DashboardServer (InetSocketAddress address , Logger logger , DashboardAPI dashboardAPI , Authenticator authenticator ,
61- Provider <SSLEngine > engineProvider ) {
86+ Provider <SSLEngine > engineProvider , PubSubIPCEventStreamAgent pubSubIPCAgent ,
87+ MqttClient mqttClient ) {
6288 super (address );
6389 setReuseAddr (true );
6490 setTcpNoDelay (true );
@@ -69,6 +95,8 @@ public DashboardServer(InetSocketAddress address, Logger logger, Kernel root, De
6995 this .dashboardAPI = dashboardAPI ;
7096 this .authenticator = authenticator ;
7197 this .logger .atInfo ().log ("Starting dashboard server on address: {}" , address );
98+ this .pubSubIPCAgent = pubSubIPCAgent ;
99+ this .mqttClient = mqttClient ;
72100 }
73101
74102 // links the API impl and starts the socket server
@@ -180,7 +208,6 @@ public void onMessage(WebSocket conn, String msg) {
180208 dashboardAPI .updateConfig (req .args [0 ], req .args [1 ])));
181209 break ;
182210 }
183-
184211 case subscribeToComponent : {
185212 statusWatchlist .get (req .args [0 ]).add (conn );
186213 pushComponentChange (req .args [0 ]);
@@ -212,6 +239,18 @@ public void onMessage(WebSocket conn, String msg) {
212239 sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , true ));
213240 break ;
214241 }
242+ case subscribeToPubSubTopic : {
243+ subscribeToPubSubTopic (conn , packedRequest , req );
244+ break ;
245+ }
246+ case publishToPubSubTopic : {
247+ publishToPubSubTopic (conn , packedRequest , req );
248+ break ;
249+ }
250+ case unsubscribeToPubSubTopic : {
251+ unsubscribeFromPubSubTopic (conn , packedRequest , req );
252+ break ;
253+ }
215254 default : { // echo
216255 sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , req .call ));
217256 break ;
@@ -220,11 +259,117 @@ public void onMessage(WebSocket conn, String msg) {
220259 }
221260 }
222261
262+ private void subscribeToPubSubTopic (WebSocket conn , PackedRequest packedRequest , Request req ) {
263+ JsonNode tree ;
264+ try {
265+ tree = jsonMapper .readTree (req .args [0 ]);
266+ } catch (JsonProcessingException e ) {
267+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
268+ return ;
269+ }
270+
271+ String topicFilter = tree .get ("topicFilter" ).textValue ();
272+ String source = tree .get ("source" ).textValue ();
273+ String subId = tree .get ("subId" ).textValue ();
274+ try {
275+ if (IOT_CORE_SOURCE .equals (source )) {
276+ mqttWatchList .get (conn ).computeIfAbsent (subId , (a ) -> {
277+ Consumer <Publish > cb = (c ) -> {
278+ CommunicationMessage resMessage =
279+ new CommunicationMessage (subId , topicFilter , c .getTopic (),
280+ new String (c .getPayload ()));
281+ sendIfOpen (conn , new Message (MessageType .PUB_SUB_MSG , resMessage ));
282+ };
283+ Subscribe subReq = Subscribe .builder ().callback (cb ).topic (topicFilter ).build ();
284+ try {
285+ mqttClient .subscribe (subReq ).get ();
286+ } catch (MqttRequestException | InterruptedException | ExecutionException e ) {
287+ throw new RuntimeException (e );
288+ }
289+ return subReq ;
290+ });
291+ } else {
292+ pubSubWatchList .get (conn ).computeIfAbsent (subId , (a ) -> {
293+ Consumer <PublishEvent > cb = (c ) -> {
294+ CommunicationMessage resMessage =
295+ new CommunicationMessage (subId , topicFilter , c .getTopic (),
296+ new String (c .getPayload ()));
297+ sendIfOpen (conn , new Message (MessageType .PUB_SUB_MSG , resMessage ));
298+ };
299+ SubscribeRequest subReq = SubscribeRequest .builder ().callback (cb )
300+ .receiveMode (ReceiveMode .RECEIVE_ALL_MESSAGES ).topic (topicFilter )
301+ .serviceName (SERVICE_NAME ).build ();
302+ pubSubIPCAgent .subscribe (subReq );
303+ return subReq ;
304+ });
305+ }
306+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , true ));
307+ } catch (Exception e ) {
308+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
309+ }
310+ }
311+
312+ private void unsubscribeFromPubSubTopic (WebSocket conn , PackedRequest packedRequest , Request req ) {
313+ SubscribeRequest subReq = pubSubWatchList .get (conn ).remove (req .args [0 ]);
314+ if (subReq != null ) {
315+ pubSubIPCAgent .unsubscribe (subReq );
316+ }
317+ Subscribe mqttSub = mqttWatchList .get (conn ).remove (req .args [0 ]);
318+ if (mqttSub != null ) {
319+ try {
320+ mqttClient .unsubscribe (Unsubscribe .builder ().topic (mqttSub .getTopic ())
321+ .subscriptionCallback (mqttSub .getCallback ()).build ());
322+ } catch (MqttRequestException e ) {
323+ sendIfOpen (conn ,
324+ new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
325+ return ;
326+ }
327+ }
328+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , true ));
329+ }
330+
331+ private void publishToPubSubTopic (WebSocket conn , PackedRequest packedRequest , Request req ) {
332+ JsonNode tree ;
333+ try {
334+ tree = jsonMapper .readTree (req .args [0 ]);
335+ } catch (JsonProcessingException e ) {
336+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
337+ return ;
338+ }
339+
340+ String topic = tree .get ("topic" ).textValue ();
341+ String destination = tree .get ("destination" ).textValue ();
342+ String payload = tree .get ("payload" ).textValue ();
343+ try {
344+ if (IOT_CORE_SOURCE .equals (destination )) {
345+ mqttClient .publish (Publish .builder ()
346+ .topic (topic )
347+ .payload (payload .getBytes ())
348+ .build ());
349+ } else {
350+ pubSubIPCAgent .publish (topic , payload .getBytes (), SERVICE_NAME );
351+ }
352+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , true ));
353+ } catch (Exception e ) {
354+ sendIfOpen (conn , new Message (MessageType .RESPONSE , packedRequest .requestID , e .getMessage ()));
355+ }
356+ }
357+
223358 @ Override
224359 public void onClose (WebSocket conn , int code , String reason , boolean remote ) {
225360 connections .remove (conn );
226361 statusWatchlist .forEach ((name , set ) -> set .remove (conn ));
227362 logWatchlist .forEach ((name , set ) -> set .remove (conn ));
363+ pubSubWatchList .get (conn ).forEach ((topic , sub ) -> pubSubIPCAgent .unsubscribe (sub ));
364+ mqttWatchList .get (conn ).forEach ((topic , sub ) -> {
365+ try {
366+ mqttClient .unsubscribe (Unsubscribe .builder ()
367+ .subscriptionCallback (sub .getCallback ())
368+ .topic (sub .getTopic ()).build ());
369+ } catch (MqttRequestException e ) {
370+ logger .error ("failed to unsubscribe" , e );
371+ }
372+ });
228373 logger .atInfo ()
229374 .log ("closed {} with exit code {}, additional info: {}" , conn .getRemoteSocketAddress (), code , reason );
230375 }
@@ -244,7 +389,7 @@ public void onStart() {
244389 @ Override
245390 public void pushComponentListUpdate () {
246391 for (WebSocket conn : connections ) {
247- sendIfOpen (conn , new Message (MessageType .COMPONENT_LIST , - 1 , dashboardAPI .getComponentList ()));
392+ sendIfOpen (conn , new Message (MessageType .COMPONENT_LIST , dashboardAPI .getComponentList ()));
248393 }
249394 }
250395
@@ -253,7 +398,7 @@ public void pushComponentChange(String name) {
253398 if (statusWatchlist .containsKey (name )) {
254399 statusWatchlist .computeIfPresent (name , (k ,set ) -> {
255400 for (WebSocket conn : set ) {
256- sendIfOpen (conn , new Message (MessageType .COMPONENT_CHANGE , - 1 , dashboardAPI .getComponent (name )));
401+ sendIfOpen (conn , new Message (MessageType .COMPONENT_CHANGE , dashboardAPI .getComponent (name )));
257402 }
258403 return set ;
259404 });
@@ -263,7 +408,7 @@ public void pushComponentChange(String name) {
263408 @ Override
264409 public void pushDependencyGraphUpdate () {
265410 for (WebSocket conn : connections ) {
266- sendIfOpen (conn , new Message (MessageType .DEPS_GRAPH , - 1 , dashboardAPI .getDependencyGraph ()));
411+ sendIfOpen (conn , new Message (MessageType .DEPS_GRAPH , dashboardAPI .getDependencyGraph ()));
267412 }
268413 }
269414
0 commit comments