Skip to content

Commit b79ffbe

Browse files
committed
Add CRONET_READ_BUFFER_SIZE_KEY API to CronetClientStream
By default, CronetClientStreams would use a 4KB buffer to read data from Cronet. This can be inefficient especially if the amount of data being read is huge (~MBs) as each read callback operation incur overhead from Cronet itself (e.g. Context switch, JNI calls). The alternative would be to immediately bump the default to a bigger number but that would incur an increase in memory usage. So in order to safely experiment on this, An OptionKey is introduced which allows setting a per-stream value which will be controlled in a controlled environment to ensure we find the best new default.
1 parent 9193701 commit b79ffbe

File tree

2 files changed

+47
-3
lines changed

2 files changed

+47
-3
lines changed

cronet/src/main/java/io/grpc/cronet/CronetClientStream.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
* Client stream for the cronet transport.
6060
*/
6161
class CronetClientStream extends AbstractClientStream {
62-
private static final int READ_BUFFER_CAPACITY = 4 * 1024;
6362
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
6463
private static final String LOG_TAG = "grpc-java-cronet";
6564

@@ -69,6 +68,12 @@ class CronetClientStream extends AbstractClientStream {
6968

7069
static final CallOptions.Key<Collection<Object>> CRONET_ANNOTATIONS_KEY =
7170
CallOptions.Key.create("cronet-annotations");
71+
/**
72+
* Sets the read buffer size which the GRPC layer will use to read data from Cronet. Higher buffer
73+
* size leads to less overhead but more memory consumption. The current default value is 4KB.
74+
*/
75+
public static final CallOptions.Key<Integer> CRONET_READ_BUFFER_SIZE_KEY =
76+
CallOptions.Key.createWithDefault("cronet-read-buffer-size", 4 * 1024);
7277

7378
private final String url;
7479
private final String userAgent;
@@ -85,6 +90,8 @@ class CronetClientStream extends AbstractClientStream {
8590
private final Collection<Object> annotations;
8691
private final TransportState state;
8792
private final Sink sink = new Sink();
93+
@VisibleForTesting
94+
final int readBufferSize;
8895
private StreamBuilderFactory streamFactory;
8996

9097
CronetClientStream(
@@ -120,6 +127,7 @@ class CronetClientStream extends AbstractClientStream {
120127
this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
121128
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer,
122129
callOptions);
130+
this.readBufferSize = callOptions.getOption(CRONET_READ_BUFFER_SIZE_KEY);
123131

124132
// Tests expect the "plain" deframer behavior, not MigratingDeframer
125133
// https://github.com/grpc/grpc-java/issues/7140
@@ -309,7 +317,7 @@ public void bytesRead(int processedBytes) {
309317
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
310318
Log.v(LOG_TAG, "BidirectionalStream.read");
311319
}
312-
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
320+
stream.read(ByteBuffer.allocateDirect(readBufferSize));
313321
}
314322
}
315323

@@ -429,7 +437,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf
429437
Log.v(LOG_TAG, "BidirectionalStream.read");
430438
}
431439
reportHeaders(info.getAllHeadersAsList(), false);
432-
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
440+
stream.read(ByteBuffer.allocateDirect(readBufferSize));
433441
}
434442

435443
@Override

cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.grpc.cronet;
1818

19+
import static io.grpc.cronet.CronetClientStream.CRONET_READ_BUFFER_SIZE_KEY;
1920
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
2021
import static org.junit.Assert.assertFalse;
2122
import static org.junit.Assert.assertSame;
@@ -92,6 +93,41 @@ public void alwaysUsePut_defaultsToFalse() throws Exception {
9293
assertFalse(stream.idempotent);
9394
}
9495

96+
@Test
97+
public void channelBuilderReadBufferSize_defaultsTo4Kb() throws Exception {
98+
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
99+
CronetTransportFactory transportFactory =
100+
(CronetTransportFactory) builder.buildTransportFactory();
101+
CronetClientTransport transport =
102+
(CronetClientTransport)
103+
transportFactory.newClientTransport(
104+
new InetSocketAddress("localhost", 443),
105+
new ClientTransportOptions(),
106+
channelLogger);
107+
CronetClientStream stream = transport.newStream(
108+
method, new Metadata(), CallOptions.DEFAULT, tracers);
109+
110+
assertEquals(4 * 1024, stream.readBufferSize);
111+
}
112+
113+
@Test
114+
public void channelBuilderReadBufferSize_changeReflected() throws Exception {
115+
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
116+
CronetTransportFactory transportFactory =
117+
(CronetTransportFactory) builder.buildTransportFactory();
118+
CronetClientTransport transport =
119+
(CronetClientTransport)
120+
transportFactory.newClientTransport(
121+
new InetSocketAddress("localhost", 443),
122+
new ClientTransportOptions(),
123+
channelLogger);
124+
CronetClientStream stream = transport.newStream(
125+
method, new Metadata(),
126+
CallOptions.DEFAULT.withOption(CRONET_READ_BUFFER_SIZE_KEY, 32 * 1024), tracers);
127+
128+
assertEquals(32 * 1024, stream.readBufferSize);
129+
}
130+
95131
@Test
96132
public void scheduledExecutorService_default() {
97133
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);

0 commit comments

Comments
 (0)