|
1 | 1 | package test |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "bytes" |
4 | 5 | "context" |
5 | 6 | "crypto/tls" |
6 | 7 | "errors" |
@@ -44,6 +45,8 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() { |
44 | 45 | GrpcCertificate: *gwCert, |
45 | 46 | DapiCertificate: *gwCert, |
46 | 47 | NumInstances: 1, |
| 48 | + ProxyServices: []string{"query"}, |
| 49 | + Debug: true, |
47 | 50 |
|
48 | 51 | StartupCallback: func(m *gateway.StartupInfo) { |
49 | 52 | gwStartInfoCh <- m |
@@ -94,6 +97,8 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() { |
94 | 97 | var eofs int |
95 | 98 | var unexpectedErr error |
96 | 99 | wg.Add(1) |
| 100 | + |
| 101 | + // Continually run basic requests against the gateway and continue during shutdown. |
97 | 102 | go func() { |
98 | 103 | defer wg.Done() |
99 | 104 |
|
@@ -126,9 +131,41 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() { |
126 | 131 | } |
127 | 132 | }() |
128 | 133 |
|
| 134 | + wg.Add(1) |
| 135 | + |
| 136 | + slowReqSent := make(chan any) |
| 137 | + // Send a long running request (this one takes approx 2 mins) and ensure that the gateway forcibly stops this request |
| 138 | + // during shutdown. |
| 139 | + go func() { |
| 140 | + defer wg.Done() |
| 141 | + |
| 142 | + statement := []byte(`{"statement": "SELECT SUM(t + t2) AS total FROM ARRAY_RANGE(0,8000) AS t, ARRAY_RANGE(0,4000) AS t2"}`) |
| 143 | + reqBody := bytes.NewReader(statement) |
| 144 | + |
| 145 | + req, err := http.NewRequestWithContext(context.Background(), "POST", fmt.Sprintf("https://%s/_p/query/query/service", dapiAddr), reqBody) |
| 146 | + assert.NoError(s.T(), err, "failed to create request with client trace") |
| 147 | + |
| 148 | + req.SetBasicAuth(testConfig.CbUser, testConfig.CbPass) |
| 149 | + req.Header.Set("Content-Type", "application/json") |
| 150 | + |
| 151 | + slowReqSent <- struct{}{} |
| 152 | + resp, err := dapiCli.Do(req) |
| 153 | + assert.NoError(s.T(), err) |
| 154 | + assert.Equal(s.T(), http.StatusBadGateway, resp.StatusCode, "expected to recieve a 502 response from the gateway") |
| 155 | + |
| 156 | + defer func() { _ = resp.Body.Close() }() |
| 157 | + body, err := io.ReadAll(resp.Body) |
| 158 | + assert.NoError(s.T(), err) |
| 159 | + |
| 160 | + assert.Equal(s.T(), string(body), "failed to receive response: context canceled") |
| 161 | + }() |
| 162 | + |
129 | 163 | // Allow some requests to run against the gateway before shutting down |
130 | 164 | time.Sleep(time.Second) |
131 | 165 |
|
| 166 | + // Wait for the slow running request to be sent before starting shutdown |
| 167 | + <-slowReqSent |
| 168 | + |
132 | 169 | gw.Shutdown() |
133 | 170 |
|
134 | 171 | wg.Wait() |
|
0 commit comments