Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions default_dial_option_server_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (s) TestJoinDialOption(t *testing.T) {
if cc.dopts.copts.InitialWindowSize != initialWindowSize {
t.Fatalf("Unexpected cc.dopts.copts.InitialWindowSize: %d != %d", cc.dopts.copts.InitialWindowSize, initialWindowSize)
}
// Make sure static window size is not enabled when using WithInitialWindowSize.
if cc.dopts.copts.StaticWindowSize {
t.Fatalf("Unexpected cc.dopts.copts.StaticWindowSize: %t", cc.dopts.copts.StaticWindowSize)
}
}

// TestJoinServerOption tests the join server option. It configures a joined
Expand All @@ -162,6 +166,10 @@ func (s) TestJoinServerOption(t *testing.T) {
if s.opts.initialWindowSize != initialWindowSize {
t.Fatalf("Unexpected s.opts.initialWindowSize: %d != %d", s.opts.initialWindowSize, initialWindowSize)
}
// Make sure static window size is not enabled when using InitialWindowSize.
if s.opts.staticWindowSize {
t.Fatalf("Unexpected s.opts.staticWindowSize: %t", s.opts.staticWindowSize)
}
}

// funcTestHeaderListSizeDialOptionServerOption tests
Expand Down
14 changes: 12 additions & 2 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,20 +210,30 @@ func WithReadBufferSize(s int) DialOption {
// WithInitialWindowSize returns a DialOption which sets the value for initial
// window size on a stream. The lower bound for window size is 64K and any value
// smaller than that will be ignored.
//
// Deprecated: use WithInitialStreamWindowSize to set a stream window size without disabling
// dynamic flow control.
// Will be supported throughout 1.x.
func WithInitialWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
o.copts.StaticWindowSize = true
})
}

// WithInitialStreamWindowSize returns a DialOption which sets the value for
// initial window size on a stream without disabling dynamic flow control. The
// lower bound for window size is 64K and any value smaller than that will be
// ignored.
func WithInitialStreamWindowSize(s int32) DialOption {
return WithInitialWindowSize(s)
}

// WithInitialConnWindowSize returns a DialOption which sets the value for
// initial window size on a connection. The lower bound for window size is 64K
// and any value smaller than that will be ignored.
func WithInitialConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
o.copts.StaticWindowSize = true
})
}

Expand Down
13 changes: 11 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,19 +278,28 @@ func ReadBufferSize(s int) ServerOption {

// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
//
// Deprecated: use InitialStreamWindowSize to set a stream window size without disabling
// dynamic flow control.
// Will be supported throughout 1.x.
func InitialWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
o.staticWindowSize = true
})
}

// InitialStreamWindowSize returns a ServerOption that sets window size for
// stream without disabling dynamic flow control. The lower bound for window
// size is 64K and any value smaller than that will be ignored.
func InitialStreamWindowSize(s int32) ServerOption {
return InitialWindowSize(s)
}

// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
o.staticWindowSize = true
})
}

Expand Down
6 changes: 6 additions & 0 deletions test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,10 @@ func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
e := tcpClearRREnv
te := newTest(t, e)
// Before behavior change in PR #8665, large window sizes set
// using InitialWindowSize disabled BDP by default. Post the
// behavior change, we have to explicitly disable BDP.
te.serverStaticWindow = true
te.serverInitialWindowSize = 65536
// Avoid overflowing connection level flow control window, which will lead to
// transport being closed.
Expand Down Expand Up @@ -1146,6 +1150,8 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
e := tcpClearRREnv
te := newTest(t, e)
// disable BDP
te.serverStaticWindow = true
te.clientStaticWindow = true
te.serverInitialWindowSize = 65536
te.serverInitialConnWindowSize = 65536
te.clientInitialWindowSize = 65536
Expand Down
54 changes: 42 additions & 12 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ type test struct {
unaryServerInt grpc.UnaryServerInterceptor
streamServerInt grpc.StreamServerInterceptor
serverInitialWindowSize int32
serverStaticWindow bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this variable be given a more boolean like name like isServerWindowStatic or useStaticServerWindow?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isServerWindowStatic reads more like natural English and answers the question "Is the server window static?". isServerStaticWindow feels less natural. Please change.

serverInitialConnWindowSize int32
customServerOptions []grpc.ServerOption

Expand All @@ -510,6 +511,7 @@ type test struct {
streamClientInt grpc.StreamClientInterceptor
clientInitialWindowSize int32
clientInitialConnWindowSize int32
clientStaticWindow bool
perRPCCreds credentials.PerRPCCredentials
customDialOptions []grpc.DialOption
resolverScheme string
Expand Down Expand Up @@ -606,11 +608,21 @@ func (te *test) listenAndServe(ts testgrpc.TestServiceServer, listen func(networ
sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
}
if te.serverInitialWindowSize > 0 {
sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
if te.serverStaticWindow {
sopts = append(sopts, grpc.StaticStreamWindowSize(te.serverInitialWindowSize))
} else {
sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
}
}

if te.serverInitialConnWindowSize > 0 {
sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
if te.serverStaticWindow {
sopts = append(sopts, grpc.StaticConnWindowSize(te.serverInitialConnWindowSize))
} else {
sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
}
}

la := ":0"
if te.e.network == "unix" {
la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
Expand Down Expand Up @@ -817,10 +829,19 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, te.e.balancer)))
}
if te.clientInitialWindowSize > 0 {
opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
if te.clientStaticWindow {
opts = append(opts, grpc.WithStaticStreamWindowSize(te.clientInitialWindowSize))
} else {
opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add tests with both WithInitialStreamWindowSize and WithInitialWindowSize options to ensure both code paths are verified if they diverge in future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}

if te.clientInitialConnWindowSize > 0 {
opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
if te.clientStaticWindow {
opts = append(opts, grpc.WithStaticConnWindowSize(te.clientInitialConnWindowSize))
} else {
opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
}
}
if te.perRPCCreds != nil {
opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
Expand Down Expand Up @@ -5418,18 +5439,25 @@ func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) {
}

type windowSizeConfig struct {
serverStream int32
serverConn int32
clientStream int32
clientConn int32
serverStream int32
serverConn int32
clientStream int32
clientConn int32
serverStaticWindow bool
clientStaticWindow bool
}

func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
// Before behavior change in PR #8665, large window sizes set
// using WithInitialWindowSize disabled BDP by default. Post the
// behavior change, we have to explicitly disable BDP.
wc := windowSizeConfig{
serverStream: 8 * 1024 * 1024,
serverConn: 12 * 1024 * 1024,
clientStream: 6 * 1024 * 1024,
clientConn: 8 * 1024 * 1024,
serverStream: 8 * 1024 * 1024,
serverConn: 12 * 1024 * 1024,
clientStream: 6 * 1024 * 1024,
clientConn: 8 * 1024 * 1024,
serverStaticWindow: true,
clientStaticWindow: true,
}
for _, e := range listTestEnv() {
testConfigurableWindowSize(t, e, wc)
Expand All @@ -5454,6 +5482,8 @@ func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
te.serverInitialConnWindowSize = wc.serverConn
te.clientInitialWindowSize = wc.clientStream
te.clientInitialConnWindowSize = wc.clientConn
te.serverStaticWindow = wc.serverStaticWindow
te.clientStaticWindow = wc.clientStaticWindow

te.startServer(&testServer{security: e.security})
defer te.tearDown()
Expand Down
7 changes: 5 additions & 2 deletions test/stream_cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func (s) TestStreamCleanup(t *testing.T) {
return &testpb.Empty{}, nil
},
}
if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {

// Use a static flow control window.
if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
Expand Down Expand Up @@ -81,7 +83,8 @@ func (s) TestStreamCleanupAfterSendStatus(t *testing.T) {
})
},
}
if err := ss.Start(nil, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
// Use a static flow control window.
if err := ss.Start(nil, grpc.WithStaticStreamWindowSize(int32(initialWindowSize))); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
Expand Down