Skip to content

Commit 5982a0c

Browse files
committed
Add CRONET_READ_BUFFER_SIZE_KEY API to CronetClientStream
By default, CronetClientStreams would use a 4KB buffer to read data from Cronet. This can be inefficient especially if the amount of data being read is huge (~MBs) as each read callback operation incur overhead from Cronet itself (e.g. Context switch, JNI calls). The alternative would be to immediately bump the default to a bigger number but that would incur an increase in memory usage. So in order to safely experiment on this, An OptionKey is introduced which allows setting a per-stream value which will be controlled in a controlled environment to ensure we find the best new default.
1 parent 9193701 commit 5982a0c

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

cronet/src/main/java/io/grpc/cronet/CronetClientStream.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ class CronetClientStream extends AbstractClientStream {
6969

7070
static final CallOptions.Key<Collection<Object>> CRONET_ANNOTATIONS_KEY =
7171
CallOptions.Key.create("cronet-annotations");
72+
/**
73+
* Sets the read buffer size which the GRPC layer will use to read data from Cronet. Higher buffer
74+
* size leads to less overhead but more memory consumption. The current default value is 4KB.
75+
*/
76+
public static final CallOptions.Key<Integer> CRONET_READ_BUFFER_SIZE_KEY =
77+
CallOptions.Key.createWithDefault("cronet-read-buffer-size", 4 * 1024);
7278

7379
private final String url;
7480
private final String userAgent;
@@ -85,6 +91,8 @@ class CronetClientStream extends AbstractClientStream {
8591
private final Collection<Object> annotations;
8692
private final TransportState state;
8793
private final Sink sink = new Sink();
94+
@VisibleForTesting
95+
final int readBufferSize;
8896
private StreamBuilderFactory streamFactory;
8997

9098
CronetClientStream(
@@ -120,6 +128,7 @@ class CronetClientStream extends AbstractClientStream {
120128
this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
121129
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer,
122130
callOptions);
131+
this.readBufferSize = callOptions.getOption(CRONET_READ_BUFFER_SIZE_KEY);
123132

124133
// Tests expect the "plain" deframer behavior, not MigratingDeframer
125134
// https://github.com/grpc/grpc-java/issues/7140

cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.grpc.cronet;
1818

19+
import static io.grpc.cronet.CronetClientStream.CRONET_READ_BUFFER_SIZE_KEY;
1920
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
2021
import static org.junit.Assert.assertFalse;
2122
import static org.junit.Assert.assertSame;
@@ -92,6 +93,41 @@ public void alwaysUsePut_defaultsToFalse() throws Exception {
9293
assertFalse(stream.idempotent);
9394
}
9495

96+
@Test
97+
public void channelBuilderReadBufferSize_defaultsTo4Kb() throws Exception {
98+
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
99+
CronetTransportFactory transportFactory =
100+
(CronetTransportFactory) builder.buildTransportFactory();
101+
CronetClientTransport transport =
102+
(CronetClientTransport)
103+
transportFactory.newClientTransport(
104+
new InetSocketAddress("localhost", 443),
105+
new ClientTransportOptions(),
106+
channelLogger);
107+
CronetClientStream stream = transport.newStream(
108+
method, new Metadata(), CallOptions.DEFAULT, tracers);
109+
110+
assertEquals(4 * 1024, stream.readBufferSize);
111+
}
112+
113+
@Test
114+
public void channelBuilderReadBufferSize_changeReflected() throws Exception {
115+
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
116+
CronetTransportFactory transportFactory =
117+
(CronetTransportFactory) builder.buildTransportFactory();
118+
CronetClientTransport transport =
119+
(CronetClientTransport)
120+
transportFactory.newClientTransport(
121+
new InetSocketAddress("localhost", 443),
122+
new ClientTransportOptions(),
123+
channelLogger);
124+
CronetClientStream stream = transport.newStream(
125+
method, new Metadata(),
126+
CallOptions.DEFAULT.withOption(CRONET_READ_BUFFER_SIZE_KEY, 32 * 1024), tracers);
127+
128+
assertEquals(32 * 1024, stream.readBufferSize);
129+
}
130+
95131
@Test
96132
public void scheduledExecutorService_default() {
97133
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);

0 commit comments

Comments
 (0)