Skip to content

Commit 0d63f28

Browse files
committed
Merge branch 'main' into compression
# Conflicts: # vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java # vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java
2 parents afca224 + 831180a commit 0d63f28

File tree

25 files changed

+446
-128
lines changed

25 files changed

+446
-128
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ to write gRPC middlewares like reverse proxies.
88

99
# License
1010

11-
Eclipse Public License - Version 1.0 / Apache License - Version 2.0
11+
Eclipse Public License - Version 2.0 / Apache License - Version 2.0

vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientResponse.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@
3131
@VertxGen
3232
public interface GrpcClientResponse<Req, Resp> extends GrpcReadStream<Resp> {
3333

34+
/**
35+
* @return the associated client request
36+
*/
37+
GrpcClientRequest<Req, Resp> request();
38+
3439
/**
3540
* @return the gRPC status or {@code null} when the status has not yet been received
3641
*/

vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ public class GrpcClientRequestImpl<Req, Resp> extends GrpcWriteStreamBase<GrpcCl
4646
private TimeUnit timeoutUnit;
4747
private String timeoutHeader;
4848
private Timer deadline;
49-
private boolean cancelled;
5049

5150
public GrpcClientRequestImpl(HttpClientRequest httpRequest,
5251
long maxMessageSize,
@@ -228,31 +227,11 @@ void cancelTimeout() {
228227
}
229228

230229
@Override
231-
public void cancel() {
232-
if (cancelled) {
233-
return;
234-
}
235-
cancelled = true;
236-
context.execute(() -> {
237-
boolean responseEnded;
238-
if (response.failed()) {
239-
return;
240-
} else if (response.succeeded()) {
241-
GrpcClientResponse<Req, Resp> resp = response.result();
242-
if (resp.end().failed()) {
243-
return;
244-
} else {
245-
responseEnded = resp.end().succeeded();
246-
}
247-
} else {
248-
responseEnded = false;
249-
}
250-
if (!isTrailersSent() || !responseEnded) {
251-
httpRequest
252-
.reset(GrpcError.CANCELLED.http2ResetCode)
253-
.onSuccess(v -> handleError(GrpcError.CANCELLED));
254-
}
255-
});
230+
protected boolean sendCancel() {
231+
httpRequest
232+
.reset(GrpcError.CANCELLED.http2ResetCode)
233+
.onSuccess(v -> handleError(GrpcError.CANCELLED));
234+
return true;
256235
}
257236

258237
@Override

vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.vertx.core.http.HttpClientResponse;
1919

2020
import io.vertx.core.internal.ContextInternal;
21+
import io.vertx.grpc.client.GrpcClientRequest;
2122
import io.vertx.grpc.client.GrpcClientResponse;
2223
import io.vertx.grpc.client.InvalidStatusException;
2324
import io.vertx.grpc.common.*;
@@ -58,6 +59,11 @@ public GrpcClientResponseImpl(ContextInternal context,
5859
this.status = status;
5960
}
6061

62+
@Override
63+
public GrpcClientRequest<Req, Resp> request() {
64+
return request;
65+
}
66+
6167
@Override
6268
public MultiMap headers() {
6369
return httpResponse.headers();
@@ -79,6 +85,7 @@ protected void handleEnd() {
7985
}
8086
}
8187
super.handleEnd();
88+
request.handleStatus(status);
8289
if (!request.isTrailersSent()) {
8390
request.cancel();
8491
}

vertx-grpc-client/src/test/java/io/vertx/tests/client/ClientRequestTest.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -293,15 +293,19 @@ public void testClientStreamingCompletedBeforeHalfClose(TestContext should) thro
293293

294294
super.testClientStreamingCompletedBeforeHalfClose(should);
295295

296-
Async done = should.async();
296+
Async done = should.async(2);
297297
client = GrpcClient.client(vertx);
298298
client.request(SocketAddress.inetSocketAddress(port, "localhost"), SINK)
299299
.onComplete(should.asyncAssertSuccess(callRequest -> {
300+
callRequest.exceptionHandler(err -> {
301+
should.assertTrue(callRequest.isCancelled());
302+
done.countDown();
303+
});
300304
callRequest.response().onComplete(should.asyncAssertFailure(failure -> {
301305
should.assertEquals(GrpcErrorException.class, failure.getClass());
302306
GrpcErrorException f = (GrpcErrorException) failure;
303307
should.assertEquals(GrpcStatus.CANCELLED, f.status());
304-
done.complete();
308+
done.countDown();
305309
}));
306310
callRequest.write(Request.newBuilder().setName("the-value").build());
307311
}));
@@ -469,12 +473,17 @@ public void onCompleted() {
469473
client.request(SocketAddress.inetSocketAddress(port, "localhost"), SINK)
470474
.onComplete(should.asyncAssertSuccess(callRequest -> {
471475
callRequest.write(Request.getDefaultInstance());
472-
cf.whenComplete((v, t) -> {
473-
callRequest.cancel();
474-
try {
475-
callRequest.write(Request.getDefaultInstance());
476-
} catch (IllegalStateException ignore) {
477-
}
476+
io.vertx.core.Context ctx = vertx.getOrCreateContext();
477+
cf.whenComplete((v1, t) -> {
478+
ctx.runOnContext(v2 -> {
479+
should.assertFalse(callRequest.isCancelled());
480+
callRequest.cancel();
481+
should.assertTrue(callRequest.isCancelled());
482+
try {
483+
callRequest.write(Request.getDefaultInstance());
484+
} catch (IllegalStateException ignore) {
485+
}
486+
});
478487
});
479488
}));
480489
}

vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcWriteStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,9 @@ public interface GrpcWriteStream<T> extends WriteStream<T> {
6868
*/
6969
void cancel();
7070

71+
/**
72+
* @return whether the stream is cancelled
73+
*/
74+
boolean isCancelled();
75+
7176
}

vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcWriteStreamBase.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public abstract class GrpcWriteStreamBase<S extends GrpcWriteStreamBase<S, T>, T
3030
private boolean headersSent;
3131
private boolean trailersSent;
3232
private GrpcError error;
33+
private boolean cancelled;
3334

3435
private MultiMap headers;
3536
private MultiMap trailers;
@@ -52,9 +53,8 @@ public void init() {
5253
StreamResetException reset = (StreamResetException) err;
5354
GrpcError error = mapHttp2ErrorCode(reset.getCode());
5455
handleError(error);
55-
} else {
56-
handleException(err);
5756
}
57+
handleException(err);
5858
});
5959
}
6060

@@ -65,6 +65,7 @@ public S errorHandler(Handler<GrpcError> handler) {
6565

6666
public void handleError(GrpcError error) {
6767
if (this.error == null) {
68+
cancelled |= error == GrpcError.CANCELLED;
6869
this.error = error;
6970
Handler<GrpcError> handler = errorHandler;
7071
if (handler != null) {
@@ -80,6 +81,22 @@ private void handleException(Throwable err) {
8081
}
8182
}
8283

84+
public void handleStatus(GrpcStatus status) {
85+
cancelled |= status == GrpcStatus.CANCELLED;
86+
}
87+
88+
@Override
89+
public boolean isCancelled() {
90+
return cancelled;
91+
}
92+
93+
@Override
94+
public void cancel() {
95+
if (!cancelled) {
96+
cancelled = sendCancel();
97+
}
98+
}
99+
83100
@Override
84101
public final S encoding(String encoding) {
85102
if (headersSent) {
@@ -110,10 +127,6 @@ public boolean isTrailersSent() {
110127
return trailersSent;
111128
}
112129

113-
public boolean isCancelled() {
114-
return error == GrpcError.CANCELLED;
115-
}
116-
117130
@Override
118131
public final MultiMap headers() {
119132
if (headersSent) {
@@ -192,6 +205,8 @@ public final Future<Void> end() {
192205
protected abstract Future<Void> sendMessage(Buffer message, boolean compressed);
193206
protected abstract Future<Void> sendEnd();
194207
protected abstract Future<Void> sendHead();
208+
protected abstract boolean sendCancel();
209+
195210

196211
protected String contentType(WireFormat wireFormat) {
197212
if (wireFormat != null) {

vertx-grpc-protoc-plugin2/pom.xml

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
<name>Vert.x gRPC Protoc Plugin</name>
3131

3232
<properties>
33-
<canteen.version>1.1.0</canteen.version>
3433
<protoc.version>3.21.12</protoc.version>
3534
</properties>
3635

@@ -88,18 +87,6 @@
8887
</archive>
8988
</configuration>
9089
</plugin>
91-
<plugin>
92-
<groupId>com.salesforce.servicelibs</groupId>
93-
<artifactId>canteen-maven-plugin</artifactId>
94-
<version>${canteen.version}</version>
95-
<executions>
96-
<execution>
97-
<goals>
98-
<goal>bootstrap</goal>
99-
</goals>
100-
</execution>
101-
</executions>
102-
</plugin>
10390
</plugins>
10491
</build>
10592
</project>

vertx-grpc-server/src/main/generated/io/vertx/grpc/server/GrpcServerOptionsConverter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import io.vertx.core.json.JsonObject;
44
import io.vertx.core.json.JsonArray;
5-
import java.time.Instant;
6-
import java.time.format.DateTimeFormatter;
75

86
/**
97
* Converter and mapper for {@link io.vertx.grpc.server.GrpcServerOptions}.

vertx-grpc-server/src/main/java/io/vertx/grpc/server/Service.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.protobuf.Descriptors;
44
import io.vertx.codegen.annotations.GenIgnore;
5+
import io.vertx.core.Future;
56
import io.vertx.grpc.common.ServiceName;
67
import io.vertx.grpc.server.impl.ServiceBuilderImpl;
78

@@ -50,6 +51,13 @@ static ServiceBuilder service(ServiceName serviceName, Descriptors.ServiceDescri
5051
*/
5152
void bind(GrpcServer server);
5253

54+
/**
55+
* Close the service.
56+
*/
57+
default Future<Void> close() {
58+
return Future.succeededFuture();
59+
}
60+
5361
/**
5462
* Get a list of all method descriptors for this service.
5563
*

0 commit comments

Comments
 (0)