Skip to content

Commit a374a9c

Browse files
committed
Fix forwardingStreamHandler errgroup race
Commit 5b5db75, "Add generic gRPC stream forwarding (#306)", introduced a flakiness in the grpc_test due to calling `errgroup.Go(func() error { return err })` when an error had occurred in a go routine that was asynchrounous to the errgroup.Wait() call. This could lead to `errgroup.Go()` being called just when `errgroup.Wait()` was about to return, which is not allowed.
1 parent bdb75e9 commit a374a9c

File tree

2 files changed

+17
-20
lines changed

2 files changed

+17
-20
lines changed

pkg/grpc/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ go_library(
7575
"@org_golang_google_protobuf//proto",
7676
"@org_golang_google_protobuf//reflect/protoreflect",
7777
"@org_golang_google_protobuf//types/known/emptypb",
78-
"@org_golang_x_sync//errgroup",
7978
"@org_golang_x_sync//semaphore",
8079
] + select({
8180
"@rules_go//go/platform:android": [

pkg/grpc/forwarding_stream_handler.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package grpc
22

33
import (
4+
"context"
45
"io"
56

6-
"golang.org/x/sync/errgroup"
7+
"github.com/buildbarn/bb-storage/pkg/program"
78
"google.golang.org/grpc"
89
"google.golang.org/protobuf/types/known/emptypb"
910
)
@@ -38,42 +39,40 @@ func (s *forwardingStreamHandler) HandleStream(srv any, incomingStream grpc.Serv
3839
ServerStreams: true,
3940
ClientStreams: true,
4041
}
41-
group, groupCtx := errgroup.WithContext(incomingStream.Context())
42-
group.Go(func() error {
43-
// groupCtx is guaranteed to be canceled before returning from this method, so outgoingStream will not leak resources.
44-
outgoingStream, err := s.backend.NewStream(groupCtx, &desc, method)
42+
43+
return program.RunLocal(incomingStream.Context(), func(ctx context.Context, siblingsGroup, dependenciesGroup program.Group) error {
44+
// ctx is guaranteed to be canceled when returning from this function,
45+
// so outgoingStream will not leak resources.
46+
outgoingStream, err := s.backend.NewStream(ctx, &desc, method)
4547
if err != nil {
4648
return err
4749
}
48-
// Avoid group.Go because incomingStream.RecvMsg might block returning
49-
// an error from the outgoingStream and getting the context for
50-
// incomingStream canceled.
51-
go func() {
50+
51+
// The only way to cancel a blocking incomingStream.RecvMsg is to return
52+
// from this method. Therefore, an error from outgoingStream.RecvMsg
53+
// needs to be returned without waiting for incomingStream.RecvMsg.
54+
siblingsGroup.GoAsync(func() error {
5255
for {
5356
msg := &emptypb.Empty{}
5457
if err := incomingStream.RecvMsg(msg); err != nil {
5558
if err == io.EOF {
5659
// Let's continue to receive on outgoingStream, so don't
5760
// cancel grouptCtx.
5861
outgoingStream.CloseSend()
59-
return
62+
return nil
6063
}
61-
// Cancel groupCtx immediately.
62-
group.Go(func() error { return err })
63-
return
64+
return err
6465
}
6566
if err := outgoingStream.SendMsg(msg); err != nil {
6667
if err == io.EOF {
6768
// The error will be returned by outgoingStream.RecvMsg(),
6869
// no need to cancel groupCtx now.
69-
return
70+
return nil
7071
}
71-
// Cancel groupCtx immediately.
72-
group.Go(func() error { return err })
73-
return
72+
return err
7473
}
7574
}
76-
}()
75+
})
7776

7877
for {
7978
msg := &emptypb.Empty{}
@@ -88,5 +87,4 @@ func (s *forwardingStreamHandler) HandleStream(srv any, incomingStream grpc.Serv
8887
}
8988
}
9089
})
91-
return group.Wait()
9290
}

0 commit comments

Comments
 (0)