Skip to content

Commit 705c5d2

Browse files
fix: graceful shutdown of the server
1 parent c19255a commit 705c5d2

5 files changed

Lines changed: 121 additions & 70 deletions

File tree

app/server.go

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) {
3535
logger.Info("Start worker -->")
3636
workerPool := worker.CreateWorkerPool(config.Worker.WorkersPoolSize, bufferChannel, config.Worker.DeliveryChannelSize, kPublisher)
3737
workerPool.StartWorkers()
38-
go kPublisher.ReportStats()
39-
go reportProcMetrics()
38+
go kPublisher.ReportStats(ctx)
39+
go reportProcMetrics(ctx)
4040
go shutDownServer(ctx, cancel, httpServices, bufferChannel, workerPool, kPublisher)
4141
}
4242

@@ -53,17 +53,19 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
5353
timedOut := workerPool.FlushWithTimeOut(config.Worker.WorkerFlushTimeout)
5454
if timedOut {
5555
logger.Info(fmt.Sprintf("WorkerPool flush timedout %t", timedOut))
56+
} else {
57+
logger.Info("WorkerPool flushed all events")
5658
}
5759
flushInterval := config.PublisherKafka.FlushInterval
5860
logger.Info("Closing Kafka producer")
5961
logger.Info(fmt.Sprintf("Wait %d ms for all messages to be delivered", flushInterval))
6062
eventsInProducer := kp.Close()
61-
/**
62-
@TODO - should compute the actual no., of events per batch and therefore the total. We can do this only when we close all the active connections
63-
Until then we fall back to approximation */
64-
eventsInChannel := len(bufferChannel) * 7
65-
logger.Info(fmt.Sprintf("Outstanding unprocessed events in the channel, data lost ~ (No batches %d * 5 events) = ~%d", len(bufferChannel), eventsInChannel))
66-
metrics.Count("kafka_messages_delivered_total", eventsInChannel+eventsInProducer, "success=false")
63+
eventCountInChannel := 0
64+
for i := 0; i < len(bufferChannel); i++ {
65+
req := <-bufferChannel
66+
eventCountInChannel += len(req.Events)
67+
}
68+
metrics.Count("shutdown_event_drops", eventCountInChannel+eventsInProducer, "")
6769
logger.Info("Exiting server")
6870
cancel()
6971
default:
@@ -72,21 +74,28 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
7274
}
7375
}
7476

75-
func reportProcMetrics() {
76-
t := time.Tick(config.MetricStatsd.FlushPeriodMs)
77+
func reportProcMetrics(ctx context.Context) {
78+
ticker := time.NewTicker(config.MetricStatsd.FlushPeriodMs)
79+
defer ticker.Stop()
80+
7781
m := &runtime.MemStats{}
7882
for {
79-
<-t
80-
metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), "")
83+
select {
84+
case <-ctx.Done():
85+
logger.Info("Stopping proc metrics reporter")
86+
return
87+
case <-ticker.C:
88+
metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), "")
8189

82-
runtime.ReadMemStats(m)
83-
metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, "")
84-
metrics.Gauge("server_mem_heap_inuse_bytes_current", m.HeapInuse, "")
85-
metrics.Gauge("server_mem_heap_objects_total_current", m.HeapObjects, "")
86-
metrics.Gauge("server_mem_stack_inuse_bytes_current", m.StackInuse, "")
87-
metrics.Gauge("server_mem_gc_triggered_current", m.LastGC/1000, "")
88-
metrics.Gauge("server_mem_gc_pauseNs_current", m.PauseNs[(m.NumGC+255)%256]/1000, "")
89-
metrics.Gauge("server_mem_gc_count_current", m.NumGC, "")
90-
metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, "")
90+
runtime.ReadMemStats(m)
91+
metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, "")
92+
metrics.Gauge("server_mem_heap_inuse_bytes_current", m.HeapInuse, "")
93+
metrics.Gauge("server_mem_heap_objects_total_current", m.HeapObjects, "")
94+
metrics.Gauge("server_mem_stack_inuse_bytes_current", m.StackInuse, "")
95+
metrics.Gauge("server_mem_gc_triggered_current", m.LastGC/1000, "")
96+
metrics.Gauge("server_mem_gc_pauseNs_current", m.PauseNs[(m.NumGC+255)%256]/1000, "")
97+
metrics.Gauge("server_mem_gc_count_current", m.NumGC, "")
98+
metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, "")
99+
}
91100
}
92101
}

publisher/kafka.go

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package publisher
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"strings"
@@ -108,35 +109,41 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
108109
return BulkError{Errors: errors}
109110
}
110111

111-
func (pr *Kafka) ReportStats() {
112-
for v := range pr.kp.Events() {
113-
switch e := v.(type) {
114-
case *kafka.Stats:
115-
var stats map[string]interface{}
116-
if err := json.Unmarshal([]byte(e.String()), &stats); err != nil {
117-
logger.Errorf("failed to unmarshal kafka stats: %v", err)
118-
continue
119-
}
120-
brokersRawJson, ok := stats["brokers"]
121-
if !ok || brokersRawJson == nil {
122-
logger.Errorf("kafka broker stats missing or null brokers field")
123-
continue
124-
}
125-
brokers := brokersRawJson.(map[string]interface{})
126-
metrics.Gauge("kafka_tx_messages_total", stats["txmsgs"], "")
127-
metrics.Gauge("kafka_tx_messages_bytes_total", stats["txmsg_bytes"], "")
128-
for _, broker := range brokers {
129-
brokerStats := broker.(map[string]interface{})
130-
rttValue := brokerStats["rtt"].(map[string]interface{})
131-
nodeName := strings.Split(brokerStats["nodename"].(string), ":")[0]
132-
133-
metrics.Gauge("kafka_brokers_tx_total", brokerStats["tx"], fmt.Sprintf("broker=%s", nodeName))
134-
metrics.Gauge("kafka_brokers_tx_bytes_total", brokerStats["txbytes"], fmt.Sprintf("broker=%s", nodeName))
135-
metrics.Gauge("kafka_brokers_rtt_average_milliseconds", rttValue["avg"], fmt.Sprintf("broker=%s", nodeName))
136-
}
112+
func (pr *Kafka) ReportStats(ctx context.Context) {
113+
for {
114+
select {
115+
case <-ctx.Done():
116+
logger.Info("Stopping Kafka stats reporter")
117+
return
118+
case v := <-pr.kp.Events():
119+
switch e := v.(type) {
120+
case *kafka.Stats:
121+
var stats map[string]interface{}
122+
if err := json.Unmarshal([]byte(e.String()), &stats); err != nil {
123+
logger.Errorf("failed to unmarshal kafka stats: %v", err)
124+
continue
125+
}
126+
brokersRawJson, ok := stats["brokers"]
127+
if !ok || brokersRawJson == nil {
128+
logger.Errorf("kafka broker stats missing or null brokers field")
129+
continue
130+
}
131+
brokers := brokersRawJson.(map[string]interface{})
132+
metrics.Gauge("kafka_tx_messages_total", stats["txmsgs"], "")
133+
metrics.Gauge("kafka_tx_messages_bytes_total", stats["txmsg_bytes"], "")
134+
for _, broker := range brokers {
135+
brokerStats := broker.(map[string]interface{})
136+
rttValue := brokerStats["rtt"].(map[string]interface{})
137+
nodeName := strings.Split(brokerStats["nodename"].(string), ":")[0]
138+
139+
metrics.Gauge("kafka_brokers_tx_total", brokerStats["tx"], fmt.Sprintf("broker=%s", nodeName))
140+
metrics.Gauge("kafka_brokers_tx_bytes_total", brokerStats["txbytes"], fmt.Sprintf("broker=%s", nodeName))
141+
metrics.Gauge("kafka_brokers_rtt_average_milliseconds", rttValue["avg"], fmt.Sprintf("broker=%s", nodeName))
142+
}
137143

138-
default:
139-
fmt.Printf("Ignored %v \n", e)
144+
default:
145+
fmt.Printf("Ignored %v \n", e)
146+
}
140147
}
141148
}
142149
}

services/rest/service.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package rest
33
import (
44
"context"
55
"fmt"
6+
"github.com/goto/raccoon/logger"
67
"net/http"
78
"time"
89

@@ -17,16 +18,18 @@ import (
1718
type Service struct {
1819
Collector collection.Collector
1920
s *http.Server
21+
cancel context.CancelFunc
2022
}
2123

2224
func NewRestService(c collection.Collector) *Service {
25+
ctx, cancel := context.WithCancel(context.Background())
2326
pingChannel := make(chan connection.Conn, config.ServerWs.ServerMaxConn)
2427
wh := websocket.NewHandler(pingChannel, c)
25-
go websocket.Pinger(pingChannel, config.ServerWs.PingerSize, config.ServerWs.PingInterval, config.ServerWs.WriteWaitInterval)
28+
go websocket.Pinger(ctx, pingChannel, config.ServerWs.PingerSize, config.ServerWs.PingInterval, config.ServerWs.WriteWaitInterval)
2629

27-
go reportConnectionMetrics(*wh.Table())
30+
go reportConnectionMetrics(ctx, *wh.Table())
2831

29-
go websocket.AckHandler(websocket.AckChan)
32+
go websocket.AckHandler(ctx, websocket.AckChan)
3033

3134
restHandler := NewHandler(c)
3235
router := mux.NewRouter()
@@ -42,6 +45,7 @@ func NewRestService(c collection.Collector) *Service {
4245
return &Service{
4346
s: server,
4447
Collector: c,
48+
cancel: cancel,
4549
}
4650
}
4751

@@ -50,13 +54,21 @@ func pingHandler(w http.ResponseWriter, r *http.Request) {
5054
w.Write([]byte("pong"))
5155
}
5256

53-
func reportConnectionMetrics(conn connection.Table) {
54-
t := time.Tick(config.MetricStatsd.FlushPeriodMs)
57+
func reportConnectionMetrics(ctx context.Context, conn connection.Table) {
58+
ticker := time.NewTicker(config.MetricStatsd.FlushPeriodMs)
59+
defer ticker.Stop()
60+
5561
for {
56-
<-t
57-
conn.RangeConnectionPerGroup(func(k string, v int) {
58-
metrics.Gauge("connections_count_current", v, fmt.Sprintf("conn_group=%s", k))
59-
})
62+
select {
63+
case <-ticker.C:
64+
conn.RangeConnectionPerGroup(func(k string, v int) {
65+
metrics.Gauge("connections_count_current", v, fmt.Sprintf("conn_group=%s", k))
66+
})
67+
case <-ctx.Done():
68+
// cleanup on shutdown
69+
logger.Info("[metrics.reportConnectionMetrics] - stopping metrics reporter")
70+
return
71+
}
6072
}
6173
}
6274

@@ -69,5 +81,6 @@ func (*Service) Name() string {
6981
}
7082

7183
func (s *Service) Shutdown(ctx context.Context) error {
84+
s.cancel()
7285
return s.s.Shutdown(ctx)
7386
}

services/rest/websocket/ack.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package websocket
22

33
import (
4+
"context"
5+
"github.com/goto/raccoon/logger"
46
"time"
57

68
"github.com/goto/raccoon/metrics"
@@ -20,19 +22,27 @@ type AckInfo struct {
2022
AckTimeConsumed time.Time
2123
}
2224

23-
func AckHandler(ch <-chan AckInfo) {
24-
for c := range ch {
25-
ackTim := time.Since(c.AckTimeConsumed)
26-
metrics.Timing("ack_event_rtt_ms", ackTim.Milliseconds(), "")
25+
func AckHandler(ctx context.Context, ch <-chan AckInfo) {
26+
for {
27+
select {
28+
case c := <-ch:
29+
ackTim := time.Since(c.AckTimeConsumed)
30+
metrics.Timing("ack_event_rtt_ms", ackTim.Milliseconds(), "")
31+
32+
tim := time.Since(c.TimeConsumed)
33+
if c.Err != nil {
34+
metrics.Timing("event_rtt_ms", tim.Milliseconds(), "")
35+
writeFailedResponse(c.Conn, c.serializer, c.MessageType, c.RequestGuid, c.Err)
36+
continue
37+
}
2738

28-
tim := time.Since(c.TimeConsumed)
29-
if c.Err != nil {
3039
metrics.Timing("event_rtt_ms", tim.Milliseconds(), "")
31-
writeFailedResponse(c.Conn, c.serializer, c.MessageType, c.RequestGuid, c.Err)
32-
continue
33-
}
40+
writeSuccessResponse(c.Conn, c.serializer, c.MessageType, c.RequestGuid)
3441

35-
metrics.Timing("event_rtt_ms", tim.Milliseconds(), "")
36-
writeSuccessResponse(c.Conn, c.serializer, c.MessageType, c.RequestGuid)
42+
case <-ctx.Done():
43+
// graceful shutdown
44+
logger.Info("[AckHandler] - stopping ack handler exiting")
45+
return
46+
}
3747
}
3848
}

services/rest/websocket/pinger.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package websocket
22

33
import (
4+
"context"
45
"fmt"
56
"time"
67

@@ -11,13 +12,24 @@ import (
1112
)
1213

1314
// Pinger is worker that pings the connected peers based on ping interval.
14-
func Pinger(c chan connection.Conn, size int, PingInterval time.Duration, WriteWaitInterval time.Duration) {
15+
func Pinger(ctx context.Context, c chan connection.Conn, size int, PingInterval time.Duration, WriteWaitInterval time.Duration) {
16+
//shutdown the pinger
1517
for i := 0; i < size; i++ {
1618
go func() {
1719
cSet := make(map[identification.Identifier]connection.Conn)
1820
ticker := time.NewTicker(PingInterval)
21+
defer ticker.Stop()
1922
for {
2023
select {
24+
//close the connection
25+
case <-ctx.Done():
26+
// shutdown signal received
27+
logger.Infof("[websocket.pinger] shutting down, closing %d active connections", len(cSet))
28+
for _, conn := range cSet {
29+
conn.Close()
30+
}
31+
logger.Info("[websocket.pinger] - shutting down, successful")
32+
return
2133
case conn := <-c:
2234
cSet[conn.Identifier] = conn
2335
case <-ticker.C:

0 commit comments

Comments
 (0)