Skip to content

Commit 89a6376

Browse files
JamesMBartlett authored and copybaranaut committed
[Golang API] Add support for resuming execution after transient failures of the GRPC connection.
Summary: The Query Broker is set up to accept ExecuteScript requests that are pointed at a particular query ID. This diff adds support to the Golang API for resuming queries after the grpc connection fails with Unavailable or RST_STREAM. The retry/resumption respects the original context the user passed into ExecuteScript, and as long as that context is valid, it will retry indefinitely. Test Plan: Tested that query resumption works by deploying a vizier with Read/WriteTimeouts set to 60s on the grpc servers, and then seeing that the query failed and then was resumed after 60s. Reviewers: michelle, vihang, philkuz, zasgar Reviewed By: philkuz Signed-off-by: James Bartlett <[email protected]> Differential Revision: https://phab.corp.pixielabs.ai/D12305 GitOrigin-RevId: 1588c30d50651e00d25eba976e0ffa66f2697b24
1 parent 06ff452 commit 89a6376

File tree

3 files changed

+58
-0
lines changed

3 files changed

+58
-0
lines changed

BUILD.bazel

+2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@ go_library(
3535
"//src/api/proto/cloudpb:cloudapi_pl_go_proto",
3636
"//src/api/proto/vizierpb:vizier_pl_go_proto",
3737
"@org_golang_google_grpc//:go_default_library",
38+
"@org_golang_google_grpc//codes",
3839
"@org_golang_google_grpc//credentials",
3940
"@org_golang_google_grpc//metadata",
41+
"@org_golang_google_grpc//status",
4042
],
4143
)
4244

results.go

+53
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@ package pxapi
2020

2121
import (
2222
"context"
23+
"errors"
24+
"fmt"
2325
"io"
26+
"strings"
2427
"sync"
2528
"time"
2629

30+
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/status"
32+
2733
"px.dev/pxapi/errdefs"
2834
"px.dev/pxapi/types"
2935
"px.dev/pxapi/utils"
@@ -59,6 +65,10 @@ type ScriptResults struct {
5965
wg sync.WaitGroup
6066

6167
stats *ResultsStats
68+
69+
v *VizierClient
70+
queryID string
71+
origCtx context.Context
6272
}
6373

6474
func newScriptResults() *ScriptResults {
@@ -133,6 +143,37 @@ func (s *ScriptResults) handleGRPCMsg(ctx context.Context, resp *vizierpb.Execut
133143
return errdefs.ErrInternalUnImplementedType
134144
}
135145

146+
func isTransientGRPCError(err error) bool {
147+
s, ok := status.FromError(err)
148+
if !ok {
149+
return false
150+
}
151+
if s.Code() == codes.Internal && strings.Contains(s.Message(), "RST_STREAM") {
152+
return true
153+
}
154+
return false
155+
}
156+
157+
func (s *ScriptResults) reconnect() error {
158+
if s.queryID == "" {
159+
return errors.New("cannot reconnect to query that hasn't returned a QueryID yet")
160+
}
161+
req := &vizierpb.ExecuteScriptRequest{
162+
ClusterID: s.v.vizierID,
163+
QueryID: s.queryID,
164+
EncryptionOptions: s.v.encOpts,
165+
}
166+
ctx, cancel := context.WithCancel(s.origCtx)
167+
res, err := s.v.vzClient.ExecuteScript(s.v.cloud.cloudCtxWithMD(ctx), req)
168+
if err != nil {
169+
cancel()
170+
return err
171+
}
172+
s.cancel = cancel
173+
s.c = res
174+
return nil
175+
}
176+
136177
func (s *ScriptResults) run() error {
137178
ctx := s.c.Context()
138179
for {
@@ -143,11 +184,23 @@ func (s *ScriptResults) run() error {
143184
// Stream has terminated.
144185
return nil
145186
}
187+
if isTransientGRPCError(err) {
188+
origErr := err
189+
err = s.reconnect()
190+
if err != nil {
191+
return fmt.Errorf("streaming failed: %w, error occurred while reconnecting: %v", origErr, err)
192+
}
193+
ctx = s.c.Context()
194+
continue
195+
}
146196
return err
147197
}
148198
if resp == nil {
149199
return nil
150200
}
201+
if s.queryID == "" {
202+
s.queryID = resp.QueryID
203+
}
151204
if err := s.handleGRPCMsg(ctx, resp); err != nil {
152205
return err
153206
}

vizier.go

+3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func (v *VizierClient) ExecuteScript(ctx context.Context, pxl string, mux TableM
4242
QueryStr: pxl,
4343
EncryptionOptions: v.encOpts,
4444
}
45+
origCtx := ctx
4546
ctx, cancel := context.WithCancel(ctx)
4647
res, err := v.vzClient.ExecuteScript(v.cloud.cloudCtxWithMD(ctx), req)
4748
if err != nil {
@@ -54,6 +55,8 @@ func (v *VizierClient) ExecuteScript(ctx context.Context, pxl string, mux TableM
5455
sr.cancel = cancel
5556
sr.tm = mux
5657
sr.decOpts = v.decOpts
58+
sr.v = v
59+
sr.origCtx = origCtx
5760

5861
return sr, nil
5962
}

0 commit comments

Comments
 (0)