Skip to content

Commit ad18f3a

Browse files
authored
Use websocket rather than http streaming for real time state step data (#15)
1 parent de394b3 commit ad18f3a

File tree

5 files changed

+77
-278
lines changed

5 files changed

+77
-278
lines changed

flow-state/event/eventlistener.go

+73-54
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,38 @@
11
package event
22

33
import (
4+
"fmt"
5+
"net"
46
"net/http"
57
"time"
68

7-
9+
"github.com/gorilla/websocket"
810
"github.com/julienschmidt/httprouter"
911
"github.com/project-flogo/core/engine/event"
1012
"github.com/project-flogo/core/support/log"
1113
"github.com/project-flogo/flow/state"
12-
"github.com/project-flogo/services/flow-state/stream"
1314
)
1415

1516
var recorderLog = log.ChildLogger(log.RootLogger(), "step-listener")
1617

17-
var stepEventQueue = make(chan *event.Context, 1)
18-
var done = make(chan bool, 1)
19-
var stepChan = make(chan *state.Step, 100)
18+
const (
19+
// PongWait Time allowed to read the next pong message from the peer.
20+
pongWait = 60 * time.Second
21+
22+
// PingPeriod Send pings to peer with this period. Must be less than PongWait.
23+
pingPeriod = 10 * time.Second
24+
)
25+
26+
var upgrader = websocket.Upgrader{
27+
CheckOrigin: func(r *http.Request) bool {
28+
return true
29+
},
30+
ReadBufferSize: 1024,
31+
WriteBufferSize: 1024,
32+
}
2033

21-
var s stream.EventStreamScheduler
34+
var stepEventQueue = make(chan *event.Context, 10)
35+
var stepChan = make(chan *state.Step)
2236

2337
type recorderEvent struct {
2438
}
@@ -42,63 +56,68 @@ func handleRecordEvent() {
4256
case stepE := <-stepEventQueue:
4357
switch t := stepE.GetEvent().(type) {
4458
case *stepEvent:
45-
if s == nil {
46-
for checking := true; checking; {
47-
if s != nil {
48-
checking = false
49-
break
50-
}
51-
recorderLog.Infof("Waiting for client step steaming connection")
52-
time.Sleep(1 * time.Second)
53-
}
54-
recorderLog.Infof("Client steaming steps connected")
55-
}
56-
step := t.Step()
57-
if len(step.FlowChanges) > 0 {
58-
//main flow
59-
flowChange, ok := step.FlowChanges[0]
60-
if ok && (flowChange.Status == 500 || flowChange.Status == 600 || flowChange.Status == 700) {
61-
//Finish it after flow completed/canceled/failed
62-
s.Finish(&stream.Response{Step: step, Status: stream.Completed})
63-
s = nil
64-
done <- true
65-
} else {
66-
s.UpdateResponse(&stream.Response{Step: step, Status: stream.Running})
67-
}
68-
} else {
69-
s.UpdateResponse(&stream.Response{Step: step, Status: stream.Running})
70-
}
59+
stepChan <- t.step
7160
}
7261
}
7362
}
7463
}
7564

76-
func setWriter(ss stream.EventStreamScheduler) {
77-
s = ss
78-
}
79-
8065
// Status is a basic health check for the server to determine if it is up
8166
func HandleStepEvent(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
82-
connectionMonitor := stream.NewConnectionMonitor(make(chan int, 10))
83-
if cn, ok := w.(http.CloseNotifier); ok {
84-
// Listen to the closing of the http connection via the CloseNotifier
85-
notify := cn.CloseNotify()
86-
go func() {
87-
<-notify
88-
recorderLog.Info("Connection closed by the client or broken.")
89-
connectionMonitor.ConnectionLost()
90-
}()
67+
recorderLog.Debugf("Received step event websocket request: %+v", r)
68+
conn, err := upgrader.Upgrade(w, r, nil)
69+
if err != nil {
70+
recorderLog.Errorf("websocket upgrade failed: %s", err.Error())
71+
return
9172
}
92-
93-
scheduler := stream.NewEventStreamScheduler(w, r, &stream.Response{Status: stream.Running, Details: "Waiting step data", Step: nil})
94-
go func() {
95-
// Start the scheduler
96-
recorderLog.Debug("Starting connection monitor scheduler...")
97-
scheduler.Start(connectionMonitor)
73+
defer func() {
74+
if err := conn.Close(); err != nil {
75+
recorderLog.Errorf("close websocket error: %v", err)
76+
}
9877
}()
78+
// Send ping
79+
ticker := time.NewTicker(pingPeriod)
80+
conn.SetPingHandler(func(string) error {
81+
if recorderLog.DebugEnabled() {
82+
recorderLog.Debug("Ping Handler")
83+
}
84+
if err := conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(pongWait)); err != nil {
85+
recorderLog.Warnf("Sending PONG to client error: %v", err)
86+
}
87+
return nil
88+
})
9989

100-
recorderLog.Info("Step streaming request comes")
90+
conn.SetPongHandler(func(string) error {
91+
if recorderLog.DebugEnabled() {
92+
recorderLog.Debug("PONG Handler")
93+
}
94+
conn.SetReadDeadline(time.Now().Add(pongWait)) // reset read timeout on receiving a pong
95+
return nil
96+
})
10197

102-
setWriter(scheduler)
103-
<-done
98+
for {
99+
select {
100+
case step := <-stepChan:
101+
err = conn.WriteJSON(step)
102+
if err != nil {
103+
conn.WriteControl(websocket.CloseMessage,
104+
websocket.FormatCloseMessage(websocket.CloseInternalServerErr, fmt.Sprintf("Write json data error:%s", err.Error())),
105+
time.Now())
106+
recorderLog.Error("error writing message: %s", err.Error())
107+
return
108+
}
109+
case <-ticker.C:
110+
recorderLog.Debugf("Sending heartbeat Ping to client")
111+
// NOTE: Control frame writes do not need to be synchronized
112+
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(pongWait)); err != nil {
113+
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
114+
recorderLog.Warnf("Sent heartbeat Ping timeout: %v", err)
115+
} else {
116+
// for any other error we need to close tunnel
117+
recorderLog.Warnf("Sent heartbeat Ping error: %v", err)
118+
conn.Close()
119+
}
120+
}
121+
}
122+
}
104123
}

flow-state/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
module github.com/project-flogo/services/flow-state
22

33
require (
4+
github.com/gorilla/websocket v1.4.2
45
github.com/julienschmidt/httprouter v1.3.0
56
github.com/project-flogo/core v0.10.1
67
github.com/project-flogo/flow v0.10.0

flow-state/go.sum

+3
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
44
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
55
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
66
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
7+
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
8+
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
79
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
810
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
911
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
12+
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
1013
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
1114
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
1215
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=

flow-state/stream/connection.go

-38
This file was deleted.

0 commit comments

Comments
 (0)