Skip to content
This repository was archived by the owner on Oct 30, 2023. It is now read-only.

Update sample-app Producer to notify via websocket #44

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions applications/sample-app/simpleEaaProducer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"time"

"github.com/gorilla/websocket"
"github.com/open-ness/edgeapps/applications/sample-app/common"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -97,6 +98,17 @@ func main() {
// stop producer after set timeout
done := time.After(time.Second * time.Duration(common.Cfg.ProducerTimeout))

// Establish websocket connection for issuing notifications
websocketConn, err := ConnectProducerWebsocket(cli)
if err != nil {
log.Panicln("Cannot open web socket: " + err.Error())
}
defer func() {
if conErr := websocketConn.Close(); conErr != nil {
log.Println("Failed to close socket")
}
}()

workLoop:
for {
select {
Expand All @@ -114,17 +126,34 @@ workLoop:
break
}

// bild event
// build notification event
newEvent := common.NotificationFromProducer{
Name: common.Cfg.Notification,
Version: common.Cfg.VerNotif,
Payload: send,
}
// send event to edgeNode
if err := produceEvent(cli, newEvent); err != nil {
log.Fatal(err)

// Serialize notification event to JSON
notificationJSON, err := json.Marshal(newEvent)
if err != nil {
log.Println(err)
}
// Post notification event to websocket
if err := websocketConn.WriteMessage(websocket.TextMessage, notificationJSON); err != nil {
log.Println(err)
}

/*
Note: Below is an equivalent method for issuing notifications via a HTTP POST request
to /notifications. As individual notifications require separate POST requests,
establishing a TLS session, etc., it takes a performance hit in comparison to
posting the notification to an open websocket as above. But both methods remain
viable.
// send event to edgeNode
if err := produceEvent(cli, newEvent); err != nil {
log.Fatal(err)
}
*/
}
}

}
41 changes: 41 additions & 0 deletions applications/sample-app/simpleEaaProducer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package main

import (
"bytes"
"crypto/tls"
"encoding/json"
"log"
"net/http"
"reflect"
"time"

"github.com/gorilla/websocket"
"github.com/open-ness/edgeapps/applications/sample-app/common"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -199,3 +201,42 @@ func isProducerAvailable(cli *http.Client, producer common.Service) (bool, commo
}
return false, allProducers
}

// ConnectProducerWebsocket establishes a websocket connection for posting
// notifications to all subscribed consumers
func ConnectProducerWebsocket(client *http.Client) (*websocket.Conn, error) {
// Get hold of the session handle
transport, ok := client.Transport.(*http.Transport)
if !ok {
return nil, errors.New("HTTP client doens't have http.Transport")
}

// Construct websocket handle after populating it with the TLS configuration
socket := &websocket.Dialer{
TLSClientConfig: &tls.Config{
RootCAs: transport.TLSClientConfig.RootCAs,
Certificates: []tls.Certificate{
transport.TLSClientConfig.Certificates[0]},
ServerName: common.Cfg.EaaCommonName,
},
}

// Set header for HTTP request to the websocket endpoint
hostHeader := http.Header{}
hostHeader.Add("Host", common.Cfg.Namespace+":"+common.Cfg.ProducerAppID)

// Establish websocket connection by issuing a request to /notifications
conn, resp, err := socket.Dial("wss://"+common.Cfg.EdgeNodeEndpoint+
"/notifications", hostHeader)
if err != nil {
return nil, errors.Wrap(err, "Couldn't dial to wss")
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.Println("Failed to close response body")
}
}()

// Return websocket connection handle
return conn, nil
}