Skip to content

Commit 3a81055

Browse files
committed
ING-1399: Add long running request case for graceful shutdown
1 parent cca7a7c commit 3a81055

2 files changed

Lines changed: 50 additions & 3 deletions

File tree

gateway/system/system.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"net"
910
"net/http"
1011
"sync"
1112
"time"
@@ -66,6 +67,8 @@ type System struct {
6667

6768
dataServer *grpc.Server
6869
dapiServer *http.Server
70+
71+
cancelFunc context.CancelFunc
6972
}
7073

7174
func NewSystem(opts *SystemOptions) (*System, error) {
@@ -231,17 +234,21 @@ func NewSystem(opts *SystemOptions) (*System, error) {
231234
}
232235
httpHandler = c.Handler(httpHandler)
233236

237+
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
234238
dapiSrv := &http.Server{
235239
ReadHeaderTimeout: time.Second * 5,
236240
IdleTimeout: time.Second * 60,
237241
Handler: httpHandler,
238242
TLSConfig: opts.DapiTlsConfig,
243+
BaseContext: func(net.Listener) context.Context { return shutdownCtx },
239244
}
240245

241246
s := &System{
242247
logger: opts.Logger,
243248
dataServer: dataSrv,
244249
dapiServer: dapiSrv,
250+
251+
cancelFunc: shutdownCancel,
245252
}
246253

247254
return s, nil
@@ -299,7 +306,10 @@ func (s *System) Shutdown() {
299306
defer wg.Done()
300307
s.dapiServer.SetKeepAlivesEnabled(false)
301308
time.Sleep(time.Second * 5)
302-
_ = s.dapiServer.Shutdown(context.Background())
309+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
310+
defer cancel()
311+
_ = s.dapiServer.Shutdown(ctx)
312+
s.cancelFunc()
303313
}()
304314
}
305315

gateway/test/dapi_graceful_shutdown_test.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package test
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/tls"
67
"errors"
@@ -44,6 +45,8 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
4445
GrpcCertificate: *gwCert,
4546
DapiCertificate: *gwCert,
4647
NumInstances: 1,
48+
ProxyServices: []string{"query"},
49+
Debug: true,
4750

4851
StartupCallback: func(m *gateway.StartupInfo) {
4952
gwStartInfoCh <- m
@@ -94,6 +97,8 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
9497
var eofs int
9598
var unexpectedErr error
9699
wg.Add(1)
100+
101+
// Continually run basic requests against the gateway and continue during shutdown.
97102
go func() {
98103
defer wg.Done()
99104

@@ -104,7 +109,7 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
104109
case errors.Is(err, io.EOF):
105110
// There is a small window between the flushing the request bytes to the socket and the http handler receiving
106111
// the bytes where the server sees the connection as idle and will close it. We record these errors and
107-
// include them when checking all requests recieved responses.
112+
// include them when checking all requests received responses.
108113
eofs++
109114
case errors.Is(err, syscall.ECONNREFUSED):
110115
// This is what we expect to see once the listeners have closed
@@ -113,7 +118,7 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
113118
// wroteRequest hook has not triggered. Such racy errors are unavoidable when running directly against an
114119
// http server.
115120
default:
116-
// Any errors not mentinoned above are not expected and should cause a failure to be investigated.
121+
// Any errors not mentioned above are not expected and should cause a failure to be investigated.
117122
unexpectedErr = err
118123
return
119124
}
@@ -126,9 +131,41 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
126131
}
127132
}()
128133

134+
wg.Add(1)
135+
136+
slowReqSent := make(chan any)
137+
// Send a long running request (this one takes approx 2 mins) and ensure that the gateway forcibly stops this request
138+
// during shutdown.
139+
go func() {
140+
defer wg.Done()
141+
142+
statement := []byte(`{"statement": "SELECT SUM(t + t2) AS total FROM ARRAY_RANGE(0,8000) AS t, ARRAY_RANGE(0,4000) AS t2"}`)
143+
reqBody := bytes.NewReader(statement)
144+
145+
req, err := http.NewRequestWithContext(context.Background(), "POST", fmt.Sprintf("https://%s/_p/query/query/service", dapiAddr), reqBody)
146+
assert.NoError(s.T(), err, "failed to create request with client trace")
147+
148+
req.SetBasicAuth(testConfig.CbUser, testConfig.CbPass)
149+
req.Header.Set("Content-Type", "application/json")
150+
151+
slowReqSent <- struct{}{}
152+
resp, err := dapiCli.Do(req)
153+
assert.NoError(s.T(), err)
154+
assert.Equal(s.T(), http.StatusBadGateway, resp.StatusCode, "expected to receive a 502 response from the gateway")
155+
156+
defer func() { _ = resp.Body.Close() }()
157+
body, err := io.ReadAll(resp.Body)
158+
assert.NoError(s.T(), err)
159+
160+
assert.Equal(s.T(), "failed to receive response: context canceled", string(body))
161+
}()
162+
129163
// Allow some requests to run against the gateway before shutting down
130164
time.Sleep(time.Second)
131165

166+
// Wait for the slow running request to be sent before starting shutdown
167+
<-slowReqSent
168+
132169
gw.Shutdown()
133170

134171
wg.Wait()

0 commit comments

Comments
 (0)