Skip to content

Commit 85426a2

Browse files
committed
ING-1399: Add long running request case for graceful shutdown
1 parent cca7a7c commit 85426a2

3 files changed

Lines changed: 46 additions & 4 deletions

File tree

gateway/gateway.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,8 @@ func (g *Gateway) Run(ctx context.Context) error {
409409
Password: config.Password,
410410
})
411411

412+
ctx, cancel := context.WithCancel(ctx)
413+
412414
config.Logger.Info("initializing protostellar system")
413415
gatewaySys, err := system.NewSystem(&system.SystemOptions{
414416
Logger: config.Logger.Named("gateway-system"),
@@ -432,6 +434,7 @@ func (g *Gateway) Run(ctx context.Context) error {
432434
},
433435
AlphaEndpoints: config.AlphaEndpoints,
434436
Debug: config.Debug,
437+
Cancel: cancel,
435438
})
436439
if err != nil {
437440
config.Logger.Error("error creating legacy proxy")

gateway/system/system.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,17 @@ type SystemOptions struct {
5959
DapiTlsConfig *tls.Config
6060
AlphaEndpoints bool
6161
Debug bool
62+
63+
Cancel context.CancelFunc
6264
}
6365

6466
type System struct {
6567
logger *zap.Logger
6668

6769
dataServer *grpc.Server
6870
dapiServer *http.Server
71+
72+
cancelFunc context.CancelFunc
6973
}
7074

7175
func NewSystem(opts *SystemOptions) (*System, error) {
@@ -242,6 +246,7 @@ func NewSystem(opts *SystemOptions) (*System, error) {
242246
logger: opts.Logger,
243247
dataServer: dataSrv,
244248
dapiServer: dapiSrv,
249+
cancelFunc: opts.Cancel,
245250
}
246251

247252
return s, nil
@@ -299,9 +304,12 @@ func (s *System) Shutdown() {
299304
defer wg.Done()
300305
s.dapiServer.SetKeepAlivesEnabled(false)
301306
time.Sleep(time.Second * 5)
302-
_ = s.dapiServer.Shutdown(context.Background())
307+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
308+
defer cancel()
309+
_ = s.dapiServer.Shutdown(ctx)
303310
}()
304311
}
305312

306313
wg.Wait()
314+
s.cancelFunc()
307315
}

gateway/test/dapi_graceful_shutdown_test.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package test
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/tls"
67
"errors"
@@ -44,7 +45,7 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
4445
GrpcCertificate: *gwCert,
4546
DapiCertificate: *gwCert,
4647
NumInstances: 1,
47-
48+
ProxyServices: []string{"query"},
4849
StartupCallback: func(m *gateway.StartupInfo) {
4950
gwStartInfoCh <- m
5051
},
@@ -94,6 +95,8 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
9495
var eofs int
9596
var unexpectedErr error
9697
wg.Add(1)
98+
99+
// Continually run basic requests against the gateway and continue during shutdown.
97100
go func() {
98101
defer wg.Done()
99102

@@ -104,7 +107,7 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
104107
case errors.Is(err, io.EOF):
105108
// There is a small window between the flushing the request bytes to the socket and the http handler receiving
106109
// the bytes where the server sees the connection as idle and will close it. We record these errors and
107-
// include them when checking all requests recieved responses.
110+
// include them when checking all requests received responses.
108111
eofs++
109112
case errors.Is(err, syscall.ECONNREFUSED):
110113
// This is what we expect to see once the listeners have closed
@@ -113,7 +116,7 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
113116
// wroteRequest hook has not triggered. Such racy errors are unavoidable when running directly against an
114117
// http server.
115118
default:
116-
// Any errors not mentinoned above are not expected and should cause a failure to be investigated.
119+
// Any errors not mentioned above are not expected and should cause a failure to be investigated.
117120
unexpectedErr = err
118121
return
119122
}
@@ -126,9 +129,37 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
126129
}
127130
}()
128131

132+
wg.Add(1)
133+
134+
slowReqSent := make(chan any)
135+
// Send a long running request (this one takes approx 2 mins) and ensure that the gateway forcibly stops this request
136+
// during shutdown.
137+
go func() {
138+
defer wg.Done()
139+
140+
statement := []byte(`{"statement": "SELECT SUM(t + t2) AS total FROM ARRAY_RANGE(0,8000) AS t, ARRAY_RANGE(0,4000) AS t2"}`)
141+
reqBody := bytes.NewReader(statement)
142+
143+
req, err := http.NewRequestWithContext(context.Background(), "POST", fmt.Sprintf("https://%s/_p/query/query/service", dapiAddr), reqBody)
144+
assert.NoError(s.T(), err, "failed to create request with client trace")
145+
146+
req.SetBasicAuth(testConfig.CbUser, testConfig.CbPass)
147+
req.Header.Set("Content-Type", "application/json")
148+
149+
slowReqSent <- struct{}{}
150+
_, err = dapiCli.Do(req)
151+
152+
// After the configured period shutdown will reuturn and we cancel the context. This triggers the go routine in Serve()
153+
// that stops the http server forcibly, causing EOFs on existing connections doing work.
154+
assert.ErrorIs(s.T(), err, io.EOF)
155+
}()
156+
129157
// Allow some requests to run against the gateway before shutting down
130158
time.Sleep(time.Second)
131159

160+
// Wait for the slow running request to be sent before starting shutdown
161+
<-slowReqSent
162+
132163
gw.Shutdown()
133164

134165
wg.Wait()

0 commit comments

Comments
 (0)