Skip to content

Commit 5b5db75

Browse files
authored
Add generic gRPC stream forwarding (#306)
To be able to pass Bazel's build event stream (BES) to the same DNS name, without having to add an extra L7 router in front of the bb-storage frontend, add a configuration to forward specific gRPC methods to other backends. No authorization is possible on the passed through messages because Buildbarn has no knowledge about the semantics of the forwarded messages. The gRPC reflection service has also been extended to forward requests that cannot be resolved locally.
1 parent 51e1a67 commit 5b5db75

File tree

13 files changed

+786
-63
lines changed

13 files changed

+786
-63
lines changed

MODULE.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use_repo(
5757
"com_github_gorilla_mux",
5858
"com_github_grpc_ecosystem_go_grpc_middleware",
5959
"com_github_grpc_ecosystem_go_grpc_prometheus",
60+
"com_github_jhump_protoreflect_v2",
6061
"com_github_jmespath_go_jmespath",
6162
"com_github_klauspost_compress",
6263
"com_github_prometheus_client_golang",

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ require (
2828
github.com/gorilla/mux v1.8.1
2929
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
3030
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
31+
github.com/jhump/protoreflect/v2 v2.0.0-beta.2
3132
github.com/jmespath/go-jmespath v0.4.0
3233
github.com/klauspost/compress v1.18.1
3334
github.com/prometheus/client_golang v1.23.2

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy
198198
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
199199
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg=
200200
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4=
201+
github.com/jhump/protoreflect/v2 v2.0.0-beta.2 h1:qZU+rEZUOYTz1Bnhi3xbwn+VxdXkLVeEpAeZzVXLY88=
202+
github.com/jhump/protoreflect/v2 v2.0.0-beta.2/go.mod h1:4tnOYkB/mq7QTyS3YKtVtNrJv4Psqout8HA1U+hZtgM=
201203
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
202204
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
203205
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=

internal/mock/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ gomock(
250250
"ClientConnInterface",
251251
"ClientStream",
252252
"ServerStream",
253+
"ServerTransportStream",
253254
"StreamHandler",
254255
"Streamer",
255256
"UnaryHandler",

pkg/grpc/BUILD.bazel

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"client_factory.go",
1414
"deduplicating_client_factory.go",
1515
"deny_authenticator.go",
16+
"forwarding_stream_handler.go",
1617
"jmespath_extractor.go",
1718
"lazy_client_dialer.go",
1819
"metadata_adding_interceptor.go",
@@ -26,8 +27,10 @@ go_library(
2627
"peer_transport_credentials_linux.go",
2728
"proto_trace_attributes_extractor.go",
2829
"proxy_dialer.go",
30+
"reflection_relay.go",
2931
"request_headers_authenticator.go",
3032
"request_metadata_tracing_interceptor.go",
33+
"routing_stream_handler.go",
3134
"server.go",
3235
"tls_client_certificate_authenticator.go",
3336
],
@@ -49,6 +52,8 @@ go_library(
4952
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
5053
"@com_github_grpc_ecosystem_go_grpc_middleware//:go-grpc-middleware",
5154
"@com_github_grpc_ecosystem_go_grpc_prometheus//:go-grpc-prometheus",
55+
"@com_github_jhump_protoreflect_v2//grpcreflect",
56+
"@com_github_jhump_protoreflect_v2//protoresolve",
5257
"@io_opentelemetry_go_contrib_instrumentation_google_golang_org_grpc_otelgrpc//:otelgrpc",
5358
"@io_opentelemetry_go_otel//attribute",
5459
"@io_opentelemetry_go_otel_trace//:trace",
@@ -63,11 +68,14 @@ go_library(
6368
"@org_golang_google_grpc//metadata",
6469
"@org_golang_google_grpc//peer",
6570
"@org_golang_google_grpc//reflection",
71+
"@org_golang_google_grpc//reflection/grpc_reflection_v1",
6672
"@org_golang_google_grpc//status",
6773
"@org_golang_google_grpc_security_advancedtls//:advancedtls",
6874
"@org_golang_google_protobuf//encoding/prototext",
6975
"@org_golang_google_protobuf//proto",
7076
"@org_golang_google_protobuf//reflect/protoreflect",
77+
"@org_golang_google_protobuf//types/known/emptypb",
78+
"@org_golang_x_sync//errgroup",
7179
"@org_golang_x_sync//semaphore",
7280
] + select({
7381
"@rules_go//go/platform:android": [
@@ -98,6 +106,7 @@ go_test(
98106
"authenticating_interceptor_test.go",
99107
"deduplicating_client_factory_test.go",
100108
"deny_authenticator_test.go",
109+
"forwarding_stream_handler_test.go",
101110
"jmespath_extractor_test.go",
102111
"lazy_client_dialer_test.go",
103112
"metadata_adding_interceptor_test.go",
@@ -107,6 +116,7 @@ go_test(
107116
"proto_trace_attributes_extractor_test.go",
108117
"request_headers_authenticator_test.go",
109118
"request_metadata_tracing_interceptor_test.go",
119+
"routing_stream_handler_test.go",
110120
] + select({
111121
"@rules_go//go/platform:android": [
112122
"peer_transport_credentials_test.go",
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package grpc
2+
3+
import (
4+
"io"
5+
6+
"golang.org/x/sync/errgroup"
7+
"google.golang.org/grpc"
8+
"google.golang.org/protobuf/types/known/emptypb"
9+
)
10+
11+
// NewForwardingStreamHandler creates a grpc.StreamHandler that forwards gRPC
12+
// calls to a grpc.ClientConnInterface backend.
13+
func NewForwardingStreamHandler(client grpc.ClientConnInterface) grpc.StreamHandler {
14+
forwarder := &forwardingStreamHandler{
15+
backend: client,
16+
}
17+
return forwarder.HandleStream
18+
}
19+
20+
type forwardingStreamHandler struct {
21+
backend grpc.ClientConnInterface
22+
}
23+
24+
// HandleStream creates a new stream to the backend. Requests from
25+
// incomingStream are forwarded to the backend stream and responses from the
26+
// backend stream are sent back in the incomingStream.
27+
func (s *forwardingStreamHandler) HandleStream(srv any, incomingStream grpc.ServerStream) error {
28+
// All gRPC invocations has a grpc.ServerTransportStream context.
29+
method, _ := grpc.Method(incomingStream.Context())
30+
desc := grpc.StreamDesc{
31+
// According to grpc.StreamDesc documentation, StreamName and Handler
32+
// are only used when registering handlers on a server.
33+
StreamName: "",
34+
Handler: nil,
35+
// Streaming behaviour is wanted, single message is treated the same on
36+
// transport level, the application just closes the stream after the
37+
// first message.
38+
ServerStreams: true,
39+
ClientStreams: true,
40+
}
41+
group, groupCtx := errgroup.WithContext(incomingStream.Context())
42+
group.Go(func() error {
43+
// groupCtx is guaranteed to be canceled before returning from this method, so outgoingStream will not leak resources.
44+
outgoingStream, err := s.backend.NewStream(groupCtx, &desc, method)
45+
if err != nil {
46+
return err
47+
}
48+
// Avoid group.Go because incomingStream.RecvMsg might block returning
49+
// an error from the outgoingStream and getting the context for
50+
// incomingStream canceled.
51+
go func() {
52+
for {
53+
msg := &emptypb.Empty{}
54+
if err := incomingStream.RecvMsg(msg); err != nil {
55+
if err == io.EOF {
56+
// Let's continue to receive on outgoingStream, so don't
57+
// cancel grouptCtx.
58+
outgoingStream.CloseSend()
59+
return
60+
}
61+
// Cancel groupCtx immediately.
62+
group.Go(func() error { return err })
63+
return
64+
}
65+
if err := outgoingStream.SendMsg(msg); err != nil {
66+
if err == io.EOF {
67+
// The error will be returned by outgoingStream.RecvMsg(),
68+
// no need to cancel groupCtx now.
69+
return
70+
}
71+
// Cancel groupCtx immediately.
72+
group.Go(func() error { return err })
73+
return
74+
}
75+
}
76+
}()
77+
78+
for {
79+
msg := &emptypb.Empty{}
80+
if err := outgoingStream.RecvMsg(msg); err != nil {
81+
if err == io.EOF {
82+
return nil
83+
}
84+
return err
85+
}
86+
if err := incomingStream.SendMsg(msg); err != nil {
87+
return err
88+
}
89+
}
90+
})
91+
return group.Wait()
92+
}

0 commit comments

Comments
 (0)