From 69a42437976684366f5556d12c18e6da79f04e05 Mon Sep 17 00:00:00 2001 From: Kishen Maloor Date: Sun, 7 Mar 2021 11:19:36 -0800 Subject: [PATCH] Update sample-app Producer to notify via websocket This change illustrates a new method for issuing notifications based on a change to the 'edgenode' repository that exposes a bi-directional websocket with a send path for posting notifications. Signed-off-by: Kishen Maloor --- .../sample-app/simpleEaaProducer/main.go | 39 +++++++++++++++--- .../sample-app/simpleEaaProducer/producer.go | 41 +++++++++++++++++++ 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/applications/sample-app/simpleEaaProducer/main.go b/applications/sample-app/simpleEaaProducer/main.go index c93a61e8..09f2d47f 100644 --- a/applications/sample-app/simpleEaaProducer/main.go +++ b/applications/sample-app/simpleEaaProducer/main.go @@ -8,6 +8,7 @@ import ( "log" "time" + "github.com/gorilla/websocket" "github.com/open-ness/edgeapps/applications/sample-app/common" "github.com/pkg/errors" ) @@ -97,6 +98,17 @@ func main() { // stop producer after set timeout done := time.After(time.Second * time.Duration(common.Cfg.ProducerTimeout)) + // Establish websocket connection for issuing notifications + websocketConn, err := ConnectProducerWebsocket(cli) + if err != nil { + log.Panicln("Cannot open web socket: " + err.Error()) + } + defer func() { + if conErr := websocketConn.Close(); conErr != nil { + log.Println("Failed to close socket") + } + }() + workLoop: for { select { @@ -114,17 +126,34 @@ workLoop: break } - // bild event + // build notification event newEvent := common.NotificationFromProducer{ Name: common.Cfg.Notification, Version: common.Cfg.VerNotif, Payload: send, } - // send event to edgeNode - if err := produceEvent(cli, newEvent); err != nil { - log.Fatal(err) + + // Serialize notification event to JSON + notificationJSON, err := json.Marshal(newEvent) + if err != nil { + log.Println(err) + } + // Post notification event to websocket + if err := websocketConn.WriteMessage(websocket.TextMessage, notificationJSON); err != nil { + log.Println(err) } + + /* + Note: Below is an equivalent method for issuing notifications via a HTTP POST request + to /notifications. As individual notifications require separate POST requests, + establishing a TLS session, etc., it takes a performance hit in comparison to + posting the notification to an open websocket as above. But both methods remain + viable. + // send event to edgeNode + if err := produceEvent(cli, newEvent); err != nil { + log.Fatal(err) + } + */ } } - } diff --git a/applications/sample-app/simpleEaaProducer/producer.go b/applications/sample-app/simpleEaaProducer/producer.go index 6743ecc2..ca3e78b3 100644 --- a/applications/sample-app/simpleEaaProducer/producer.go +++ b/applications/sample-app/simpleEaaProducer/producer.go @@ -5,12 +5,14 @@ package main import ( "bytes" + "crypto/tls" "encoding/json" "log" "net/http" "reflect" "time" + "github.com/gorilla/websocket" "github.com/open-ness/edgeapps/applications/sample-app/common" "github.com/pkg/errors" ) @@ -199,3 +201,42 @@ func isProducerAvailable(cli *http.Client, producer common.Service) (bool, commo } return false, allProducers } + +// ConnectProducerWebsocket establishes a websocket connection for posting +// notifications to all subscribed consumers +func ConnectProducerWebsocket(client *http.Client) (*websocket.Conn, error) { + // Get hold of the session handle + transport, ok := client.Transport.(*http.Transport) + if !ok { + return nil, errors.New("HTTP client doens't have http.Transport") + } + + // Construct websocket handle after populating it with the TLS configuration + socket := &websocket.Dialer{ + TLSClientConfig: &tls.Config{ + RootCAs: transport.TLSClientConfig.RootCAs, + Certificates: []tls.Certificate{ + transport.TLSClientConfig.Certificates[0]}, + ServerName: common.Cfg.EaaCommonName, + }, + } + + // Set header for HTTP request to the websocket endpoint + hostHeader := http.Header{} + hostHeader.Add("Host", common.Cfg.Namespace+":"+common.Cfg.ProducerAppID) + + // Establish websocket connection by issuing a request to /notifications + conn, resp, err := socket.Dial("wss://"+common.Cfg.EdgeNodeEndpoint+ + "/notifications", hostHeader) + if err != nil { + return nil, errors.Wrap(err, "Couldn't dial to wss") + } + defer func() { + if err := resp.Body.Close(); err != nil { + log.Println("Failed to close response body") + } + }() + + // Return websocket connection handle + return conn, nil +}