Skip to content

Commit b697a95

Browse files
fix: graceful shutdown of the service (#19)
Close open connections during shutdown and record the data-loss metric accurately.
1 parent d9780ef commit b697a95

7 files changed

Lines changed: 241 additions & 22 deletions

File tree

app/proc.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -12,10 +12,10 @@ func Run() error {
1212
ctx, cancel := context.WithCancel(ctx)
1313

1414
//@TODO - init config
15-
15+
shutdown := make(chan bool)
1616
//start server
17-
StartServer(ctx, cancel)
17+
StartServer(ctx, cancel, shutdown)
1818
logger.Info("App.Run --> Complete")
19-
<-ctx.Done()
19+
<-shutdown
2020
return nil
2121
}

app/server.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ import (
1919
)
2020

2121
// StartServer starts the server
22-
func StartServer(ctx context.Context, cancel context.CancelFunc) {
22+
func StartServer(ctx context.Context, cancel context.CancelFunc, shutdown chan bool) {
2323
bufferChannel := make(chan collection.CollectRequest, config.Worker.ChannelSize)
24-
httpServices := services.Create(bufferChannel)
24+
httpServices := services.Create(bufferChannel, ctx)
2525
logger.Info("Start Server -->")
2626
httpServices.Start(ctx, cancel)
2727
logger.Info("Start publisher -->")
@@ -37,38 +37,47 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) {
3737
workerPool.StartWorkers()
3838
go kPublisher.ReportStats()
3939
go reportProcMetrics()
40-
go shutDownServer(ctx, cancel, httpServices, bufferChannel, workerPool, kPublisher)
40+
// create signal channel at startup
41+
signalChan := make(chan os.Signal, 1)
42+
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
43+
44+
go shutDownServer(ctx, cancel, httpServices, bufferChannel, workerPool, kPublisher, shutdown, signalChan)
4145
}
4246

43-
func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collection.CollectRequest, workerPool *worker.Pool, kp *publisher.Kafka) {
44-
signalChan := make(chan os.Signal)
45-
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
47+
func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collection.CollectRequest,
48+
workerPool *worker.Pool, kp *publisher.Kafka, shutdown chan bool, signalChan chan os.Signal) {
4649
for {
4750
sig := <-signalChan
4851
switch sig {
4952
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
5053
logger.Info(fmt.Sprintf("[App.Server] Received a signal %s", sig))
5154
httpServices.Shutdown(ctx)
5255
logger.Info("Server shutdown all the listeners")
56+
cancel()
57+
close(bufferChannel)
5358
timedOut := workerPool.FlushWithTimeOut(config.Worker.WorkerFlushTimeout)
5459
if timedOut {
5560
logger.Info(fmt.Sprintf("WorkerPool flush timedout %t", timedOut))
61+
} else {
62+
logger.Info("WorkerPool flushed all events")
5663
}
5764
flushInterval := config.PublisherKafka.FlushInterval
5865
logger.Info("Closing Kafka producer")
5966
logger.Info(fmt.Sprintf("Wait %d ms for all messages to be delivered", flushInterval))
6067
eventsInProducer := kp.Close()
61-
/**
62-
@TODO - should compute the actual no., of events per batch and therefore the total. We can do this only when we close all the active connections
63-
Until then we fall back to approximation */
64-
eventsInChannel := len(bufferChannel) * 7
65-
logger.Info(fmt.Sprintf("Outstanding unprocessed events in the channel, data lost ~ (No batches %d * 5 events) = ~%d", len(bufferChannel), eventsInChannel))
66-
metrics.Count("kafka_messages_delivered_total", eventsInChannel+eventsInProducer, "success=false")
68+
eventCountInChannel := 0
69+
for i := 0; i < len(bufferChannel); i++ {
70+
req := <-bufferChannel
71+
eventCountInChannel += len(req.Events)
72+
}
73+
logger.Info(fmt.Sprintf("number of events dropped during the shutdown %d", eventCountInChannel+eventsInProducer))
74+
metrics.Count("total_data_loss", eventCountInChannel+eventsInProducer, "reason=shutdown")
6775
logger.Info("Exiting server")
68-
cancel()
76+
shutdown <- true
6977
default:
7078
logger.Info(fmt.Sprintf("[App.Server] Received a unexpected signal %s", sig))
7179
}
80+
return
7281
}
7382
}
7483

app/server_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package app
2+
3+
import (
4+
"context"
5+
"github.com/goto/raccoon/publisher"
6+
"github.com/goto/raccoon/services"
7+
"github.com/goto/raccoon/worker"
8+
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
9+
"os"
10+
"syscall"
11+
"testing"
12+
"time"
13+
14+
"github.com/goto/raccoon/collection"
15+
)
16+
17+
func TestShutDownServer(t *testing.T) {
18+
ctx, cancel := context.WithCancel(context.Background())
19+
defer cancel()
20+
mockKafka := &mockKafkaClient{}
21+
22+
kp := publisher.NewKafkaFromClient(mockKafka, 50, "test")
23+
24+
shutdownCh := make(chan bool, 1)
25+
bufferCh := make(chan collection.CollectRequest, 1)
26+
27+
services := services.Create(bufferCh, ctx)
28+
29+
wp := worker.CreateWorkerPool(1, bufferCh, 1, kp)
30+
31+
// run shutdown in background
32+
sigCh := make(chan os.Signal, 1)
33+
go shutDownServer(ctx, cancel, services, bufferCh, wp, kp, shutdownCh, sigCh)
34+
35+
// send a termination signal after short delay
36+
go func() {
37+
time.Sleep(50 * time.Millisecond)
38+
sigCh <- syscall.SIGTERM
39+
}()
40+
41+
select {
42+
case <-shutdownCh:
43+
t.Log("shutdown executed successfully")
44+
case <-time.After(500 * time.Millisecond):
45+
t.Error("shutdown execution failed")
46+
}
47+
48+
if !isClosed(bufferCh) {
49+
t.Errorf("expected buffer channel to be closed")
50+
}
51+
if !mockKafka.FlushCalled {
52+
t.Errorf("expected Kafka.FlushCalled to be called")
53+
}
54+
if !mockKafka.CloseCalled {
55+
t.Errorf("expected Kafka.CloseCalled to be called")
56+
}
57+
}
58+
59+
func isClosed(ch <-chan collection.CollectRequest) bool {
60+
select {
61+
case _, ok := <-ch:
62+
return !ok
63+
default:
64+
return false
65+
}
66+
}
67+
68+
// ---- Mocks ----
69+
// mockKafkaClient is a mock for the Client interface
70+
type mockKafkaClient struct {
71+
// Tracking flags
72+
ProduceCalled bool
73+
CloseCalled bool
74+
FlushCalled bool
75+
EventsCalled bool
76+
77+
ReturnFlushLeft int
78+
EventChan chan kafka.Event
79+
}
80+
81+
func (m *mockKafkaClient) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
82+
m.ProduceCalled = true
83+
return nil
84+
}
85+
86+
func (m *mockKafkaClient) Close() {
87+
m.CloseCalled = true
88+
}
89+
90+
func (m *mockKafkaClient) Flush(timeout int) int {
91+
m.FlushCalled = true
92+
return m.ReturnFlushLeft
93+
}
94+
95+
func (m *mockKafkaClient) Events() chan kafka.Event {
96+
m.EventsCalled = true
97+
if m.EventChan == nil {
98+
m.EventChan = make(chan kafka.Event, 1)
99+
}
100+
return m.EventChan
101+
}

services/rest/service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ type Service struct {
1919
s *http.Server
2020
}
2121

22-
func NewRestService(c collection.Collector) *Service {
22+
func NewRestService(c collection.Collector, ctx context.Context) *Service {
2323
pingChannel := make(chan connection.Conn, config.ServerWs.ServerMaxConn)
2424
wh := websocket.NewHandler(pingChannel, c)
25-
go websocket.Pinger(pingChannel, config.ServerWs.PingerSize, config.ServerWs.PingInterval, config.ServerWs.WriteWaitInterval)
25+
go websocket.Pinger(ctx, pingChannel, config.ServerWs.PingerSize, config.ServerWs.PingInterval, config.ServerWs.WriteWaitInterval)
2626

2727
go reportConnectionMetrics(*wh.Table())
2828

services/rest/websocket/pinger.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package websocket
22

33
import (
4+
"context"
45
"fmt"
56
"time"
67

@@ -10,14 +11,24 @@ import (
1011
"github.com/goto/raccoon/services/rest/websocket/connection"
1112
)
1213

13-
// Pinger is worker that pings the connected peers based on ping interval.
14-
func Pinger(c chan connection.Conn, size int, PingInterval time.Duration, WriteWaitInterval time.Duration) {
14+
// Pinger is worker that pings the connected peers based on ping interval. This will also shutdown all connection kept in the cset map
15+
func Pinger(ctx context.Context, c chan connection.Conn, size int, PingInterval time.Duration, WriteWaitInterval time.Duration) {
1516
for i := 0; i < size; i++ {
1617
go func() {
1718
cSet := make(map[identification.Identifier]connection.Conn)
1819
ticker := time.NewTicker(PingInterval)
20+
defer ticker.Stop()
1921
for {
2022
select {
23+
//close the connection
24+
case <-ctx.Done():
25+
// shutdown signal received
26+
logger.Infof("[websocket.pinger] shutting down, closing %d active connections", len(cSet))
27+
for _, conn := range cSet {
28+
conn.Close()
29+
}
30+
logger.Info("[websocket.pinger] - shutting down, successful")
31+
return
2132
case conn := <-c:
2233
cSet[conn.Identifier] = conn
2334
case <-ticker.C:
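[File name missing from the extracted page; presumably services/rest/websocket/pinger_test.go, inferred from `package websocket` and `TestPinger` below]

Lines changed: 98 additions & 0 deletions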
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package websocket
2+
3+
import (
4+
"context"
5+
"github.com/gorilla/websocket"
6+
"github.com/goto/raccoon/services/rest/websocket/connection"
7+
"net/http"
8+
"net/http/httptest"
9+
"strings"
10+
"sync"
11+
"testing"
12+
"time"
13+
)
14+
15+
func TestPinger(t *testing.T) {
16+
ctx, cancel := context.WithCancel(context.Background())
17+
18+
connCh := make(chan connection.Conn, 1)
19+
20+
upgrader := connection.NewUpgrader(connection.UpgraderConfig{
21+
ReadBufferSize: 1024,
22+
WriteBufferSize: 1024,
23+
CheckOrigin: true,
24+
MaxUser: 10,
25+
PongWaitInterval: 1 * time.Second,
26+
WriteWaitInterval: 1 * time.Second,
27+
ConnIDHeader: "X-Conn-ID",
28+
ConnGroupHeader: "X-Conn-Group",
29+
ConnGroupDefault: "default",
30+
})
31+
32+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
33+
conn, err := upgrader.Upgrade(w, r)
34+
if err != nil {
35+
t.Errorf("failed to upgrade: %v", err)
36+
return
37+
}
38+
connCh <- conn
39+
}))
40+
defer srv.Close()
41+
42+
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
43+
client, _, err := websocket.DefaultDialer.Dial(wsURL, http.Header{
44+
"X-Conn-ID": {"1"},
45+
"X-Conn-Group": {"test"},
46+
})
47+
if err != nil {
48+
t.Fatalf("failed to dial test server: %v", err)
49+
}
50+
51+
// ping detection
52+
pingReceived := make(chan struct{}, 1)
53+
client.SetPingHandler(func(appData string) error {
54+
t.Logf("client received ping: %s", appData)
55+
select { // avoid blocking if already signaled
56+
case pingReceived <- struct{}{}:
57+
default:
58+
}
59+
return client.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second))
60+
})
61+
62+
// close detection
63+
closed := make(chan struct{})
64+
var once sync.Once
65+
go func() {
66+
for {
67+
_, _, err := client.ReadMessage()
68+
if err != nil {
69+
once.Do(func() { close(closed) })
70+
return
71+
}
72+
}
73+
}()
74+
75+
// start pinger
76+
go Pinger(ctx, connCh, 1, 20*time.Millisecond, 5*time.Millisecond)
77+
78+
// assert ping received
79+
select {
80+
case <-pingReceived:
81+
t.Log("client received ping")
82+
case <-time.After(500 * time.Millisecond):
83+
t.Fatal("client did not receive ping")
84+
}
85+
86+
// now cancel -> triggers connection close
87+
cancel()
88+
89+
select {
90+
case <-closed:
91+
t.Log("connection closed after cancel()")
92+
case <-time.After(500 * time.Millisecond):
93+
t.Fatal("connection not closed after cancel()")
94+
}
95+
96+
// close client at the very end
97+
client.Close()
98+
}

services/services.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ func (s *Services) Shutdown(ctx context.Context) {
4343
}
4444
}
4545

46-
func Create(b chan collection.CollectRequest) Services {
46+
func Create(b chan collection.CollectRequest, ctx context.Context) Services {
4747
c := collection.NewChannelCollector(b)
4848
return Services{
4949
b: []bootstrapper{
5050
grpc.NewGRPCService(c),
5151
pprof.NewPprofService(),
52-
rest.NewRestService(c),
52+
rest.NewRestService(c, ctx),
5353
},
5454
}
5555
}

0 commit comments

Comments
 (0)