Skip to content

Commit 58974dd

Browse files
committed
ING-1429: Add shutdown timeout to grpc server
1 parent 52d271d commit 58974dd

2 files changed

Lines changed: 57 additions & 6 deletions

File tree

gateway/system/system.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
)
4747

4848
const maxMsgSize = 25 * 1024 * 1024 // 25MiB
49+
const shutdownTimeout = 30 * time.Second
4950

5051
type SystemOptions struct {
5152
Logger *zap.Logger
@@ -301,7 +302,18 @@ func (s *System) Shutdown() {
301302
wg.Add(1)
302303
go func() {
303304
defer wg.Done()
304-
s.dataServer.GracefulStop()
305+
306+
// GracefulStop has no timeout mechanism so we need to take this approach
307+
done := make(chan struct{})
308+
go func() {
309+
s.dataServer.GracefulStop()
310+
close(done)
311+
}()
312+
313+
select {
314+
case <-done:
315+
case <-time.After(shutdownTimeout):
316+
}
305317
}()
306318
}
307319

gateway/test/dapi_graceful_shutdown_test.go renamed to gateway/test/graceful_shutdown_test.go

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,17 @@ import (
1414
"syscall"
1515
"time"
1616

17+
"github.com/couchbase/goprotostellar/genproto/query_v1"
1718
"github.com/couchbase/stellar-gateway/gateway"
1819
"github.com/couchbase/stellar-gateway/testutils"
1920
"github.com/couchbase/stellar-gateway/utils/selfsignedcert"
2021
"github.com/stretchr/testify/assert"
2122
"github.com/stretchr/testify/require"
2223
"go.uber.org/zap"
24+
"google.golang.org/grpc"
25+
"google.golang.org/grpc/codes"
26+
"google.golang.org/grpc/credentials"
27+
"google.golang.org/grpc/status"
2328
)
2429

2530
func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
@@ -56,8 +61,12 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
5661
s.T().Fatalf("failed to initialize graceful-shutdown-gateway: %s", err)
5762
}
5863

64+
var wg sync.WaitGroup
65+
5966
gwClosedCh := make(chan struct{})
67+
wg.Add(1)
6068
go func() {
69+
defer wg.Done()
6170
err := gw.Run(context.Background())
6271
if err != nil {
6372
s.T().Errorf("graceful-shutdown-gateway run failed: %s", err)
@@ -69,6 +78,13 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
6978

7079
startInfo := <-gwStartInfoCh
7180

81+
connAddr := fmt.Sprintf("%s:%d", "127.0.0.1", startInfo.ServicePorts.PS)
82+
conn, err := grpc.NewClient(connAddr,
83+
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})),
84+
grpc.WithUserAgent("test-client"),
85+
)
86+
assert.NoError(s.T(), err, "failed to create grpc client connection to gateway")
87+
7288
dapiAddr := fmt.Sprintf("%s:%d", "127.0.0.1", startInfo.ServicePorts.DAPI)
7389
dapiCli := &http.Client{
7490
Transport: &http.Transport{
@@ -92,7 +108,6 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
92108
assert.NoError(s.T(), err)
93109

94110
respCloseChan := make(chan (bool), 10000)
95-
var wg sync.WaitGroup
96111

97112
var eofs int
98113
var unexpectedErr error
@@ -133,13 +148,36 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
133148

134149
wg.Add(1)
135150

151+
slowQuery := "SELECT SUM(t + t2) AS total FROM ARRAY_RANGE(0,8000) AS t, ARRAY_RANGE(0,4000) AS t2"
152+
slowGrpcReqSent := make(chan any)
153+
go func() {
154+
defer wg.Done()
155+
queryClient := query_v1.NewQueryServiceClient(conn)
156+
157+
slowGrpcReqSent <- struct{}{}
158+
client, err := queryClient.Query(context.Background(), &query_v1.QueryRequest{
159+
BucketName: &s.bucketName,
160+
Statement: slowQuery,
161+
}, grpc.PerRPCCredentials(s.basicRpcCreds))
162+
assert.NoError(s.T(), err, "failed to send grpc query request")
163+
164+
_, err = client.Recv()
165+
assert.Error(s.T(), err, "expected error receiving from grpc stream after shutdown")
166+
167+
st, ok := status.FromError(err)
168+
assert.True(s.T(), ok)
169+
assert.Equal(s.T(), codes.Unavailable, st.Code())
170+
assert.Contains(s.T(), st.Message(), "EOF")
171+
}()
172+
173+
wg.Add(1)
136174
slowReqSent := make(chan any)
137175
// Send a long running request (this one takes approx 2 mins) and ensure that the gateway forcibly stops this request
138176
// during shutdown.
139177
go func() {
140178
defer wg.Done()
141179

142-
statement := []byte(`{"statement": "SELECT SUM(t + t2) AS total FROM ARRAY_RANGE(0,8000) AS t, ARRAY_RANGE(0,4000) AS t2"}`)
180+
statement := []byte(fmt.Sprintf(`{"statement": "%s"}`, slowQuery))
143181
reqBody := bytes.NewReader(statement)
144182

145183
req, err := http.NewRequestWithContext(context.Background(), "POST", fmt.Sprintf("https://%s/_p/query/query/service", dapiAddr), reqBody)
@@ -156,11 +194,12 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
156194
assert.ErrorIs(s.T(), err, io.EOF)
157195
}()
158196

159-
// Allow some requests to run against the gateway before shutting down
160-
time.Sleep(time.Second)
161-
162197
// Wait for the slow running request to be sent before starting shutdown
163198
<-slowReqSent
199+
<-slowGrpcReqSent
200+
201+
// Allow the server to start processing the requests
202+
time.Sleep(time.Second * 5)
164203

165204
gw.Shutdown()
166205

0 commit comments

Comments
 (0)