Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/couchbase/stellar-gateway/gateway/dapiimpl"
"github.com/couchbase/stellar-gateway/gateway/dapiimpl/proxy"
"github.com/couchbase/stellar-gateway/gateway/dataimpl"
"github.com/couchbase/stellar-gateway/gateway/hooks"
"github.com/couchbase/stellar-gateway/gateway/ratelimiting"
"github.com/couchbase/stellar-gateway/gateway/system"
"github.com/couchbase/stellar-gateway/pkg/metrics"
Expand All @@ -42,6 +43,7 @@ type StartupInfo struct {
ServerGroup string
AdvertiseAddr string
ServicePorts ServicePorts
HooksManager *hooks.HooksManager
}

type Config struct {
Expand Down Expand Up @@ -489,6 +491,7 @@ func (g *Gateway) Run(ctx context.Context) error {
PS: boundPsPort,
DAPI: boundDapiPort,
},
HooksManager: gatewaySys.HooksManager(),
})
}

Expand Down
5 changes: 5 additions & 0 deletions gateway/hooks/hooksmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hooks

import (
"errors"
"net/http"
"sync"

"github.com/couchbase/goprotostellar/genproto/internal_hooks_v1"
Expand Down Expand Up @@ -67,3 +68,7 @@ func (m *HooksManager) Server() internal_hooks_v1.HooksServiceServer {
func (m *HooksManager) UnaryInterceptor() grpc.UnaryServerInterceptor {
return makeGrpcUnaryInterceptor(m, m.logger)
}

func (m *HooksManager) HTTPMiddleware() func(http.Handler) http.Handler {
return makeHTTPMiddleware(m, m.logger)
}
44 changes: 44 additions & 0 deletions gateway/hooks/httpinterceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package hooks

import (
"net/http"

"go.uber.org/zap"
)

func makeHTTPMiddleware(manager *HooksManager, log *zap.Logger) func(http.Handler) http.Handler {
Comment thread
brett19 marked this conversation as resolved.
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hooksID := r.Header.Get("X-Hooks-ID")
if hooksID == "" {
next.ServeHTTP(w, r)
return
}

hooksContext := manager.GetHooksContext(hooksID)
if hooksContext == nil {
next.ServeHTTP(w, r)
return
}

barrierID := r.Header.Get("X-Barrier-ID")
if barrierID == "" {
next.ServeHTTP(w, r)
return
}

log.Info("http request waiting on barrier",
zap.String("hooks-id", hooksID),
zap.String("barrier-id", barrierID))

barrier := hooksContext.GetBarrier(barrierID)
barrier.Wait(r.Context(), hooksID+":"+barrierID, nil)

log.Info("http request done waiting on barrier",
zap.String("hooks-id", hooksID),
zap.String("barrier-id", barrierID))

next.ServeHTTP(w, r)
Comment thread
Westwooo marked this conversation as resolved.
Outdated
})
}
}
9 changes: 9 additions & 0 deletions gateway/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type System struct {
dataServer *grpc.Server
dapiServer *http.Server

hooksManager *hooks.HooksManager
shutdownTimeout time.Duration
}

Expand Down Expand Up @@ -231,6 +232,9 @@ func NewSystem(opts *SystemOptions) (*System, error) {
})

var httpHandler http.Handler = mux
if opts.Debug {
httpHandler = hooksManager.HTTPMiddleware()(httpHandler)
}
if opts.RateLimiter != nil {
httpHandler = opts.RateLimiter.HttpMiddleware(httpHandler)
}
Expand All @@ -252,12 +256,17 @@ func NewSystem(opts *SystemOptions) (*System, error) {
logger: opts.Logger,
dataServer: dataSrv,
dapiServer: dapiSrv,
hooksManager: hooksManager,
shutdownTimeout: opts.ShutdownTimeout,
}

return s, nil
}

func (s *System) HooksManager() *hooks.HooksManager {
return s.hooksManager
}

func (s *System) Serve(ctx context.Context, l *Listeners) error {
var wg sync.WaitGroup

Expand Down
Loading
Loading