Skip to content

Commit 0b3d992

Browse files
craig[bot] and cthumuluru-crdb committed
Merge #145907
145907: rpc, server: properly handle DRPC stream context cancellations r=cthumuluru-crdb a=cthumuluru-crdb The graceful way to cancel a stream in DRPC is to close the stream. When a stream's context is canceled, DRPC closes the underlying transport by default, and we see `<manager closed>` errors as a result. storj/drpc@a5d487a/drpcmanager/manager.go#L363-L378 The `SoftCancel` option on the DRPC manager allows the underlying transport to be reused as long as it is not busy (i.e., has no active write). In our case, since the stream is idle and only one stream can use a connection at any point in time, we can use that option to avoid reopening connections too often. storj/drpc@a5d487a/drpcmanager/manager.go#L348-L363 Fixes: #140670 Epic: CRDB-48929 Release note: none Co-authored-by: Chandra Thumuluru <[email protected]>
2 parents 7f17911 + 027b9e6 commit 0b3d992

File tree

3 files changed

+89
-1
lines changed

3 files changed

+89
-1
lines changed

pkg/rpc/drpc.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"crypto/tls"
1111
"math"
1212
"net"
13+
"time"
1314

1415
"github.com/cockroachdb/cockroach/pkg/util/log"
1516
"github.com/cockroachdb/errors"
@@ -28,6 +29,9 @@ import (
2829
// have the DRPC server enabled.
2930
var ErrDRPCDisabled = errors.New("DRPC is not enabled")
3031

32+
// Default idle connection timeout for DRPC connections in the pool.
33+
var defaultDRPCConnIdleTimeout = 5 * time.Minute
34+
3135
type drpcServerI interface {
3236
Serve(ctx context.Context, lis net.Listener) error
3337
}
@@ -93,7 +97,9 @@ func newDRPCServer(_ context.Context, rpcCtx *Context) (*DRPCServer, error) {
9397
func dialDRPC(rpcCtx *Context) func(ctx context.Context, target string) (drpcpool.Conn, error) {
9498
return func(ctx context.Context, target string) (drpcpool.Conn, error) {
9599
// TODO(server): could use connection class instead of empty key here.
96-
pool := drpcpool.New[struct{}, drpcpool.Conn](drpcpool.Options{})
100+
pool := drpcpool.New[struct{}, drpcpool.Conn](drpcpool.Options{
101+
Expiration: defaultDRPCConnIdleTimeout,
102+
})
97103
pooledConn := pool.Get(ctx /* unused */, struct{}{}, func(ctx context.Context,
98104
_ struct{}) (drpcpool.Conn, error) {
99105

@@ -110,6 +116,7 @@ func dialDRPC(rpcCtx *Context) func(ctx context.Context, target string) (drpcpoo
110116
Stream: drpcstream.Options{
111117
MaximumBufferSize: 0, // unlimited
112118
},
119+
SoftCancel: true, // don't close the transport when stream context is canceled
113120
},
114121
}
115122
var conn *drpcconn.Conn

pkg/server/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ go_test(
618618
"@in_gopkg_yaml_v2//:yaml_v2",
619619
"@io_opentelemetry_go_otel//attribute",
620620
"@io_storj_drpc//drpcconn",
621+
"@io_storj_drpc//drpcmanager",
621622
"@io_storj_drpc//drpcmigrate",
622623
"@org_golang_google_grpc//:grpc",
623624
"@org_golang_google_grpc//codes",

pkg/server/drpc_test.go

+80
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/util/log"
2121
"github.com/stretchr/testify/require"
2222
"storj.io/drpc/drpcconn"
23+
"storj.io/drpc/drpcmanager"
2324
"storj.io/drpc/drpcmigrate"
2425
)
2526

@@ -79,3 +80,82 @@ func TestDRPCBatchServer(t *testing.T) {
7980
require.NoError(t, err)
8081
})
8182
}
83+
84+
func TestStreamContextCancel(t *testing.T) {
85+
defer leaktest.AfterTest(t)()
86+
defer log.Scope(t).Close(t)
87+
88+
const numNodes = 1
89+
args := base.TestClusterArgs{
90+
ReplicationMode: base.ReplicationManual,
91+
ServerArgs: base.TestServerArgs{
92+
Settings: cluster.MakeClusterSettings(),
93+
},
94+
}
95+
96+
ctx := context.Background()
97+
rpc.ExperimentalDRPCEnabled.Override(ctx, &args.ServerArgs.Settings.SV, true)
98+
c := testcluster.StartTestCluster(t, numNodes, args)
99+
defer c.Stopper().Stop(ctx)
100+
101+
rpcAddr := c.Server(0).RPCAddr()
102+
103+
// Dial the drpc server with the drpc connection header.
104+
rawconn, err := drpcmigrate.DialWithHeader(ctx, "tcp", rpcAddr, drpcmigrate.DRPCHeader)
105+
require.NoError(t, err)
106+
107+
cm, err := c.Server(0).RPCContext().GetCertificateManager()
108+
require.NoError(t, err)
109+
tlsCfg, err := cm.GetNodeClientTLSConfig()
110+
require.NoError(t, err)
111+
tlsCfg = tlsCfg.Clone()
112+
tlsCfg.ServerName = "*.local"
113+
tlsConn := tls.Client(rawconn, tlsCfg)
114+
conn := drpcconn.NewWithOptions(tlsConn, drpcconn.Options{
115+
Manager: drpcmanager.Options{
116+
SoftCancel: true, // don't close the transport when stream context is canceled
117+
},
118+
})
119+
defer func() {
120+
require.NoError(t, conn.Close())
121+
}()
122+
123+
desc := c.LookupRangeOrFatal(t, c.ScratchRange(t))
124+
client := kvpb.NewDRPCBatchClient(conn)
125+
126+
singleRequest := func() {
127+
streamCtx, streamCtxCancel := context.WithCancel(ctx)
128+
defer streamCtxCancel()
129+
130+
s, err := client.BatchStream(streamCtx)
131+
require.NoError(t, err)
132+
133+
ba := &kvpb.BatchRequest{}
134+
ba.RangeID = desc.RangeID
135+
136+
var ok bool
137+
ba.Replica, ok = desc.GetReplicaDescriptor(1)
138+
require.True(t, ok)
139+
140+
req := &kvpb.LeaseInfoRequest{}
141+
req.Key = desc.StartKey.AsRawKey()
142+
ba.Add(req)
143+
144+
err = s.Send(ba)
145+
require.NoError(t, err)
146+
147+
_, err = s.Recv()
148+
require.NoError(t, err)
149+
}
150+
151+
// Make two consecutive stream requests using the same connection.
152+
for i := 0; i < 2; i++ {
153+
select {
154+
case <-conn.Closed():
155+
t.Fatal("connection closed unexpectedly")
156+
default:
157+
}
158+
159+
singleRequest()
160+
}
161+
}

0 commit comments

Comments
 (0)