Skip to content

Commit 21e8e83

Browse files
committed
Add gRPC compression and decompression support
Signed-off-by: Daniel Fiala <danfiala23@gmail.com>
1 parent 76fea76 commit 21e8e83

File tree

59 files changed

+1561
-124
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1561
-124
lines changed

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@
198198

199199
<modules>
200200
<module>vertx-grpc-common</module>
201+
<module>vertx-grpc-compression</module>
201202
<module>vertx-grpc-transcoding</module>
202203
<module>vertx-grpc-server</module>
203204
<module>vertx-grpc-reflection</module>
@@ -274,4 +275,4 @@
274275
</plugins>
275276
</build>
276277

277-
</project>
278+
</project>

vertx-grpc-client/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@
4242
<type>test-jar</type>
4343
<scope>test</scope>
4444
</dependency>
45+
<dependency>
46+
<groupId>io.vertx</groupId>
47+
<artifactId>vertx-grpc-compression</artifactId>
48+
<version>${project.version}</version>
49+
<scope>test</scope>
50+
</dependency>
4551
<dependency>
4652
<groupId>io.grpc</groupId>
4753
<artifactId>grpc-protobuf</artifactId>
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package io.vertx.grpc.client;
2+
3+
import io.vertx.codegen.annotations.DataObject;
4+
import io.vertx.codegen.annotations.Unstable;
5+
import io.vertx.codegen.json.annotations.JsonGen;
6+
import io.vertx.core.json.JsonObject;
7+
import io.vertx.grpc.common.GrpcCompressionOptions;
8+
9+
@Unstable
10+
@DataObject
11+
@JsonGen(publicConverter = false)
12+
public class GrpcClientCompressionOptions extends GrpcCompressionOptions {
13+
14+
/**
15+
* The default compression algorithm accepted by the client = {@code gzip}
16+
*/
17+
public static final String DEFAULT_COMPRESSION_ALGORITHM = "identity";
18+
19+
private String compressionAlgorithm;
20+
21+
/**
22+
* Default options.
23+
*/
24+
public GrpcClientCompressionOptions() {
25+
this.compressionAlgorithm = DEFAULT_COMPRESSION_ALGORITHM;
26+
}
27+
28+
/**
29+
* Copy constructor.
30+
*/
31+
public GrpcClientCompressionOptions(GrpcClientCompressionOptions other) {
32+
super(other);
33+
this.compressionAlgorithm = other.compressionAlgorithm;
34+
}
35+
36+
/**
37+
* Creates options from JSON.
38+
*/
39+
public GrpcClientCompressionOptions(JsonObject json) {
40+
this();
41+
GrpcClientCompressionOptionsConverter.fromJson(json, this);
42+
}
43+
44+
/**
45+
* @return the compression algorithm accepted by the client
46+
*/
47+
public String getCompressionAlgorithm() {
48+
return compressionAlgorithm;
49+
}
50+
51+
/**
52+
* Set the compression algorithm accepted by the client.
53+
*
54+
* @param compressionAlgorithm the compression algorithm
55+
* @return a reference to this, so the API can be used fluently
56+
*/
57+
public GrpcClientCompressionOptions setCompressionAlgorithm(String compressionAlgorithm) {
58+
this.compressionAlgorithm = compressionAlgorithm;
59+
return this;
60+
}
61+
62+
/**
63+
* @return a JSON representation of options
64+
*/
65+
public JsonObject toJson() {
66+
JsonObject json = new JsonObject();
67+
GrpcClientCompressionOptionsConverter.toJson(this, json);
68+
return json;
69+
}
70+
71+
@Override
72+
public String toString() {
73+
return toJson().encode();
74+
}
75+
}

vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientOptions.java

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,19 @@
1111
package io.vertx.grpc.client;
1212

1313
import io.vertx.codegen.annotations.DataObject;
14+
import io.vertx.codegen.annotations.Unstable;
15+
import io.vertx.codegen.json.annotations.JsonGen;
16+
import io.vertx.core.json.JsonObject;
1417

1518
import java.util.Objects;
1619
import java.util.concurrent.TimeUnit;
1720

1821
/**
1922
* Options configuring a gRPC client.
2023
*/
24+
@Unstable
2125
@DataObject
26+
@JsonGen(publicConverter = false)
2227
public class GrpcClientOptions {
2328

2429
/**
@@ -41,19 +46,26 @@ public class GrpcClientOptions {
4146
*/
4247
public static final long DEFAULT_MAX_MESSAGE_SIZE = 256 * 1024;
4348

49+
/**
50+
* The default compression options
51+
*/
52+
public static final GrpcClientCompressionOptions DEFAULT_COMPRESSION = new GrpcClientCompressionOptions();
53+
4454
private boolean scheduleDeadlineAutomatically;
4555
private int timeout;
4656
private TimeUnit timeoutUnit;
4757
private long maxMessageSize;
58+
private GrpcClientCompressionOptions compression;
4859

4960
/**
5061
* Default constructor.
5162
*/
5263
public GrpcClientOptions() {
53-
scheduleDeadlineAutomatically = DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY;
54-
timeout = DEFAULT_TIMEOUT;
55-
timeoutUnit = DEFAULT_TIMEOUT_UNIT;
64+
this.scheduleDeadlineAutomatically = DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY;
65+
this.timeout = DEFAULT_TIMEOUT;
66+
this.timeoutUnit = DEFAULT_TIMEOUT_UNIT;
5667
this.maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
68+
this.compression = DEFAULT_COMPRESSION;
5769
}
5870

5971
/**
@@ -62,10 +74,21 @@ public GrpcClientOptions() {
6274
* @param other the options to copy
6375
*/
6476
public GrpcClientOptions(GrpcClientOptions other) {
65-
scheduleDeadlineAutomatically = other.scheduleDeadlineAutomatically;
66-
timeout = other.timeout;
67-
timeoutUnit = other.timeoutUnit;
68-
maxMessageSize = other.maxMessageSize;
77+
this.scheduleDeadlineAutomatically = other.scheduleDeadlineAutomatically;
78+
this.timeout = other.timeout;
79+
this.timeoutUnit = other.timeoutUnit;
80+
this.maxMessageSize = other.maxMessageSize;
81+
this.compression = new GrpcClientCompressionOptions(other.compression);
82+
}
83+
84+
/**
85+
* Create a client options from JSON.
86+
*
87+
* @param json the JSON
88+
*/
89+
public GrpcClientOptions(JsonObject json) {
90+
this();
91+
GrpcClientOptionsConverter.fromJson(json, this);
6992
}
7093

7194
/**
@@ -158,4 +181,36 @@ public GrpcClientOptions setMaxMessageSize(long maxMessageSize) {
158181
this.maxMessageSize = maxMessageSize;
159182
return this;
160183
}
184+
185+
/**
186+
* @return the compression options
187+
*/
188+
public GrpcClientCompressionOptions getCompression() {
189+
return compression;
190+
}
191+
192+
/**
193+
* Set the compression options.
194+
*
195+
* @param compression the compression options
196+
* @return a reference to this, so the API can be used fluently
197+
*/
198+
public GrpcClientOptions setCompression(GrpcClientCompressionOptions compression) {
199+
this.compression = Objects.requireNonNull(compression);
200+
return this;
201+
}
202+
203+
/**
204+
* @return a JSON representation of options
205+
*/
206+
public JsonObject toJson() {
207+
JsonObject json = new JsonObject();
208+
GrpcClientOptionsConverter.toJson(this, json);
209+
return json;
210+
}
211+
212+
@Override
213+
public String toString() {
214+
return toJson().encode();
215+
}
161216
}

vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientImpl.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,17 @@
1414
import io.vertx.core.Vertx;
1515
import io.vertx.core.buffer.Buffer;
1616
import io.vertx.core.http.HttpClient;
17-
import io.vertx.core.http.HttpClientOptions;
1817
import io.vertx.core.http.HttpMethod;
19-
import io.vertx.core.http.HttpVersion;
2018
import io.vertx.core.http.RequestOptions;
2119
import io.vertx.core.internal.ContextInternal;
2220
import io.vertx.core.internal.VertxInternal;
2321
import io.vertx.core.net.Address;
2422
import io.vertx.grpc.client.GrpcClient;
2523
import io.vertx.grpc.client.GrpcClientOptions;
2624
import io.vertx.grpc.client.GrpcClientRequest;
27-
import io.vertx.grpc.common.ServiceMethod;
28-
import io.vertx.grpc.common.GrpcMessageDecoder;
29-
import io.vertx.grpc.common.GrpcMessageEncoder;
30-
import io.vertx.grpc.common.GrpcLocal;
25+
import io.vertx.grpc.common.*;
3126

27+
import java.util.Map;
3228
import java.util.concurrent.TimeUnit;
3329

3430
/**
@@ -44,6 +40,10 @@ public class GrpcClientImpl implements GrpcClient {
4440
private final int timeout;
4541
private final TimeUnit timeoutUnit;
4642

43+
private final String compressionAlgorithm;
44+
private final Map<String, GrpcCompressor> compressors;
45+
private final Map<String, GrpcDecompressor> decompressors;
46+
4747
public GrpcClientImpl(Vertx vertx, HttpClient client) {
4848
this(vertx, new GrpcClientOptions(), client, false);
4949
}
@@ -52,10 +52,14 @@ protected GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClient
5252
this.vertx = vertx;
5353
this.client = client;
5454
this.scheduleDeadlineAutomatically = grpcOptions.getScheduleDeadlineAutomatically();
55-
this.maxMessageSize = grpcOptions.getMaxMessageSize();;
55+
this.maxMessageSize = grpcOptions.getMaxMessageSize();
5656
this.timeout = grpcOptions.getTimeout();
5757
this.timeoutUnit = grpcOptions.getTimeoutUnit();
5858
this.closeClient = close;
59+
60+
this.compressionAlgorithm = grpcOptions.getCompression().getCompressionAlgorithm();
61+
this.compressors = grpcOptions.getCompression().getCompressors();
62+
this.decompressors = grpcOptions.getCompression().getDecompressors();
5963
}
6064

6165
public Vertx vertx() {
@@ -70,8 +74,12 @@ public Future<GrpcClientRequest<Buffer, Buffer>> request(RequestOptions options)
7074
maxMessageSize,
7175
scheduleDeadlineAutomatically,
7276
GrpcMessageEncoder.IDENTITY,
73-
GrpcMessageDecoder.IDENTITY);
77+
GrpcMessageDecoder.IDENTITY,
78+
this.compressors,
79+
this.decompressors
80+
);
7481
grpcRequest.init();
82+
grpcRequest.encoding(this.compressionAlgorithm);
7583
configureTimeout(grpcRequest);
7684
return grpcRequest;
7785
});
@@ -123,8 +131,12 @@ private <Req, Resp> Future<GrpcClientRequest<Req, Resp>> request(RequestOptions
123131
maxMessageSize,
124132
scheduleDeadlineAutomatically,
125133
method.encoder(),
126-
method.decoder());
134+
method.decoder(),
135+
this.compressors,
136+
this.decompressors
137+
);
127138
call.init();
139+
call.encoding(this.compressionAlgorithm);
128140
call.serviceName(method.serviceName());
129141
call.methodName(method.methodName());
130142
configureTimeout(call);
@@ -137,7 +149,7 @@ public Future<Void> close() {
137149
if (closeClient) {
138150
return client.close();
139151
} else {
140-
return ((VertxInternal)vertx).getOrCreateContext().succeededFuture();
152+
return ((VertxInternal) vertx).getOrCreateContext().succeededFuture();
141153
}
142154
}
143155
}

vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ public GrpcClientRequestImpl(HttpClientRequest httpRequest,
5151
long maxMessageSize,
5252
boolean scheduleDeadline,
5353
GrpcMessageEncoder<Req> messageEncoder,
54-
GrpcMessageDecoder<Resp> messageDecoder) {
55-
super( ((PromiseInternal<?>)httpRequest.response()).context(), "application/grpc", httpRequest, messageEncoder);
54+
GrpcMessageDecoder<Resp> messageDecoder,
55+
Map<String, GrpcCompressor> compressors,
56+
Map<String, GrpcDecompressor> decompressors) {
57+
super( ((PromiseInternal<?>)httpRequest.response()).context(), "application/grpc", httpRequest, messageEncoder, compressors, decompressors);
5658
this.httpRequest = httpRequest;
5759
this.scheduleDeadline = scheduleDeadline;
5860
this.timeout = 0L;
@@ -83,7 +85,8 @@ public GrpcClientRequestImpl(HttpClientRequest httpRequest,
8385
format,
8486
status,
8587
httpResponse,
86-
messageDecoder);
88+
messageDecoder,
89+
decompressors);
8790
grpcResponse.init(this, maxMessageSize);
8891
grpcResponse.invalidMessageHandler(invalidMsg -> {
8992
cancel();
@@ -185,7 +188,7 @@ protected void setHeaders(String contentType, MultiMap headers) {
185188
if (encoding != null) {
186189
httpRequest.putHeader(GrpcHeaderNames.GRPC_ENCODING, encoding);
187190
}
188-
httpRequest.putHeader(GrpcHeaderNames.GRPC_ACCEPT_ENCODING, "gzip");
191+
httpRequest.putHeader(GrpcHeaderNames.GRPC_ACCEPT_ENCODING, String.join(",", GrpcDecompressor.getSupportedEncodings()));
189192
httpRequest.putHeader(HttpHeaderNames.TE, "trailers");
190193
httpRequest.setChunked(true);
191194
httpRequest.setURI(uri);

vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.vertx.grpc.common.impl.Http2GrpcMessageDeframer;
2727

2828
import java.nio.charset.StandardCharsets;
29+
import java.util.Map;
2930

3031
/**
3132
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
@@ -41,14 +42,18 @@ public GrpcClientResponseImpl(ContextInternal context,
4142
GrpcClientRequestImpl<Req, Resp> request,
4243
WireFormat format,
4344
GrpcStatus status,
44-
HttpClientResponse httpResponse, GrpcMessageDecoder<Resp> messageDecoder) {
45+
HttpClientResponse httpResponse,
46+
GrpcMessageDecoder<Resp> messageDecoder,
47+
Map<String, GrpcDecompressor> decompressors) {
4548
super(
4649
context,
4750
httpResponse,
4851
httpResponse.headers().get(GrpcHeaderNames.GRPC_ENCODING),
4952
format,
5053
new Http2GrpcMessageDeframer(httpResponse.headers().get(GrpcHeaderNames.GRPC_ENCODING), format),
51-
messageDecoder);
54+
messageDecoder,
55+
decompressors
56+
);
5257
this.request = request;
5358
this.httpResponse = httpResponse;
5459
this.status = status;
Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
module io.vertx.grpc.client{
2+
3+
requires static io.vertx.docgen;
4+
requires static io.vertx.codegen.api;
5+
requires static io.vertx.codegen.json;
6+
27
requires io.netty.buffer;
38
requires io.netty.codec.http;
49
requires io.netty.codec;
510
requires io.vertx.core.logging;
611
requires io.vertx.core;
712
requires io.vertx.grpc.common;
8-
requires static io.vertx.docgen;
9-
requires static io.vertx.codegen.api;
10-
requires static io.vertx.codegen.json;
1113
requires com.google.protobuf;
1214
requires com.google.common;
15+
16+
uses io.vertx.grpc.common.GrpcCompressor;
17+
uses io.vertx.grpc.common.GrpcDecompressor;
18+
1319
exports io.vertx.grpc.client;
1420
exports io.vertx.grpc.client.impl to io.vertx.tests.client;
1521
}

vertx-grpc-client/src/test/java/io/vertx/tests/client/ClientMessageEncodingTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ private void testEncode(TestContext should, String requestEncoding, GrpcMessage
9393
}));
9494
callRequest.endMessage(msg);
9595
}));
96+
97+
test.await(5000L);
9698
}
9799

98100
@Test

0 commit comments

Comments
 (0)