Skip to content

Commit 1b7c9e6

Browse files
committed
ING-1399: Fully mirror grpc hooks for Data API
1 parent 80e2fb1 commit 1b7c9e6

5 files changed

Lines changed: 174 additions & 55 deletions

File tree

gateway/hooks/hookscontext.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package hooks
22

33
import (
44
"context"
5+
"net/http"
56
"sync"
67

78
"github.com/couchbase/goprotostellar/genproto/internal_hooks_v1"
@@ -138,3 +139,35 @@ func (i *HooksContext) acquireRunLock(ctx context.Context) error {
138139
func (i *HooksContext) releaseRunLock() {
139140
i.lock.Unlock()
140141
}
142+
143+
func (i *HooksContext) dispatchHTTPReqToWatchers(r *http.Request) {
144+
headers := make(map[string][]string)
145+
for key, values := range r.Header {
146+
headers[key] = values
147+
}
148+
149+
reqInfo := &RequestInfo{
150+
MetaData: headers,
151+
FullMethod: r.URL.Path,
152+
HTTPRequest: r,
153+
}
154+
155+
i.reqWatchers.Send(reqInfo)
156+
}
157+
158+
func (i *HooksContext) HandleHTTPRequest(w http.ResponseWriter, r *http.Request, next http.Handler) {
159+
i.dispatchHTTPReqToWatchers(r)
160+
161+
hook := i.findHook(r.URL.Path)
162+
if hook == nil {
163+
next.ServeHTTP(w, r)
164+
return
165+
}
166+
167+
i.logger.Info("calling registered http hook", zap.Any("hook", hook))
168+
169+
rs := newHTTPRunState(i, w, next, hook, i.logger.Named("run-state"))
170+
rs.Run(r.Context(), r) //nolint:errcheck
171+
}
172+
173+

gateway/hooks/httpinterceptor.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,8 @@ func makeHTTPMiddleware(manager *HooksManager, log *zap.Logger) func(http.Handle
2121
return
2222
}
2323

24-
barrierID := r.Header.Get("X-Barrier-ID")
25-
if barrierID == "" {
26-
next.ServeHTTP(w, r)
27-
return
28-
}
29-
30-
log.Info("http request waiting on barrier",
31-
zap.String("hooks-id", hooksID),
32-
zap.String("barrier-id", barrierID))
33-
34-
barrier := hooksContext.GetBarrier(barrierID)
35-
barrier.Wait(r.Context(), hooksID+":"+barrierID, nil)
36-
37-
log.Info("http request done waiting on barrier",
38-
zap.String("hooks-id", hooksID),
39-
zap.String("barrier-id", barrierID))
40-
41-
next.ServeHTTP(w, r)
24+
log.Info("calling registered hooks context", zap.String("hooks-id", hooksID), zap.Any("req", r))
25+
hooksContext.HandleHTTPRequest(w, r, next)
4226
})
4327
}
4428
}

gateway/hooks/reqwatchers.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@ package hooks
22

33
import (
44
"context"
5+
"net/http"
56
"sync"
67

78
"golang.org/x/exp/slices"
89
"google.golang.org/protobuf/types/known/anypb"
910
)
1011

1112
type RequestInfo struct {
12-
MetaData map[string][]string
13-
FullMethod string
14-
Request *anypb.Any
13+
MetaData map[string][]string
14+
FullMethod string
15+
Request *anypb.Any
16+
HTTPRequest *http.Request
1517
}
1618

1719
type requestWatcher struct {

gateway/hooks/runstate.go

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"net/http"
78

89
"github.com/couchbase/goprotostellar/genproto/internal_hooks_v1"
910
"github.com/couchbase/stellar-gateway/contrib/govalcmp"
@@ -12,6 +13,7 @@ import (
1213
"google.golang.org/grpc"
1314
"google.golang.org/grpc/codes"
1415
"google.golang.org/grpc/status"
16+
"google.golang.org/protobuf/encoding/protojson"
1517
)
1618

1719
// We encapsulate all the execution of actions into a runState to allow us to
@@ -21,10 +23,13 @@ type runState struct {
2123
ID string
2224
HooksContext *HooksContext
2325
Handler grpc.UnaryHandler
26+
HTTPHandler http.Handler
27+
HTTPWriter http.ResponseWriter
2428
Hook *internal_hooks_v1.Hook
2529
Logger *zap.Logger
2630
ExecResult interface{}
2731
ExecError error
32+
executed bool
2833
}
2934

3035
func newRunState(
@@ -42,6 +47,23 @@ func newRunState(
4247
}
4348
}
4449

50+
func newHTTPRunState(
51+
hooksContext *HooksContext,
52+
writer http.ResponseWriter,
53+
httpHandler http.Handler,
54+
hook *internal_hooks_v1.Hook,
55+
logger *zap.Logger,
56+
) *runState {
57+
return &runState{
58+
ID: uuid.NewString(),
59+
HooksContext: hooksContext,
60+
HTTPHandler: httpHandler,
61+
HTTPWriter: writer,
62+
Hook: hook,
63+
Logger: logger,
64+
}
65+
}
66+
4567
func (s *runState) Run(ctx context.Context, req interface{}) (interface{}, error) {
4668
// actions run synchronously in respect to all hooks for this HooksContext, so we
4769
// need to acquire the run lock to be able to run...
@@ -61,6 +83,14 @@ func (s *runState) Run(ctx context.Context, req interface{}) (interface{}, error
6183
return resp, err
6284
}
6385

86+
if s.HTTPHandler != nil {
87+
// for HTTP: if Execute was not explicitly called, implicitly call the handler
88+
if !s.executed {
89+
s.HTTPHandler.ServeHTTP(s.HTTPWriter, req.(*http.Request).WithContext(ctx))
90+
}
91+
return nil, nil
92+
}
93+
6494
// if the actions explicitly executed the underlying function, return its value
6595
if s.ExecResult != nil || s.ExecError != nil {
6696
return s.ExecResult, s.ExecError
@@ -314,6 +344,17 @@ func (s *runState) runAction_ReturnResponse(
314344
req interface{},
315345
action *internal_hooks_v1.HookAction_ReturnResponse,
316346
) (interface{}, error) {
347+
if s.HTTPHandler != nil {
348+
jsonBytes, err := protojson.Marshal(action.Value)
349+
if err != nil {
350+
http.Error(s.HTTPWriter, err.Error(), http.StatusInternalServerError)
351+
return nil, nil
352+
}
353+
s.HTTPWriter.Header().Set("Content-Type", "application/json")
354+
s.HTTPWriter.Write(jsonBytes) //nolint:errcheck
355+
s.executed = true
356+
return nil, nil
357+
}
317358
return action.Value, nil
318359
}
319360

@@ -322,20 +363,55 @@ func (s *runState) runAction_ReturnError(
322363
req interface{},
323364
action *internal_hooks_v1.HookAction_ReturnError,
324365
) (interface{}, error) {
366+
if s.HTTPHandler != nil {
367+
http.Error(s.HTTPWriter, action.Message, grpcCodeToHTTPStatus(codes.Code(action.Code)))
368+
s.executed = true
369+
return nil, nil
370+
}
325371
st := status.New(codes.Code(action.Code), action.Message)
326372
for _, detail := range action.Details {
327373
st, _ = st.WithDetails(detail)
328374
}
329-
330375
return nil, st.Err()
331376
}
332377

378+
func grpcCodeToHTTPStatus(code codes.Code) int {
379+
switch code {
380+
case codes.OK:
381+
return http.StatusOK
382+
case codes.InvalidArgument:
383+
return http.StatusBadRequest
384+
case codes.NotFound:
385+
return http.StatusNotFound
386+
case codes.AlreadyExists:
387+
return http.StatusConflict
388+
case codes.PermissionDenied:
389+
return http.StatusForbidden
390+
case codes.Unauthenticated:
391+
return http.StatusUnauthorized
392+
case codes.ResourceExhausted:
393+
return http.StatusTooManyRequests
394+
case codes.Unimplemented:
395+
return http.StatusNotImplemented
396+
case codes.Unavailable:
397+
return http.StatusServiceUnavailable
398+
case codes.DeadlineExceeded:
399+
return http.StatusGatewayTimeout
400+
default:
401+
return http.StatusInternalServerError
402+
}
403+
}
404+
333405
func (s *runState) runAction_Execute(
334406
ctx context.Context,
335407
req interface{},
336408
action *internal_hooks_v1.HookAction_Execute,
337409
) (interface{}, error) {
410+
if s.HTTPHandler != nil {
411+
s.HTTPHandler.ServeHTTP(s.HTTPWriter, req.(*http.Request).WithContext(ctx))
412+
s.executed = true
413+
return nil, nil
414+
}
338415
s.ExecResult, s.ExecError = s.Handler(ctx, req)
339-
340416
return nil, nil
341417
}

gateway/test/graceful_shutdown_test.go

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package test
22

33
import (
4-
"bytes"
54
"context"
65
"crypto/tls"
76
"fmt"
@@ -27,6 +26,41 @@ import (
2726

2827
const shutDownTimeout = 20 * time.Second
2928

29+
func (s *GatewayOpsTestSuite) setupDapiKvGetBarrierHook(ctx context.Context, conn *grpc.ClientConn, hooksContextID, targetPath, barrierID string) {
30+
hooksClient := internal_hooks_v1.NewHooksServiceClient(conn)
31+
32+
_, err := hooksClient.CreateHooksContext(ctx, &internal_hooks_v1.CreateHooksContextRequest{
33+
Id: hooksContextID,
34+
})
35+
require.NoError(s.T(), err, "failed to create hooks context")
36+
37+
_, err = hooksClient.AddHooks(ctx, &internal_hooks_v1.AddHooksRequest{
38+
HooksContextId: hooksContextID,
39+
Hooks: []*internal_hooks_v1.Hook{
40+
{
41+
Name: "block-dapi-kv-get",
42+
Description: "Block HTTP document GET on a barrier to simulate a long-running request",
43+
TargetMethod: targetPath,
44+
Actions: []*internal_hooks_v1.HookAction{
45+
{
46+
Action: &internal_hooks_v1.HookAction_WaitOnBarrier_{
47+
WaitOnBarrier: &internal_hooks_v1.HookAction_WaitOnBarrier{
48+
BarrierId: barrierID,
49+
},
50+
},
51+
},
52+
{
53+
Action: &internal_hooks_v1.HookAction_Execute_{
54+
Execute: &internal_hooks_v1.HookAction_Execute{},
55+
},
56+
},
57+
},
58+
},
59+
},
60+
})
61+
require.NoError(s.T(), err, "failed to add hooks")
62+
}
63+
3064
func (s *GatewayOpsTestSuite) setupKvGetBarrierHook(ctx context.Context, conn *grpc.ClientConn, hooksContextID, barrierID string) {
3165
hooksClient := internal_hooks_v1.NewHooksServiceClient(conn)
3266

@@ -118,26 +152,37 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
118152
},
119153
}
120154

155+
connAddr := fmt.Sprintf("%s:%d", "127.0.0.1", startInfo.ServicePorts.PS)
156+
conn, err := grpc.NewClient(connAddr,
157+
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})),
158+
grpc.WithUserAgent("test-client"),
159+
)
160+
require.NoError(s.T(), err, "failed to create grpc client connection to gateway")
161+
defer func() { _ = conn.Close() }()
162+
163+
testKey := s.testDocId()
164+
165+
// The call to setupKvGetBarrierHook can hang indefinitely if the hooks service is unreachable so we use a context with a
166+
// generous timeout
167+
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
168+
defer cancel()
169+
170+
docPath := fmt.Sprintf("/v1/buckets/%s/scopes/%s/collections/%s/documents/%s", s.bucketName, s.scopeName, s.collectionName, testKey)
171+
121172
eofDapiHooksContextID := "http-eof-test-case"
122-
err = startInfo.HooksManager.CreateHooksContext(eofDapiHooksContextID)
123-
require.NoError(s.T(), err, "failed to create http eof hooks context")
173+
s.setupDapiKvGetBarrierHook(ctx, conn, eofDapiHooksContextID, docPath, "eof-barrier")
124174

125175
// Check that requests to Data API taking longer than the shutdown timeout will eventually be forcibly closed.
126176
wg.Add(1)
127177
eofDapiReqSent := make(chan any)
128178
go func() {
129179
defer wg.Done()
130180

131-
statement := []byte(fmt.Sprintf(`{"statement": "%s"}`, "SELECT 1 == 1"))
132-
reqBody := bytes.NewReader(statement)
133-
134-
req, err := http.NewRequest("POST", fmt.Sprintf("https://%s/_p/query/query/service", dapiAddr), reqBody)
181+
req, err := http.NewRequest("GET", fmt.Sprintf("https://%s%s", dapiAddr, docPath), nil)
135182
require.NoError(s.T(), err, "failed to create http eof request")
136183

137184
req.SetBasicAuth(testConfig.CbUser, testConfig.CbPass)
138-
req.Header.Set("Content-Type", "application/json")
139185
req.Header.Set("X-Hooks-ID", eofDapiHooksContextID)
140-
req.Header.Set("X-Barrier-ID", "eof-barrier")
141186

142187
eofDapiReqSent <- struct{}{}
143188
_, err = dapiCli.Do(req)
@@ -148,25 +193,19 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
148193
}()
149194

150195
successDapiHooksContextID := "http-success-test-case"
151-
err = startInfo.HooksManager.CreateHooksContext(successDapiHooksContextID)
152-
require.NoError(s.T(), err, "failed to create http success hooks context")
196+
s.setupDapiKvGetBarrierHook(ctx, conn, successDapiHooksContextID, docPath, "success-barrier")
153197

154198
// Check that requests that start before shutdown is called but take less than the shutdown timeout complete successfully.
155199
wg.Add(1)
156200
successDapiReqSent := make(chan any)
157201
go func() {
158202
defer wg.Done()
159203

160-
statement := []byte(fmt.Sprintf(`{"statement": "%s"}`, "SELECT 1 == 1"))
161-
reqBody := bytes.NewReader(statement)
162-
163-
req, err := http.NewRequest("POST", fmt.Sprintf("https://%s/_p/query/query/service", dapiAddr), reqBody)
204+
req, err := http.NewRequest("GET", fmt.Sprintf("https://%s%s", dapiAddr, docPath), nil)
164205
require.NoError(s.T(), err, "failed to create http success request")
165206

166207
req.SetBasicAuth(testConfig.CbUser, testConfig.CbPass)
167-
req.Header.Set("Content-Type", "application/json")
168208
req.Header.Set("X-Hooks-ID", successDapiHooksContextID)
169-
req.Header.Set("X-Barrier-ID", "success-barrier")
170209

171210
successDapiReqSent <- struct{}{}
172211
resp, err := dapiCli.Do(req)
@@ -177,21 +216,6 @@ func (s *GatewayOpsTestSuite) TestGracefulShutdown() {
177216
_ = resp.Body.Close()
178217
}()
179218

180-
connAddr := fmt.Sprintf("%s:%d", "127.0.0.1", startInfo.ServicePorts.PS)
181-
conn, err := grpc.NewClient(connAddr,
182-
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})),
183-
grpc.WithUserAgent("test-client"),
184-
)
185-
require.NoError(s.T(), err, "failed to create grpc client connection to gateway")
186-
defer func() { _ = conn.Close() }()
187-
188-
testKey := s.testDocId()
189-
190-
// The call to setupKvGetBarrierHook can hang indefinitely if the hooks service is unreachable so we use a context with a
191-
// generous timeout
192-
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
193-
defer cancel()
194-
195219
// Check that grpc requests that take longer than the shutdown timeout will eventually be forcibly closed.
196220
eofGrpcReqSent := make(chan any)
197221
wg.Add(1)

0 commit comments

Comments
 (0)