responseObserver) {
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getPipeMethod(), responseObserver);
}
+ }
+
+ /**
+ * Base class for the server implementation of the service Streaming.
+ *
+ * Interface exported by the server.
+ *
+ */
+ public static abstract class StreamingImplBase
+ implements io.grpc.BindableService, AsyncService {
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
- return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
- .addMethod(
- getSourceMethod(),
- io.grpc.stub.ServerCalls.asyncServerStreamingCall(
- new MethodHandlers<
- examples.Empty,
- examples.Item>(
- this, METHODID_SOURCE)))
- .addMethod(
- getSinkMethod(),
- io.grpc.stub.ServerCalls.asyncClientStreamingCall(
- new MethodHandlers<
- examples.Item,
- examples.Empty>(
- this, METHODID_SINK)))
- .addMethod(
- getPipeMethod(),
- io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
- new MethodHandlers<
- examples.Item,
- examples.Item>(
- this, METHODID_PIPE)))
- .build();
+ return StreamingGrpc.bindService(this);
}
}
/**
+ * A stub to allow clients to do asynchronous rpc calls to service Streaming.
*
* Interface exported by the server.
*
*/
- public static final class StreamingStub extends io.grpc.stub.AbstractAsyncStub {
+ public static final class StreamingStub
+ extends io.grpc.stub.AbstractAsyncStub {
private StreamingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
@@ -253,11 +243,13 @@ public io.grpc.stub.StreamObserver pipe(
}
/**
+ * A stub to allow clients to do synchronous rpc calls to service Streaming.
*
* Interface exported by the server.
*
*/
- public static final class StreamingBlockingStub extends io.grpc.stub.AbstractBlockingStub {
+ public static final class StreamingBlockingStub
+ extends io.grpc.stub.AbstractBlockingStub {
private StreamingBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
@@ -279,11 +271,13 @@ public java.util.Iterator source(
}
/**
+ * A stub to allow clients to do ListenableFuture-style rpc calls to service Streaming.
*
* Interface exported by the server.
*
*/
- public static final class StreamingFutureStub extends io.grpc.stub.AbstractFutureStub {
+ public static final class StreamingFutureStub
+ extends io.grpc.stub.AbstractFutureStub {
private StreamingFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
@@ -305,10 +299,10 @@ private static final class MethodHandlers implements
io.grpc.stub.ServerCalls.ServerStreamingMethod,
io.grpc.stub.ServerCalls.ClientStreamingMethod,
io.grpc.stub.ServerCalls.BidiStreamingMethod {
- private final StreamingImplBase serviceImpl;
+ private final AsyncService serviceImpl;
private final int methodId;
- MethodHandlers(StreamingImplBase serviceImpl, int methodId) {
+ MethodHandlers(AsyncService serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@@ -343,6 +337,32 @@ public io.grpc.stub.StreamObserver invoke(
}
}
+ public static final io.grpc.ServerServiceDefinition bindService(AsyncService service) {
+ return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
+ .addMethod(
+ getSourceMethod(),
+ io.grpc.stub.ServerCalls.asyncServerStreamingCall(
+ new MethodHandlers<
+ examples.Empty,
+ examples.Item>(
+ service, METHODID_SOURCE)))
+ .addMethod(
+ getSinkMethod(),
+ io.grpc.stub.ServerCalls.asyncClientStreamingCall(
+ new MethodHandlers<
+ examples.Item,
+ examples.Empty>(
+ service, METHODID_SINK)))
+ .addMethod(
+ getPipeMethod(),
+ io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
+ new MethodHandlers<
+ examples.Item,
+ examples.Item>(
+ service, METHODID_PIPE)))
+ .build();
+ }
+
private static abstract class StreamingBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
StreamingBaseDescriptorSupplier() {}
@@ -366,9 +386,9 @@ private static final class StreamingFileDescriptorSupplier
private static final class StreamingMethodDescriptorSupplier
extends StreamingBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
- private final String methodName;
+ private final java.lang.String methodName;
- StreamingMethodDescriptorSupplier(String methodName) {
+ StreamingMethodDescriptorSupplier(java.lang.String methodName) {
this.methodName = methodName;
}
diff --git a/vertx-grpc-client/src/main/java/examples/StreamingProto.java b/vertx-grpc-client/src/main/java/examples/StreamingProto.java
index 4ad5518d..7988c431 100644
--- a/vertx-grpc-client/src/main/java/examples/StreamingProto.java
+++ b/vertx-grpc-client/src/main/java/examples/StreamingProto.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: streaming.proto
+// Protobuf Java Version: 3.25.5
package examples;
public final class StreamingProto {
diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientChannel.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientChannel.java
index 7469f4ea..d1612d8b 100644
--- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientChannel.java
+++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientChannel.java
@@ -12,13 +12,9 @@
import io.grpc.CallOptions;
import io.grpc.ClientCall;
-import io.grpc.Compressor;
-import io.grpc.CompressorRegistry;
import io.grpc.MethodDescriptor;
import io.vertx.core.net.SocketAddress;
-import java.util.concurrent.Executor;
-
/**
* Bridge a gRPC service with a {@link io.vertx.grpc.client.GrpcClient}.
*/
@@ -34,20 +30,7 @@ public GrpcClientChannel(GrpcClient client, SocketAddress server) {
@Override
public ClientCall newCall(MethodDescriptor methodDescriptor, CallOptions callOptions) {
-
- String encoding = callOptions.getCompressor();
-
- Compressor compressor;
- if (encoding != null) {
- compressor = CompressorRegistry.getDefaultInstance().lookupCompressor(encoding);
- } else {
- compressor = null;
- }
-
-
- Executor exec = callOptions.getExecutor();
-
- return new VertxClientCall<>(client, server, exec, methodDescriptor, encoding, compressor);
+ return new VertxClientCall<>(client, server, methodDescriptor, callOptions, authority());
}
@Override
diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/VertxClientCall.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/VertxClientCall.java
index 1d9ece7e..9aac9799 100644
--- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/VertxClientCall.java
+++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/VertxClientCall.java
@@ -1,12 +1,6 @@
package io.vertx.grpc.client;
-import io.grpc.ClientCall;
-import io.grpc.Compressor;
-import io.grpc.Decompressor;
-import io.grpc.DecompressorRegistry;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor;
-import io.grpc.Status;
+import io.grpc.*;
import io.vertx.core.Future;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.net.SocketAddress;
@@ -22,12 +16,14 @@
class VertxClientCall extends ClientCall {
+ private final String authority;
private final GrpcClient client;
private final SocketAddress server;
private final Executor exec;
private final MethodDescriptor methodDescriptor;
private final String encoding;
private final Compressor compressor;
+ private final CallOptions callOptions;
private Future> fut;
private Listener listener;
private WriteStreamAdapter writeAdapter;
@@ -35,13 +31,21 @@ class VertxClientCall extends ClientCall request;
private GrpcClientResponse grpcResponse;
- VertxClientCall(GrpcClient client, SocketAddress server, Executor exec, MethodDescriptor methodDescriptor, String encoding, Compressor compressor) {
+ VertxClientCall(GrpcClient client, SocketAddress server, MethodDescriptor methodDescriptor, CallOptions callOptions, String authority) {
+ this.authority = authority;
this.client = client;
this.server = server;
- this.exec = exec;
+ this.exec = callOptions.getExecutor();
this.methodDescriptor = methodDescriptor;
- this.encoding = encoding;
- this.compressor = compressor;
+ this.encoding = callOptions.getCompressor();
+ this.callOptions = callOptions;
+
+ if (callOptions.getCompressor() != null) {
+ this.compressor = CompressorRegistry.getDefaultInstance().lookupCompressor(callOptions.getCompressor());
+ } else {
+ this.compressor = null;
+ }
+
writeAdapter = new WriteStreamAdapter() {
@Override
protected void handleReady() {
@@ -72,6 +76,13 @@ public void start(Listener responseListener, Metadata headers) {
fut.onComplete(ar1 -> {
if (ar1.succeeded()) {
request = ar1.result();
+
+ Status applyCallOptionsResult = applyCallOptions();
+ if (!applyCallOptionsResult.isOk()) {
+ doClose(applyCallOptionsResult, new Metadata());
+ return;
+ }
+
Utils.writeMetadata(headers, request.headers());
if (encoding != null) {
request.encoding(encoding);
@@ -133,6 +144,40 @@ public void start(Listener responseListener, Metadata headers) {
});
}
+ private Status applyCallOptions() {
+ if (this.callOptions.getAuthority() != null) {
+ return Status.INTERNAL.withCause(new UnsupportedOperationException("unsupported callOptions: authority"));
+ }
+
+ if (this.callOptions.getMaxInboundMessageSize() != null) {
+ return Status.INTERNAL.withCause(new UnsupportedOperationException("unsupported callOptions: maxInboundMessageSize"));
+ }
+
+ if (this.callOptions.getMaxOutboundMessageSize() != null) {
+ return Status.INTERNAL.withCause(new UnsupportedOperationException("unsupported callOptions: maxOutboundMessageSize"));
+ }
+
+ if (this.callOptions.getDeadline() != null) {
+ return Status.INTERNAL.withCause(new UnsupportedOperationException("unsupported callOptions: deadline"));
+ }
+
+ if (this.callOptions.getCredentials() != null) {
+ VertxCallCredentialsMetadataApplier metadataApplier = new VertxCallCredentialsMetadataApplier();
+
+ try {
+ this.callOptions.getCredentials().applyRequestMetadata(new VertxCallCredentialsRequestInfo(), exec, metadataApplier);
+
+ if (metadataApplier.failure() != null) {
+ return metadataApplier.failure();
+ }
+
+ } catch (Throwable throwable) {
+ return Status.UNAUTHENTICATED.withDescription("Credentials should use fail() instead of throwing exceptions").withCause(throwable);
+ }
+ }
+ return Status.OK;
+ }
+
private void doClose(Status status, Metadata trailers) {
Runnable cmd = () -> {
listener.onClose(status, trailers);
@@ -169,4 +214,47 @@ public void sendMessage(RequestT message) {
writeAdapter.write(message);
});
}
+
+ private class VertxCallCredentialsRequestInfo extends CallCredentials.RequestInfo {
+ public MethodDescriptor, ?> getMethodDescriptor() {
+ return methodDescriptor;
+ }
+
+ public CallOptions getCallOptions() {
+ return callOptions;
+ }
+
+ public SecurityLevel getSecurityLevel() {
+ return SecurityLevel.NONE;
+ }
+
+ public String getAuthority() {
+ return authority;
+ }
+
+ public Attributes getTransportAttrs() {
+ return Attributes.EMPTY;
+ }
+ }
+
+ private class VertxCallCredentialsMetadataApplier extends CallCredentials.MetadataApplier {
+ Status failure = null;
+
+ Status failure() {
+ return failure;
+ }
+
+ public VertxCallCredentialsMetadataApplier() {
+ }
+
+ @Override
+ public void apply(Metadata metadata) {
+ Utils.writeMetadata(metadata, request.headers());
+ }
+
+ @Override
+ public void fail(Status status) {
+ failure = status;
+ }
+ }
}
diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java
index 9650b041..3c0526f9 100644
--- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java
+++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java
@@ -148,6 +148,11 @@ public GrpcClientRequest drainHandler(Handler handler) {
return httpRequest.end();
}
+ @Override
+ public void end(Handler> handler) {
+ end().onComplete(handler);
+ }
+
private Future writeMessage(GrpcMessage message, boolean end) {
if (cancelled) {
throw new IllegalStateException("The stream has been cancelled");
@@ -217,6 +222,11 @@ public Future write(Req message) {
return writeMessage(messageEncoder.encode(message));
}
+ @Override
+ public void write(Req req, Handler> handler) {
+ write(req).onComplete(handler);
+ }
+
@Override
public Future end(Req message) {
return endMessage(messageEncoder.encode(message));
diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java
index b3e3f78a..085fedb1 100644
--- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java
+++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java
@@ -14,15 +14,14 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
-import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClientResponse;
-
import io.vertx.grpc.client.GrpcClientResponse;
import io.vertx.grpc.common.CodecException;
+import io.vertx.grpc.common.GrpcException;
import io.vertx.grpc.common.GrpcMessageDecoder;
-import io.vertx.grpc.common.impl.GrpcReadStreamBase;
import io.vertx.grpc.common.GrpcStatus;
+import io.vertx.grpc.common.impl.GrpcReadStreamBase;
import java.nio.charset.StandardCharsets;
@@ -101,7 +100,7 @@ public Future end() {
if (status == GrpcStatus.OK) {
return Future.succeededFuture();
} else {
- return Future.failedFuture("Invalid gRPC status " + status);
+ return Future.failedFuture(new GrpcException(statusMessage, status, httpResponse));
}
});
}
diff --git a/vertx-grpc-common/build.gradle b/vertx-grpc-common/build.gradle
new file mode 100644
index 00000000..8cb64797
--- /dev/null
+++ b/vertx-grpc-common/build.gradle
@@ -0,0 +1,71 @@
+plugins {
+ id 'java-library'
+}
+
+sourceSets {
+ main {
+ java {
+ exclude 'src/main/java/examples/**'
+ }
+ }
+ examples {
+ java {
+ srcDir 'src/main/java/examples'
+ }
+ resources {
+ srcDir 'src/main/proto'
+ }
+ }
+}
+
+configurations {
+ examplesImplementation
+ examplesRuntimeOnly
+}
+
+dependencies {
+ api "io.vertx:vertx-core:$vertxVersion"
+ api("io.grpc:grpc-stub:$grpcIoVersion") {
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ }
+
+ api("io.grpc:grpc-api:$grpcIoVersion") {
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ exclude group: 'org.codehaus.mojo', module: 'animal-sniffer-annotations'
+ }
+ implementation 'com.google.guava:guava:31.1-jre'
+
+ testImplementation "io.grpc:grpc-netty:$grpcIoVersion"
+ testImplementation "io.vertx:vertx-unit:$vertxVersion"
+ testImplementation "org.bouncycastle:bcpkix-jdk15on:1.70"
+ testImplementation("io.grpc:grpc-netty:$grpcIoVersion") {
+ exclude group: '*', module: '*'
+ }
+
+ // Examples-specific dependencies
+ examplesImplementation project(':vertx-grpc-common')
+ examplesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: javaProtobufVersion
+
+ examplesImplementation("io.grpc:grpc-protobuf:$grpcIoVersion") {
+ exclude group: 'com.google.guava', module: 'guava'
+ }
+ examplesImplementation "io.grpc:grpc-stub:$grpcIoVersion"
+ examplesImplementation "io.grpc:grpc-api:$grpcIoVersion"
+}
+
+// In vertx-grpc-common/build.gradle
+tasks.register('testJar', Jar) {
+ archiveClassifier = 'tests'
+ from sourceSets.test.output
+}
+
+configurations {
+ testClasses {
+ canBeResolved = true
+ canBeConsumed = true
+ }
+}
+
+artifacts {
+ testClasses testJar
+}
diff --git a/vertx-grpc-common/pom.xml b/vertx-grpc-common/pom.xml
deleted file mode 100644
index 0c039cad..00000000
--- a/vertx-grpc-common/pom.xml
+++ /dev/null
@@ -1,160 +0,0 @@
-
-
-
-
- 4.0.0
-
-
- io.vertx
- vertx-grpc-aggregator
- 5.0.0-SNAPSHOT
- ../pom.xml
-
-
- vertx-grpc-common
-
- Vert.x gRPC Common
-
-
- false
- ${project.basedir}/src/main/resources/META-INF/MANIFEST.MF
-
-
-
-
- io.vertx
- vertx-core
-
-
- io.grpc
- grpc-protobuf
- ${grpc.version}
-
-
- com.google.guava
- guava
-
-
- com.google.errorprone
- error_prone_annotations
-
-
- org.codehaus.mojo
- animal-sniffer-annotations
-
-
-
-
- io.grpc
- grpc-api
- ${grpc.version}
-
-
- com.google.errorprone
- error_prone_annotations
-
-
-
-
- com.google.guava
- guava
-
-
-
- io.grpc
- grpc-netty
- ${grpc.version}
- test
-
-
- com.google.guava
- guava
-
-
- com.google.errorprone
- error_prone_annotations
-
-
- org.codehaus.mojo
- animal-sniffer-annotations
-
-
- io.netty
- *
-
-
-
-
- io.vertx
- vertx-unit
- test
-
-
-
-
-
-
- kr.motd.maven
- os-maven-plugin
- 1.4.1.Final
-
-
-
-
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- 0.6.1
-
-
- com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
- grpc-java
- io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
-
-
-
- test-compile
-
- test-compile
- test-compile-custom
-
-
-
-
-
-
-
-
-
- java-11
-
- (1.8,)
-
-
-
- javax.annotation
- javax.annotation-api
- 1.3.2
-
-
-
-
-
-
diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcException.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcException.java
new file mode 100644
index 00000000..52ca42cc
--- /dev/null
+++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcException.java
@@ -0,0 +1,42 @@
+package io.vertx.grpc.common;
+
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import io.vertx.core.http.HttpClientResponse;
+
+public class GrpcException extends StatusRuntimeException {
+
+ private static final long serialVersionUID = -7838327176604697641L;
+
+ private HttpClientResponse httpResponse;
+
+ public GrpcException(String msg, GrpcStatus status, HttpClientResponse httpResponse) {
+ super(Status.fromCodeValue(status.code).withDescription(msg));
+ this.httpResponse = httpResponse;
+ }
+
+ public GrpcException(StatusRuntimeException statusRuntimeException) {
+ super(Status.fromThrowable(statusRuntimeException));
+ }
+
+ public GrpcException(GrpcStatus status) {
+ super(Status.fromCodeValue(status.code));
+ }
+
+ public GrpcException(GrpcStatus status, Throwable err) {
+ super(err instanceof StatusRuntimeException ? Status.fromThrowable(err) : err instanceof StatusException ? Status.fromThrowable(err) : Status.fromCodeValue(status.code).withCause(err));
+ }
+
+ public GrpcException(GrpcStatus status, String msg) {
+ super(Status.fromCodeValue(status.code).withDescription(msg));
+ }
+
+ public GrpcStatus status() {
+ return GrpcStatus.valueOf(getStatus().getCode().value());
+ }
+
+ public HttpClientResponse response() {
+ return httpResponse;
+ }
+}
diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageDecoder.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageDecoder.java
index ad4bf078..ce53e34e 100644
--- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageDecoder.java
+++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageDecoder.java
@@ -19,7 +19,6 @@
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.buffer.Buffer;
-import io.vertx.core.buffer.impl.BufferInternal;
import io.vertx.core.buffer.impl.VertxByteBufAllocator;
import java.io.ByteArrayInputStream;
@@ -42,7 +41,7 @@ public Buffer decode(GrpcMessage msg) throws CodecException {
EmbeddedChannel channel = new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
channel.config().setAllocator(VertxByteBufAllocator.UNPOOLED_ALLOCATOR);
try {
- ChannelFuture fut = channel.writeOneInbound(((BufferInternal)msg.payload()).getByteBuf());
+ ChannelFuture fut = channel.writeOneInbound(msg.payload().getByteBuf());
if (fut.isSuccess()) {
Buffer decoded = null;
while (true) {
@@ -51,9 +50,9 @@ public Buffer decode(GrpcMessage msg) throws CodecException {
break;
}
if (decoded == null) {
- decoded = BufferInternal.buffer(buf);
+ decoded = Buffer.buffer(buf);
} else {
- decoded.appendBuffer(BufferInternal.buffer(buf));
+ decoded.appendBuffer(Buffer.buffer(buf));
}
}
if (decoded == null) {
diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageEncoder.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageEncoder.java
index 88b2e9a4..35dc9fab 100644
--- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageEncoder.java
+++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcMessageEncoder.java
@@ -14,7 +14,6 @@
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
-import io.vertx.core.buffer.impl.BufferInternal;
import io.vertx.core.buffer.impl.VertxByteBufAllocator;
import java.io.IOException;
@@ -39,7 +38,7 @@ public GrpcMessage encode(Buffer payload) {
ZlibEncoder encoder = ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP, options.compressionLevel(), options.windowBits(), options.memLevel());
EmbeddedChannel channel = new EmbeddedChannel(encoder);
channel.config().setAllocator(VertxByteBufAllocator.UNPOOLED_ALLOCATOR);
- channel.writeOutbound(((BufferInternal) payload).getByteBuf());
+ channel.writeOutbound(((Buffer) payload).getByteBuf());
channel.finish();
Queue messages = channel.outboundMessages();
ByteBuf a;
@@ -47,7 +46,7 @@ public GrpcMessage encode(Buffer payload) {
composite.addComponent(true, a);
}
channel.close();
- return GrpcMessage.message("gzip", BufferInternal.buffer(composite));
+ return GrpcMessage.message("gzip", Buffer.buffer(composite));
}
};
diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcWriteStream.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcWriteStream.java
index c7eff90f..6d0fbdf5 100644
--- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcWriteStream.java
+++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/GrpcWriteStream.java
@@ -12,7 +12,7 @@
public interface GrpcWriteStream extends WriteStream {
/**
- * @return the {@link MultiMap} to reader metadata headers
+ * @return the {@link MultiMap} to writer metadata headers
*/
MultiMap headers();
diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java
index 80ffeb55..59b55324 100644
--- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java
+++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcMessageImpl.java
@@ -14,7 +14,6 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.vertx.core.buffer.Buffer;
-import io.vertx.core.buffer.impl.BufferInternal;
import io.vertx.grpc.common.GrpcMessage;
public class GrpcMessageImpl implements GrpcMessage {
@@ -38,7 +37,7 @@ public Buffer payload() {
}
public static Buffer encode(GrpcMessage message) {
- ByteBuf bbuf = ((BufferInternal)message.payload()).getByteBuf();
+ ByteBuf bbuf = ((Buffer)message.payload()).getByteBuf();
int len = bbuf.readableBytes();
boolean compressed = !message.encoding().equals("identity");
ByteBuf prefix = Unpooled.buffer(5, 5);
@@ -47,6 +46,6 @@ public static Buffer encode(GrpcMessage message) {
CompositeByteBuf composite = Unpooled.compositeBuffer();
composite.addComponent(true, prefix);
composite.addComponent(true, bbuf);
- return BufferInternal.buffer(composite);
+ return Buffer.buffer(composite);
}
}
diff --git a/vertx-grpc-context-storage/build.gradle b/vertx-grpc-context-storage/build.gradle
new file mode 100644
index 00000000..b4da0d7d
--- /dev/null
+++ b/vertx-grpc-context-storage/build.gradle
@@ -0,0 +1,97 @@
+plugins {
+ id 'java-library'
+ id 'com.google.protobuf' version '0.9.4'
+}
+
+evaluationDependsOn(":vertx-grpc-protoc-plugin2")
+
+sourceSets {
+ main {
+ java {
+ exclude 'examples/**'
+ }
+ }
+ examples {
+ java {
+ srcDir 'src/main/java/examples'
+ }
+ resources {
+ srcDir 'src/main/proto'
+ }
+ }
+}
+
+configurations {
+ examplesImplementation
+ examplesRuntimeOnly
+ protocPlugin
+}
+
+dependencies {
+ api "io.vertx:vertx-core:$vertxVersion"
+ api project(':vertx-grpc-common')
+
+ implementation("io.grpc:grpc-protobuf:$grpcIoVersion") {
+ exclude group: 'com.google.guava', module: 'guava'
+ }
+
+ implementation("io.grpc:grpc-stub:$grpcIoVersion") {
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ }
+ implementation("io.grpc:grpc-api:$grpcIoVersion") {
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ exclude group: 'org.codehaus.mojo', module: 'animal-sniffer-annotations'
+ }
+
+
+ testCompileOnly "javax.annotation:javax.annotation-api:1.3.2"
+ testImplementation "io.vertx:vertx-unit:$vertxVersion"
+ testImplementation project(':vertx-grpc-client')
+ testImplementation project(':vertx-grpc-server')
+ testImplementation("io.grpc:grpc-netty:$grpcIoVersion") {
+ exclude group: '*', module: '*'
+ }
+ testImplementation("io.grpc:grpc-protobuf:$grpcIoVersion") {
+ exclude group: 'com.google.guava', module: 'guava'
+ }
+
+ // Examples-specific dependencies
+ examplesImplementation project(':vertx-grpc-common')
+ examplesImplementation("io.grpc:grpc-protobuf:$grpcIoVersion") {
+ exclude group: 'com.google.guava', module: 'guava'
+ }
+ examplesImplementation "io.grpc:grpc-stub:$grpcIoVersion"
+ examplesImplementation "io.grpc:grpc-api:$grpcIoVersion"
+
+ // Protoc plugin dependency
+ protocPlugin project(path: ':vertx-grpc-protoc-plugin2', configuration: 'shadow')
+}
+
+protobuf {
+ protoc {
+ artifact = "com.google.protobuf:protoc:3.25.5"
+ }
+ plugins {
+ grpc {
+ artifact = "io.grpc:protoc-gen-grpc-java:$grpcIoVersion"
+ }
+ vertx {
+ path = tasks.getByPath(':vertx-grpc-protoc-plugin2:shadowJar').archiveFile.get().asFile
+ }
+ }
+ generateProtoTasks {
+ all().each { task ->
+ task.plugins {
+ grpc {}
+ vertx {}
+ }
+ task.dependsOn ':vertx-grpc-protoc-plugin2:shadowJar'
+ }
+ ofSourceSet('examples').each { task ->
+ task.plugins {
+ grpc {} // For regular gRPC classes
+ vertx {} // For Vert.x specific classes
+ }
+ }
+ }
+}
diff --git a/vertx-grpc-context-storage/pom.xml b/vertx-grpc-context-storage/pom.xml
deleted file mode 100644
index 87c509a6..00000000
--- a/vertx-grpc-context-storage/pom.xml
+++ /dev/null
@@ -1,125 +0,0 @@
-
-
-
- 4.0.0
-
-
- io.vertx
- vertx-grpc-aggregator
- 5.0.0-SNAPSHOT
- ../pom.xml
-
-
- vertx-grpc-context-storage
-
- Vert.x gRPC Context Storage implementation
- This modules provides an io.grpc.override.ContextStorageOverride implementation which uses Vert.x local
- context data maps.
-
-
-
- false
-
-
-
-
- io.vertx
- vertx-grpc-common
-
-
-
- io.vertx
- vertx-unit
- test
-
-
- io.vertx
- vertx-grpc-client
- test
-
-
- io.vertx
- vertx-grpc-server
- test
-
-
- io.grpc
- grpc-netty
- ${grpc.version}
- test
-
-
- com.google.guava
- guava
-
-
- com.google.errorprone
- error_prone_annotations
-
-
- org.codehaus.mojo
- animal-sniffer-annotations
-
-
- io.netty
- *
-
-
-
-
-
-
-
-
- kr.motd.maven
- os-maven-plugin
- 1.4.1.Final
-
-
-
-
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- 0.6.1
-
-
- com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
- grpc-java
- io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
-
-
-
- test-compile
-
- test-compile
- test-compile-custom
-
-
-
-
-
-
-
-
-
- java-11
-
- (1.8,)
-
-
-
- javax.annotation
- javax.annotation-api
- 1.3.2
-
-
-
-
-
-
diff --git a/vertx-grpc-it/build.gradle b/vertx-grpc-it/build.gradle
new file mode 100644
index 00000000..7f582cd2
--- /dev/null
+++ b/vertx-grpc-it/build.gradle
@@ -0,0 +1,68 @@
+plugins {
+ id 'java-library'
+ id 'com.google.protobuf' version '0.9.4'
+}
+
+configurations {
+ protocPlugin
+}
+
+dependencies {
+ implementation project(':vertx-grpc-client')
+ implementation project(':vertx-grpc-server')
+
+ testImplementation project(path: ':vertx-grpc-common', configuration: 'testClasses')
+
+ testImplementation project(':vertx-grpc-server')
+ testImplementation project(':vertx-grpc-client')
+ testImplementation "io.vertx:vertx-unit:$vertxVersion"
+ testImplementation "org.bouncycastle:bcpkix-jdk15on:1.70"
+ testImplementation("io.grpc:grpc-stub:$grpcIoVersion") {
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ }
+ testImplementation("io.grpc:grpc-netty:$grpcIoVersion") {
+ exclude group: '*', module: '*'
+ }
+ testImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: javaProtobufVersion
+ testCompileOnly "javax.annotation:javax.annotation-api:1.3.2"
+
+ testImplementation("io.grpc:grpc-protobuf:$grpcIoVersion") {
+ exclude group: 'com.google.guava', module: 'guava'
+ }
+
+ testImplementation("io.grpc:grpc-stub:$grpcIoVersion") {
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ }
+ testImplementation("io.grpc:grpc-api:$grpcIoVersion") {
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ exclude group: 'org.codehaus.mojo', module: 'animal-sniffer-annotations'
+ }
+
+
+ protocPlugin project(path: ':vertx-grpc-protoc-plugin2', configuration: 'shadow')
+
+}
+
+
+protobuf {
+ protoc {
+ artifact = "com.google.protobuf:protoc:3.25.5"
+ }
+ plugins {
+ grpc {
+ artifact = "io.grpc:protoc-gen-grpc-java:$grpcIoVersion"
+ }
+ vertx {
+ path = tasks.getByPath(':vertx-grpc-protoc-plugin2:shadowJar').archiveFile.get().asFile
+ }
+ }
+ generateProtoTasks {
+ all().each { task ->
+ task.plugins {
+ grpc {}
+ vertx {}
+ }
+ task.dependsOn ':vertx-grpc-protoc-plugin2:shadowJar'
+ }
+ }
+}
diff --git a/vertx-grpc-it/pom.xml b/vertx-grpc-it/pom.xml
deleted file mode 100644
index 25cc13c7..00000000
--- a/vertx-grpc-it/pom.xml
+++ /dev/null
@@ -1,165 +0,0 @@
-
-
-
-
- 4.0.0
-
-
- io.vertx
- vertx-grpc-aggregator
- 5.0.0-SNAPSHOT
- ../pom.xml
-
-
- vertx-grpc-it
-
- Vert.x gRPC integration tests
-
-
- true
- ${project.basedir}/src/main/resources/META-INF/MANIFEST.MF
-
-
-
-
- io.vertx
- vertx-grpc-client
-
-
- io.vertx
- vertx-grpc-server
-
-
-
- io.vertx
- vertx-grpc-common
- ${project.version}
- test-jar
- test
-
-
- io.vertx
- vertx-unit
- test
-
-
-
- org.bouncycastle
- bcpkix-jdk15on
- 1.65
- test
-
-
- io.grpc
- grpc-stub
- ${grpc.version}
- test
-
-
- com.google.errorprone
- error_prone_annotations
-
-
-
-
- io.grpc
- grpc-netty
- ${grpc.version}
- test
-
-
- com.google.guava
- guava
-
-
- com.google.errorprone
- error_prone_annotations
-
-
- org.codehaus.mojo
- animal-sniffer-annotations
-
-
- io.netty
- *
-
-
-
-
-
-
-
-
- kr.motd.maven
- os-maven-plugin
- 1.4.1.Final
-
-
-
-
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- 0.6.1
-
-
- com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
- grpc-java
- io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
-
-
- vertx-grpc-protoc-plugin
- io.vertx
- vertx-grpc-protoc-plugin2
- ${stack.version}
- io.vertx.grpc.plugin.VertxGrpcGenerator
-
-
-
-
-
- test-compile
-
- test-compile
- test-compile-custom
-
-
-
-
-
-
-
-
-
- java-11
-
- (1.8,)
-
-
-
- javax.annotation
- javax.annotation-api
- 1.3.2
-
-
-
-
-
-
diff --git a/vertx-grpc-it/src/test/java/io/vertx/grpc/it/ProtocPluginTest.java b/vertx-grpc-it/src/test/java/io/vertx/grpc/it/ProtocPluginTest.java
index 582c089e..53d6e081 100644
--- a/vertx-grpc-it/src/test/java/io/vertx/grpc/it/ProtocPluginTest.java
+++ b/vertx-grpc-it/src/test/java/io/vertx/grpc/it/ProtocPluginTest.java
@@ -27,10 +27,11 @@
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.grpc.client.GrpcClient;
+import io.vertx.grpc.common.GrpcException;
+import io.vertx.grpc.common.GrpcStatus;
import io.vertx.grpc.server.GrpcServer;
-import io.vertx.grpc.server.GrpcServerResponse;
+import io.vertx.grpc.server.GrpcServerRequest;
import io.vertx.test.fakestream.FakeStream;
-import org.junit.Ignore;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
@@ -77,7 +78,7 @@ public void testUnary_PromiseArg(TestContext should) throws Exception {
GrpcServer grpcServer = GrpcServer.server(vertx);
new VertxTestServiceGrpcServer.TestServiceApi() {
@Override
- public void unaryCall(Messages.SimpleRequest request, Promise response) {
+ public void unaryCall(GrpcServerRequest grpcServerRequest, Messages.SimpleRequest request, Promise response) {
response.complete(Messages.SimpleResponse.newBuilder()
.setUsername("FooBar")
.build());
@@ -156,7 +157,9 @@ public Future unaryCall(Messages.SimpleRequest request)
.setFillUsername(true)
.build())
.onComplete(should.asyncAssertFailure(err -> {
- should.assertEquals("Invalid gRPC status 13", err.getMessage());
+ should.assertTrue(err instanceof GrpcException);
+ GrpcException grpcException = (GrpcException)err;
+ should.assertEquals(GrpcStatus.INTERNAL, grpcException.status());
test.complete();
}));
test.awaitSuccess();
@@ -168,7 +171,7 @@ public void testManyUnary_PromiseArg(TestContext should) throws Exception {
GrpcServer grpcServer = GrpcServer.server(vertx);
new VertxTestServiceGrpcServer.TestServiceApi() {
@Override
- public void streamingInputCall(ReadStream request, Promise response) {
+ public void streamingInputCall(GrpcServerRequest grpcServerRequest, ReadStream request, Promise response) {
List list = new ArrayList<>();
request.handler(list::add);
request.endHandler($ -> {
@@ -277,7 +280,9 @@ public Future streamingInputCall(ReadStream
req.end();
})
.onComplete(should.asyncAssertFailure(err -> {
- should.assertEquals("Invalid gRPC status 13", err.getMessage());
+ should.assertTrue(err instanceof GrpcException);
+ GrpcException grpcException = (GrpcException)err;
+ should.assertEquals(GrpcStatus.INTERNAL, grpcException.status());
test.complete();
}));
test.awaitSuccess();
@@ -289,7 +294,7 @@ public void testUnaryMany_WriteStreamArg(TestContext should) throws Exception {
GrpcServer grpcServer = GrpcServer.server(vertx);
new VertxTestServiceGrpcServer.TestServiceApi() {
@Override
- public void streamingOutputCall(Messages.StreamingOutputCallRequest request, WriteStream response) {
+ public void streamingOutputCall(GrpcServerRequest grpcServerRequest, Messages.StreamingOutputCallRequest request, WriteStream response) {
response.write(Messages.StreamingOutputCallResponse.newBuilder()
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputResponse-1", StandardCharsets.UTF_8)).build())
.build());
@@ -392,7 +397,9 @@ public ReadStream streamingOutputCall(Mess
.build();
client.streamingOutputCall(request)
.onComplete(should.asyncAssertFailure(err -> {
- should.assertEquals("Invalid gRPC status 13", err.getMessage());
+ should.assertTrue(err instanceof GrpcException);
+ GrpcException grpcException = (GrpcException)err;
+ should.assertEquals(GrpcStatus.INTERNAL, grpcException.status());
test.complete();
}));
test.awaitSuccess();
@@ -404,7 +411,7 @@ public void testmanyMany_WriteStreamArg(TestContext should) throws Exception {
GrpcServer grpcServer = GrpcServer.server(vertx);
new VertxTestServiceGrpcServer.TestServiceApi() {
@Override
- public void fullDuplexCall(ReadStream request, WriteStream response) {
+ public void fullDuplexCall(GrpcServerRequest grpcServerRequest, ReadStream request, WriteStream response) {
request.endHandler($ -> {
response.write(Messages.StreamingOutputCallResponse.newBuilder()
.setPayload(Messages.Payload.newBuilder().setBody(ByteString.copyFrom("StreamingOutputResponse-1", StandardCharsets.UTF_8)).build())
@@ -525,7 +532,9 @@ public ReadStream fullDuplexCall(ReadStrea
req.end();
})
.onComplete(should.asyncAssertFailure(err -> {
- should.assertEquals("Invalid gRPC status 13", err.getMessage());
+ should.assertTrue(err instanceof GrpcException);
+ GrpcException grpcException = (GrpcException)err;
+ should.assertEquals(GrpcStatus.INTERNAL, grpcException.status());
test.complete();
}));
test.awaitSuccess();
diff --git a/vertx-grpc-it/src/test/resources/keystore.jceks b/vertx-grpc-it/src/test/resources/keystore.jceks
new file mode 100644
index 00000000..10d93c30
Binary files /dev/null and b/vertx-grpc-it/src/test/resources/keystore.jceks differ
diff --git a/vertx-grpc-protoc-plugin2/build.gradle b/vertx-grpc-protoc-plugin2/build.gradle
new file mode 100644
index 00000000..da3a5750
--- /dev/null
+++ b/vertx-grpc-protoc-plugin2/build.gradle
@@ -0,0 +1,23 @@
+plugins {
+ id 'com.gradleup.shadow' version "8.3.5"
+}
+
+dependencies {
+ implementation("com.salesforce.servicelibs:jprotoc:1.2.1") {
+ exclude group: 'javax.annotation', module: 'javax.annotation-api'
+ }
+}
+
+jar {
+ manifest {
+ attributes(
+ 'Main-Class': 'io.vertx.grpc.plugin.VertxGrpcGenerator'
+ )
+ }
+
+ from {
+ configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
+ }
+
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+}
diff --git a/vertx-grpc-protoc-plugin2/pom.xml b/vertx-grpc-protoc-plugin2/pom.xml
deleted file mode 100644
index a4922c1a..00000000
--- a/vertx-grpc-protoc-plugin2/pom.xml
+++ /dev/null
@@ -1,97 +0,0 @@
-
-
-
-
- 4.0.0
-
-
- io.vertx
- vertx-grpc-aggregator
- 5.0.0-SNAPSHOT
- ../pom.xml
-
-
- vertx-grpc-protoc-plugin2
-
- Vert.x gRPC Protoc Plugin
-
-
- false
- 1.1.0
- 3.21.12
- 1.2.1
-
-
-
-
- com.salesforce.servicelibs
- jprotoc
- ${jprotoc.version}
-
-
- javax.annotation
- javax.annotation-api
-
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 3.2.1
-
-
- package
-
- shade
-
-
-
-
- false
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
-
-
- true
- io.vertx.grpc.plugin.VertxGrpcGenerator
-
-
-
-
-
- com.salesforce.servicelibs
- canteen-maven-plugin
- ${canteen.version}
-
-
-
- bootstrap
-
-
-
-
-
-
-
diff --git a/vertx-grpc-protoc-plugin2/src/main/resources/client.mustache b/vertx-grpc-protoc-plugin2/src/main/resources/client.mustache
index c0c26520..2bff9cb8 100644
--- a/vertx-grpc-protoc-plugin2/src/main/resources/client.mustache
+++ b/vertx-grpc-protoc-plugin2/src/main/resources/client.mustache
@@ -8,6 +8,7 @@ import io.vertx.core.net.SocketAddress;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
+import io.vertx.grpc.common.GrpcException;
import io.vertx.grpc.common.GrpcStatus;
public class {{className}} {
@@ -34,7 +35,7 @@ public class {{className}} {
req.end(request);
return req.response().flatMap(resp -> {
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
- return Future.failedFuture("Invalid gRPC status " + resp.status());
+ return Future.failedFuture(new GrpcException(resp.statusMessage(), resp.status(), null));
} else {
return Future.succeededFuture(resp);
}
@@ -58,7 +59,7 @@ public class {{className}} {
request.handle(req);
return req.response().flatMap(resp -> {
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
- return Future.failedFuture("Invalid gRPC status " + resp.status());
+ return Future.failedFuture(new GrpcException(resp.statusMessage(), resp.status(), null));
} else {
return Future.succeededFuture(resp);
}
diff --git a/vertx-grpc-protoc-plugin2/src/main/resources/server.mustache b/vertx-grpc-protoc-plugin2/src/main/resources/server.mustache
index 5a1ab51b..5f061c12 100644
--- a/vertx-grpc-protoc-plugin2/src/main/resources/server.mustache
+++ b/vertx-grpc-protoc-plugin2/src/main/resources/server.mustache
@@ -2,6 +2,9 @@
package {{packageName}};
{{/packageName}}
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Handler;
@@ -10,6 +13,7 @@ import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.grpc.common.GrpcStatus;
import io.vertx.grpc.server.GrpcServer;
+import io.vertx.grpc.server.GrpcServerRequest;
import io.vertx.grpc.server.GrpcServerResponse;
import java.util.ArrayList;
@@ -21,8 +25,11 @@ public class {{className}} {
default Future<{{outputType}}> {{methodName}}({{inputType}} request) {
throw new UnsupportedOperationException("Not implemented");
}
- default void {{methodName}}({{inputType}} request, Promise<{{outputType}}> response) {
- {{methodName}}(request)
+ default Future<{{outputType}}> {{methodName}}(GrpcServerRequest<{{inputType}}, {{outputType}}> grpcServerRequest, {{inputType}} request) {
+ return {{methodName}}(request);
+ }
+ default void {{methodName}}(GrpcServerRequest<{{inputType}}, {{outputType}}> grpcServerRequest, {{inputType}} request, Promise<{{outputType}}> response) {
+ {{methodName}}(grpcServerRequest, request)
.onSuccess(msg -> response.complete(msg))
.onFailure(error -> response.fail(error));
}
@@ -31,8 +38,11 @@ public class {{className}} {
default ReadStream<{{outputType}}> {{methodName}}({{inputType}} request) {
throw new UnsupportedOperationException("Not implemented");
}
- default void {{methodName}}({{inputType}} request, WriteStream<{{outputType}}> response) {
- {{methodName}}(request)
+ default ReadStream<{{outputType}}> {{methodName}}(GrpcServerRequest<{{inputType}}, {{outputType}}> grpcServerRequest, {{inputType}} request) {
+ return {{methodName}}(request);
+ }
+ default void {{methodName}}(GrpcServerRequest<{{inputType}}, {{outputType}}> grpcServerRequest, {{inputType}} request, WriteStream<{{outputType}}> response) {
+ {{methodName}}(grpcServerRequest, request)
.handler(msg -> response.write(msg))
.endHandler(msg -> response.end())
.resume();
@@ -42,8 +52,11 @@ public class {{className}} {
default Future<{{outputType}}> {{methodName}}(ReadStream<{{inputType}}> request) {
throw new UnsupportedOperationException("Not implemented");
}
- default void {{methodName}}(ReadStream<{{inputType}}> request, Promise<{{outputType}}> response) {
- {{methodName}}(request)
+ default Future<{{outputType}}> {{methodName}}(GrpcServerRequest<{{inputType}}, {{outputType}}> grpcServerRequest, ReadStream<{{inputType}}> request) {
+ return {{methodName}}(request);
+ }
+ default void {{methodName}}(GrpcServerRequest<{{inputType}}, {{outputType}}> grpcServerRequest, ReadStream<{{inputType}}> request, Promise<{{outputType}}> response) {
+ {{methodName}}(grpcServerRequest, request)
.onSuccess(msg -> response.complete(msg))
.onFailure(error -> response.fail(error));
}
@@ -52,8 +65,11 @@ public class {{className}} {
default ReadStream<{{outputType}}> {{methodName}}(ReadStream<{{inputType}}> request) {
throw new UnsupportedOperationException("Not implemented");
}
- default void {{methodName}}(ReadStream<{{inputType}}> request, WriteStream<{{outputType}}> response) {
- {{methodName}}(request)
+ default ReadStream<{{outputType}}> {{methodName}}(GrpcServerRequest<{{inputType}}, {{outputType}}> grpcServerRequest, ReadStream<{{inputType}}> request) {
+ return {{methodName}}(request);
+ }
+ default void {{methodName}}(GrpcServerRequest<{{inputType}}, {{outputType}}> grpcServerRequest, ReadStream<{{inputType}}> request, WriteStream<{{outputType}}> response) {
+ {{methodName}}(grpcServerRequest, request)
.handler(msg -> response.write(msg))
.endHandler(msg -> response.end())
.resume();
@@ -66,13 +82,20 @@ public class {{className}} {
Promise<{{outputType}}> promise = Promise.promise();
request.handler(req -> {
try {
- {{methodName}}(req, promise);
+ {{methodName}}(request, req, promise);
} catch (RuntimeException err) {
promise.tryFail(err);
}
});
promise.future()
- .onFailure(err -> request.response().status(GrpcStatus.INTERNAL).end())
+ .onFailure(err -> {
+ if (err instanceof StatusRuntimeException) {
+ StatusRuntimeException grpcException = (StatusRuntimeException)err;
+ request.response().status(GrpcStatus.valueOf(grpcException.getStatus().getCode().value())).statusMessage(grpcException.getMessage()).end();
+ } else {
+ request.response().status(GrpcStatus.INTERNAL).end();
+ }
+ })
.onSuccess(resp -> request.response().end(resp));
});
return this;
@@ -83,7 +106,9 @@ public class {{className}} {
server.callHandler({{serviceName}}Grpc.{{methodNameGetter}}(), request -> {
request.handler(req -> {
try {
- {{methodName}}(req, request.response());
+ {{methodName}}(request, req, request.response());
+ } catch (StatusRuntimeException grpcException) {
+ request.response().status(GrpcStatus.valueOf(grpcException.getStatus().getCode().value())).statusMessage(grpcException.getMessage()).end();
} catch (RuntimeException err) {
request.response().status(GrpcStatus.INTERNAL).end();
}
@@ -97,10 +122,17 @@ public class {{className}} {
server.callHandler({{serviceName}}Grpc.{{methodNameGetter}}(), request -> {
Promise<{{outputType}}> promise = Promise.promise();
promise.future()
- .onFailure(err -> request.response().status(GrpcStatus.INTERNAL).end())
+ .onFailure(err -> {
+ if (err instanceof StatusRuntimeException) {
+ StatusRuntimeException grpcException = (StatusRuntimeException)err;
+ request.response().status(GrpcStatus.valueOf(grpcException.getStatus().getCode().value())).statusMessage(grpcException.getMessage()).end();
+ } else {
+ request.response().status(GrpcStatus.INTERNAL).end();
+ }
+ })
.onSuccess(resp -> request.response().end(resp));
try {
- {{methodName}}(request, promise);
+ {{methodName}}(request, request, promise);
} catch (RuntimeException err) {
promise.tryFail(err);
}
@@ -112,7 +144,9 @@ public class {{className}} {
default {{serviceName}}Api bind_{{methodName}}(GrpcServer server) {
server.callHandler({{serviceName}}Grpc.{{methodNameGetter}}(), request -> {
try {
- {{methodName}}(request, request.response());
+ {{methodName}}(request, request, request.response());
+ } catch (StatusRuntimeException grpcException) {
+ request.response().status(GrpcStatus.valueOf(grpcException.getStatus().getCode().value())).statusMessage(grpcException.getMessage()).end();
} catch (RuntimeException err) {
request.response().status(GrpcStatus.INTERNAL).end();
}
diff --git a/vertx-grpc-server/build.gradle b/vertx-grpc-server/build.gradle
new file mode 100644
index 00000000..0af706c3
--- /dev/null
+++ b/vertx-grpc-server/build.gradle
@@ -0,0 +1,96 @@
+plugins {
+ id 'java-library'
+ id 'com.google.protobuf' version '0.9.4'
+}
+
+evaluationDependsOn(":vertx-grpc-protoc-plugin2")
+
+sourceSets {
+ main {
+ java {
+ exclude 'examples/**'
+ }
+ }
+ examples {
+ java {
+ srcDir 'src/main/java/examples'
+ }
+ resources {
+ srcDir 'src/main/proto'
+ }
+ }
+}
+
+configurations {
+ examplesImplementation
+ examplesRuntimeOnly
+ protocPlugin
+}
+
+dependencies {
+ api "io.vertx:vertx-core:$vertxVersion"
+ api project(':vertx-grpc-common')
+
+ implementation "io.vertx:vertx-auth-common:$vertxVersion"
+ implementation "io.vertx:vertx-web:$vertxVersion"
+ implementation("io.grpc:grpc-protobuf:$grpcIoVersion") {
+ exclude group: 'com.google.guava', module: 'guava'
+ }
+
+ implementation("io.grpc:grpc-stub:$grpcIoVersion") {
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ }
+ implementation("io.grpc:grpc-api:$grpcIoVersion") {
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ exclude group: 'org.codehaus.mojo', module: 'animal-sniffer-annotations'
+ }
+
+ testCompileOnly "javax.annotation:javax.annotation-api:1.3.2"
+
+ testImplementation project(':vertx-grpc-client')
+ testImplementation project(path: ':vertx-grpc-common', configuration: 'testClasses')
+ testImplementation "io.vertx:vertx-unit:$vertxVersion"
+ testImplementation "org.bouncycastle:bcpkix-jdk15on:1.65"
+ testImplementation("io.grpc:grpc-netty:$grpcIoVersion") {
+ exclude group: '*', module: '*'
+ }
+
+ examplesImplementation project(':vertx-grpc-common')
+ examplesImplementation("io.grpc:grpc-protobuf:$grpcIoVersion") {
+ exclude group: 'com.google.guava', module: 'guava'
+ }
+ examplesImplementation "io.grpc:grpc-stub:$grpcIoVersion"
+ examplesImplementation "io.grpc:grpc-api:$grpcIoVersion"
+
+ protocPlugin project(path: ':vertx-grpc-protoc-plugin2', configuration: 'shadow')
+
+}
+
+protobuf {
+ protoc {
+ artifact = "com.google.protobuf:protoc:3.25.5"
+ }
+ plugins {
+ grpc {
+ artifact = "io.grpc:protoc-gen-grpc-java:$grpcIoVersion"
+ }
+ vertx {
+ path = tasks.getByPath(':vertx-grpc-protoc-plugin2:shadowJar').archiveFile.get().asFile
+ }
+ }
+ generateProtoTasks {
+ all().each { task ->
+ task.plugins {
+ grpc {}
+ vertx {}
+ }
+ task.dependsOn ':vertx-grpc-protoc-plugin2:shadowJar'
+ }
+ ofSourceSet('examples').each { task ->
+ task.plugins {
+ grpc {} // For regular gRPC classes
+ vertx {} // For Vert.x specific classes
+ }
+ }
+ }
+}
diff --git a/vertx-grpc-server/pom.xml b/vertx-grpc-server/pom.xml
deleted file mode 100644
index 4af5769d..00000000
--- a/vertx-grpc-server/pom.xml
+++ /dev/null
@@ -1,162 +0,0 @@
-
-
-
-
- 4.0.0
-
-
- io.vertx
- vertx-grpc-aggregator
- 5.0.0-SNAPSHOT
- ../pom.xml
-
-
- vertx-grpc-server
-
- Vert.x gRPC Server
-
-
- false
- ${project.basedir}/src/main/resources/META-INF/MANIFEST.MF
-
-
-
-
- io.vertx
- vertx-grpc-common
-
-
- io.grpc
- grpc-stub
- ${grpc.version}
-
-
- com.google.errorprone
- error_prone_annotations
-
-
-
-
-
- io.vertx
- vertx-grpc-common
- ${project.version}
- test-jar
- test
-
-
- io.vertx
- vertx-unit
- test
-
-
-
- org.bouncycastle
- bcpkix-jdk15on
- 1.65
- test
-
-
- io.grpc
- grpc-netty
- ${grpc.version}
- test
-
-
- com.google.guava
- guava
-
-
- com.google.errorprone
- error_prone_annotations
-
-
- org.codehaus.mojo
- animal-sniffer-annotations
-
-
- io.netty
- *
-
-
-
-
-
-
-
-
- kr.motd.maven
- os-maven-plugin
- 1.4.1.Final
-
-
-
-
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- 0.6.1
-
-
- com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
- grpc-java
- io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
-
-
-
- compile
-
- compile
- compile-custom
-
-
- ${project.basedir}/src/main/java
- false
-
-
-
- test-compile
-
- test-compile
- test-compile-custom
-
-
-
-
-
-
-
-
-
- java-11
-
- (1.8,)
-
-
-
- javax.annotation
- javax.annotation-api
- 1.3.2
-
-
-
-
-
-
diff --git a/vertx-grpc-server/src/main/java/examples/Empty.java b/vertx-grpc-server/src/main/java/examples/Empty.java
index 7e85bb6b..dfb79ef1 100644
--- a/vertx-grpc-server/src/main/java/examples/Empty.java
+++ b/vertx-grpc-server/src/main/java/examples/Empty.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: streaming.proto
+// Protobuf Java Version: 3.25.5
package examples;
/**
@@ -25,11 +26,6 @@ protected java.lang.Object newInstance(
return new Empty();
}
- @java.lang.Override
- public final com.google.protobuf.UnknownFieldSet
- getUnknownFields() {
- return this.unknownFields;
- }
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return examples.StreamingProto.internal_static_streaming_Empty_descriptor;
@@ -141,11 +137,13 @@ public static examples.Empty parseFrom(
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
+
public static examples.Empty parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
+
public static examples.Empty parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
diff --git a/vertx-grpc-server/src/main/java/examples/EmptyOrBuilder.java b/vertx-grpc-server/src/main/java/examples/EmptyOrBuilder.java
index 77dd6fab..ad115e1e 100644
--- a/vertx-grpc-server/src/main/java/examples/EmptyOrBuilder.java
+++ b/vertx-grpc-server/src/main/java/examples/EmptyOrBuilder.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: streaming.proto
+// Protobuf Java Version: 3.25.5
package examples;
public interface EmptyOrBuilder extends
diff --git a/vertx-grpc-server/src/main/java/examples/GreeterGrpc.java b/vertx-grpc-server/src/main/java/examples/GreeterGrpc.java
index 297ee427..f08ea87d 100644
--- a/vertx-grpc-server/src/main/java/examples/GreeterGrpc.java
+++ b/vertx-grpc-server/src/main/java/examples/GreeterGrpc.java
@@ -8,14 +8,14 @@
*
*/
@javax.annotation.Generated(
- value = "by gRPC proto compiler (version 1.50.2)",
+ value = "by gRPC proto compiler (version 1.68.1)",
comments = "Source: helloworld.proto")
@io.grpc.stub.annotations.GrpcGenerated
public final class GreeterGrpc {
private GreeterGrpc() {}
- public static final String SERVICE_NAME = "helloworld.Greeter";
+ public static final java.lang.String SERVICE_NAME = "helloworld.Greeter";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor
*/
- public static abstract class GreeterImplBase implements io.grpc.BindableService {
+ public interface AsyncService {
/**
*
* Sends a greeting
*
*/
- public void sayHello(examples.HelloRequest request,
+ default void sayHello(examples.HelloRequest request,
io.grpc.stub.StreamObserver responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getSayHelloMethod(), responseObserver);
}
+ }
+
+ /**
+ * Base class for the server implementation of the service Greeter.
+ *
+ * The greeting service definition.
+ *
+ */
+ public static abstract class GreeterImplBase
+ implements io.grpc.BindableService, AsyncService {
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
- return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
- .addMethod(
- getSayHelloMethod(),
- io.grpc.stub.ServerCalls.asyncUnaryCall(
- new MethodHandlers<
- examples.HelloRequest,
- examples.HelloReply>(
- this, METHODID_SAY_HELLO)))
- .build();
+ return GreeterGrpc.bindService(this);
}
}
/**
+ * A stub to allow clients to do asynchronous rpc calls to service Greeter.
*
* The greeting service definition.
*
*/
- public static final class GreeterStub extends io.grpc.stub.AbstractAsyncStub {
+ public static final class GreeterStub
+ extends io.grpc.stub.AbstractAsyncStub {
private GreeterStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
@@ -153,11 +157,13 @@ public void sayHello(examples.HelloRequest request,
}
/**
+ * A stub to allow clients to do synchronous rpc calls to service Greeter.
*
* The greeting service definition.
*
*/
- public static final class GreeterBlockingStub extends io.grpc.stub.AbstractBlockingStub {
+ public static final class GreeterBlockingStub
+ extends io.grpc.stub.AbstractBlockingStub {
private GreeterBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
@@ -181,11 +187,13 @@ public examples.HelloReply sayHello(examples.HelloRequest request) {
}
/**
+ * A stub to allow clients to do ListenableFuture-style rpc calls to service Greeter.
*
* The greeting service definition.
*
*/
- public static final class GreeterFutureStub extends io.grpc.stub.AbstractFutureStub {
+ public static final class GreeterFutureStub
+ extends io.grpc.stub.AbstractFutureStub {
private GreeterFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
@@ -216,10 +224,10 @@ private static final class MethodHandlers implements
io.grpc.stub.ServerCalls.ServerStreamingMethod,
io.grpc.stub.ServerCalls.ClientStreamingMethod,
io.grpc.stub.ServerCalls.BidiStreamingMethod {
- private final GreeterImplBase serviceImpl;
+ private final AsyncService serviceImpl;
private final int methodId;
- MethodHandlers(GreeterImplBase serviceImpl, int methodId) {
+ MethodHandlers(AsyncService serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@@ -248,6 +256,18 @@ public io.grpc.stub.StreamObserver invoke(
}
}
+ public static final io.grpc.ServerServiceDefinition bindService(AsyncService service) {
+ return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
+ .addMethod(
+ getSayHelloMethod(),
+ io.grpc.stub.ServerCalls.asyncUnaryCall(
+ new MethodHandlers<
+ examples.HelloRequest,
+ examples.HelloReply>(
+ service, METHODID_SAY_HELLO)))
+ .build();
+ }
+
private static abstract class GreeterBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
GreeterBaseDescriptorSupplier() {}
@@ -271,9 +291,9 @@ private static final class GreeterFileDescriptorSupplier
private static final class GreeterMethodDescriptorSupplier
extends GreeterBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
- private final String methodName;
+ private final java.lang.String methodName;
- GreeterMethodDescriptorSupplier(String methodName) {
+ GreeterMethodDescriptorSupplier(java.lang.String methodName) {
this.methodName = methodName;
}
diff --git a/vertx-grpc-server/src/main/java/examples/HelloReply.java b/vertx-grpc-server/src/main/java/examples/HelloReply.java
index 3732a298..8d2d2e12 100644
--- a/vertx-grpc-server/src/main/java/examples/HelloReply.java
+++ b/vertx-grpc-server/src/main/java/examples/HelloReply.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: helloworld.proto
+// Protobuf Java Version: 3.25.5
package examples;
/**
@@ -30,11 +31,6 @@ protected java.lang.Object newInstance(
return new HelloReply();
}
- @java.lang.Override
- public final com.google.protobuf.UnknownFieldSet
- getUnknownFields() {
- return this.unknownFields;
- }
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return examples.HelloWorldProto.internal_static_helloworld_HelloReply_descriptor;
@@ -195,11 +191,13 @@ public static examples.HelloReply parseFrom(
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
+
public static examples.HelloReply parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
+
public static examples.HelloReply parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
diff --git a/vertx-grpc-server/src/main/java/examples/HelloReplyOrBuilder.java b/vertx-grpc-server/src/main/java/examples/HelloReplyOrBuilder.java
index ba60b071..b581c220 100644
--- a/vertx-grpc-server/src/main/java/examples/HelloReplyOrBuilder.java
+++ b/vertx-grpc-server/src/main/java/examples/HelloReplyOrBuilder.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: helloworld.proto
+// Protobuf Java Version: 3.25.5
package examples;
public interface HelloReplyOrBuilder extends
diff --git a/vertx-grpc-server/src/main/java/examples/HelloRequest.java b/vertx-grpc-server/src/main/java/examples/HelloRequest.java
index 9bfe7632..a5e21353 100644
--- a/vertx-grpc-server/src/main/java/examples/HelloRequest.java
+++ b/vertx-grpc-server/src/main/java/examples/HelloRequest.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: helloworld.proto
+// Protobuf Java Version: 3.25.5
package examples;
/**
@@ -30,11 +31,6 @@ protected java.lang.Object newInstance(
return new HelloRequest();
}
- @java.lang.Override
- public final com.google.protobuf.UnknownFieldSet
- getUnknownFields() {
- return this.unknownFields;
- }
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return examples.HelloWorldProto.internal_static_helloworld_HelloRequest_descriptor;
@@ -195,11 +191,13 @@ public static examples.HelloRequest parseFrom(
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
+
public static examples.HelloRequest parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
+
public static examples.HelloRequest parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
diff --git a/vertx-grpc-server/src/main/java/examples/HelloRequestOrBuilder.java b/vertx-grpc-server/src/main/java/examples/HelloRequestOrBuilder.java
index 1ccf4a99..0255e74d 100644
--- a/vertx-grpc-server/src/main/java/examples/HelloRequestOrBuilder.java
+++ b/vertx-grpc-server/src/main/java/examples/HelloRequestOrBuilder.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: helloworld.proto
+// Protobuf Java Version: 3.25.5
package examples;
public interface HelloRequestOrBuilder extends
diff --git a/vertx-grpc-server/src/main/java/examples/HelloWorldProto.java b/vertx-grpc-server/src/main/java/examples/HelloWorldProto.java
index 07970283..68286694 100644
--- a/vertx-grpc-server/src/main/java/examples/HelloWorldProto.java
+++ b/vertx-grpc-server/src/main/java/examples/HelloWorldProto.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: helloworld.proto
+// Protobuf Java Version: 3.25.5
package examples;
public final class HelloWorldProto {
diff --git a/vertx-grpc-server/src/main/java/examples/Item.java b/vertx-grpc-server/src/main/java/examples/Item.java
index 1a7eccb1..4297dd74 100644
--- a/vertx-grpc-server/src/main/java/examples/Item.java
+++ b/vertx-grpc-server/src/main/java/examples/Item.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: streaming.proto
+// Protobuf Java Version: 3.25.5
package examples;
/**
@@ -26,11 +27,6 @@ protected java.lang.Object newInstance(
return new Item();
}
- @java.lang.Override
- public final com.google.protobuf.UnknownFieldSet
- getUnknownFields() {
- return this.unknownFields;
- }
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return examples.StreamingProto.internal_static_streaming_Item_descriptor;
@@ -191,11 +187,13 @@ public static examples.Item parseFrom(
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
+
public static examples.Item parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
+
public static examples.Item parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
diff --git a/vertx-grpc-server/src/main/java/examples/ItemOrBuilder.java b/vertx-grpc-server/src/main/java/examples/ItemOrBuilder.java
index 8fcd126e..18e44d36 100644
--- a/vertx-grpc-server/src/main/java/examples/ItemOrBuilder.java
+++ b/vertx-grpc-server/src/main/java/examples/ItemOrBuilder.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: streaming.proto
+// Protobuf Java Version: 3.25.5
package examples;
public interface ItemOrBuilder extends
diff --git a/vertx-grpc-server/src/main/java/examples/StreamingGrpc.java b/vertx-grpc-server/src/main/java/examples/StreamingGrpc.java
index 731789e8..ebdba056 100644
--- a/vertx-grpc-server/src/main/java/examples/StreamingGrpc.java
+++ b/vertx-grpc-server/src/main/java/examples/StreamingGrpc.java
@@ -8,14 +8,14 @@
*
*/
@javax.annotation.Generated(
- value = "by gRPC proto compiler (version 1.50.2)",
+ value = "by gRPC proto compiler (version 1.68.1)",
comments = "Source: streaming.proto")
@io.grpc.stub.annotations.GrpcGenerated
public final class StreamingGrpc {
private StreamingGrpc() {}
- public static final String SERVICE_NAME = "streaming.Streaming";
+ public static final java.lang.String SERVICE_NAME = "streaming.Streaming";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor
*/
- public static abstract class StreamingImplBase implements io.grpc.BindableService {
+ public interface AsyncService {
/**
*/
- public void source(examples.Empty request,
+ default void source(examples.Empty request,
io.grpc.stub.StreamObserver responseObserver) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getSourceMethod(), responseObserver);
}
/**
*/
- public io.grpc.stub.StreamObserver sink(
+ default io.grpc.stub.StreamObserver sink(
io.grpc.stub.StreamObserver responseObserver) {
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getSinkMethod(), responseObserver);
}
/**
*/
- public io.grpc.stub.StreamObserver pipe(
+ default io.grpc.stub.StreamObserver pipe(
io.grpc.stub.StreamObserver responseObserver) {
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getPipeMethod(), responseObserver);
}
+ }
+
+ /**
+ * Base class for the server implementation of the service Streaming.
+ *
+ * Interface exported by the server.
+ *
+ */
+ public static abstract class StreamingImplBase
+ implements io.grpc.BindableService, AsyncService {
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
- return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
- .addMethod(
- getSourceMethod(),
- io.grpc.stub.ServerCalls.asyncServerStreamingCall(
- new MethodHandlers<
- examples.Empty,
- examples.Item>(
- this, METHODID_SOURCE)))
- .addMethod(
- getSinkMethod(),
- io.grpc.stub.ServerCalls.asyncClientStreamingCall(
- new MethodHandlers<
- examples.Item,
- examples.Empty>(
- this, METHODID_SINK)))
- .addMethod(
- getPipeMethod(),
- io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
- new MethodHandlers<
- examples.Item,
- examples.Item>(
- this, METHODID_PIPE)))
- .build();
+ return StreamingGrpc.bindService(this);
}
}
/**
+ * A stub to allow clients to do asynchronous rpc calls to service Streaming.
*
* Interface exported by the server.
*
*/
- public static final class StreamingStub extends io.grpc.stub.AbstractAsyncStub {
+ public static final class StreamingStub
+ extends io.grpc.stub.AbstractAsyncStub {
private StreamingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
@@ -253,11 +243,13 @@ public io.grpc.stub.StreamObserver pipe(
}
/**
+ * A stub to allow clients to do synchronous rpc calls to service Streaming.
*
* Interface exported by the server.
*
*/
- public static final class StreamingBlockingStub extends io.grpc.stub.AbstractBlockingStub {
+ public static final class StreamingBlockingStub
+ extends io.grpc.stub.AbstractBlockingStub {
private StreamingBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
@@ -279,11 +271,13 @@ public java.util.Iterator source(
}
/**
+ * A stub to allow clients to do ListenableFuture-style rpc calls to service Streaming.
*
* Interface exported by the server.
*
*/
- public static final class StreamingFutureStub extends io.grpc.stub.AbstractFutureStub {
+ public static final class StreamingFutureStub
+ extends io.grpc.stub.AbstractFutureStub {
private StreamingFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
@@ -305,10 +299,10 @@ private static final class MethodHandlers implements
io.grpc.stub.ServerCalls.ServerStreamingMethod,
io.grpc.stub.ServerCalls.ClientStreamingMethod,
io.grpc.stub.ServerCalls.BidiStreamingMethod {
- private final StreamingImplBase serviceImpl;
+ private final AsyncService serviceImpl;
private final int methodId;
- MethodHandlers(StreamingImplBase serviceImpl, int methodId) {
+ MethodHandlers(AsyncService serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@@ -343,6 +337,32 @@ public io.grpc.stub.StreamObserver invoke(
}
}
+ public static final io.grpc.ServerServiceDefinition bindService(AsyncService service) {
+ return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
+ .addMethod(
+ getSourceMethod(),
+ io.grpc.stub.ServerCalls.asyncServerStreamingCall(
+ new MethodHandlers<
+ examples.Empty,
+ examples.Item>(
+ service, METHODID_SOURCE)))
+ .addMethod(
+ getSinkMethod(),
+ io.grpc.stub.ServerCalls.asyncClientStreamingCall(
+ new MethodHandlers<
+ examples.Item,
+ examples.Empty>(
+ service, METHODID_SINK)))
+ .addMethod(
+ getPipeMethod(),
+ io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
+ new MethodHandlers<
+ examples.Item,
+ examples.Item>(
+ service, METHODID_PIPE)))
+ .build();
+ }
+
private static abstract class StreamingBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
StreamingBaseDescriptorSupplier() {}
@@ -366,9 +386,9 @@ private static final class StreamingFileDescriptorSupplier
private static final class StreamingMethodDescriptorSupplier
extends StreamingBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
- private final String methodName;
+ private final java.lang.String methodName;
- StreamingMethodDescriptorSupplier(String methodName) {
+ StreamingMethodDescriptorSupplier(java.lang.String methodName) {
this.methodName = methodName;
}
diff --git a/vertx-grpc-server/src/main/java/examples/StreamingProto.java b/vertx-grpc-server/src/main/java/examples/StreamingProto.java
index 4ad5518d..7988c431 100644
--- a/vertx-grpc-server/src/main/java/examples/StreamingProto.java
+++ b/vertx-grpc-server/src/main/java/examples/StreamingProto.java
@@ -1,6 +1,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: streaming.proto
+// Protobuf Java Version: 3.25.5
package examples;
public final class StreamingProto {
diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServer.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServer.java
index 1cdbfe4c..a0ea1c86 100644
--- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServer.java
+++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServer.java
@@ -18,6 +18,8 @@
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
import io.vertx.grpc.server.impl.GrpcServerImpl;
/**
@@ -44,6 +46,7 @@ public interface GrpcServer extends Handler {
/**
* Create a blank gRPC server
*
+ * @param vertx the vertx instance
* @return the created server
*/
static GrpcServer server(Vertx vertx) {
@@ -68,4 +71,8 @@ static GrpcServer server(Vertx vertx) {
@GenIgnore(GenIgnore.PERMITTED_TYPE)
GrpcServer callHandler(MethodDescriptor methodDesc, Handler> handler);
+ Handler routeHandler();
+
+ @Fluent
+ GrpcServer mount(Router router);
}
diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerRequest.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerRequest.java
index 374a2eca..bb9cd0e7 100644
--- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerRequest.java
+++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerRequest.java
@@ -16,6 +16,8 @@
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpConnection;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.web.RoutingContext;
import io.vertx.grpc.common.GrpcError;
import io.vertx.grpc.common.GrpcMessage;
import io.vertx.grpc.common.GrpcReadStream;
@@ -78,4 +80,6 @@ public interface GrpcServerRequest extends GrpcReadStream {
* @return the underlying HTTP connection
*/
HttpConnection connection();
+
+ RoutingContext routingContext();
}
diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java
index f664b3ea..5c9ed81c 100644
--- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java
+++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java
@@ -14,14 +14,20 @@
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.RoutingContext;
import io.vertx.grpc.common.GrpcMessageDecoder;
import io.vertx.grpc.common.GrpcMessageEncoder;
import io.vertx.grpc.common.impl.GrpcMethodCall;
import io.vertx.grpc.server.GrpcServer;
import io.vertx.grpc.server.GrpcServerRequest;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -32,6 +38,7 @@ public class GrpcServerImpl implements GrpcServer {
private final Vertx vertx;
private Handler> requestHandler;
private Map> methodCallHandlers = new HashMap<>();
+ private List routers = new ArrayList<>();
public GrpcServerImpl(Vertx vertx) {
this.vertx = vertx;
@@ -39,15 +46,19 @@ public GrpcServerImpl(Vertx vertx) {
@Override
public void handle(HttpServerRequest httpRequest) {
+ handle(httpRequest, null);
+ }
+
+ public void handle(HttpServerRequest httpRequest, RoutingContext routingContext) {
GrpcMethodCall methodCall = new GrpcMethodCall(httpRequest.path());
String fmn = methodCall.fullMethodName();
MethodCallHandler, ?> method = methodCallHandlers.get(fmn);
if (method != null) {
- handle(method, httpRequest, methodCall);
+ handle(method, httpRequest, routingContext, methodCall);
} else {
Handler> handler = requestHandler;
if (handler != null) {
- GrpcServerRequestImpl grpcRequest = new GrpcServerRequestImpl<>(httpRequest, GrpcMessageDecoder.IDENTITY, GrpcMessageEncoder.IDENTITY, methodCall);
+ GrpcServerRequestImpl grpcRequest = new GrpcServerRequestImpl<>(httpRequest, routingContext, GrpcMessageDecoder.IDENTITY, GrpcMessageEncoder.IDENTITY, methodCall);
grpcRequest.init();
handler.handle(grpcRequest);
} else {
@@ -56,8 +67,8 @@ public void handle(HttpServerRequest httpRequest) {
}
}
- private void handle(MethodCallHandler method, HttpServerRequest httpRequest, GrpcMethodCall methodCall) {
- GrpcServerRequestImpl grpcRequest = new GrpcServerRequestImpl<>(httpRequest, method.messageDecoder, method.messageEncoder, methodCall);
+ private void handle(MethodCallHandler method, HttpServerRequest httpRequest, RoutingContext routingContext, GrpcMethodCall methodCall) {
+ GrpcServerRequestImpl grpcRequest = new GrpcServerRequestImpl<>(httpRequest, routingContext, method.messageDecoder, method.messageEncoder, methodCall);
grpcRequest.init();
method.handle(grpcRequest);
}
@@ -69,19 +80,55 @@ public GrpcServer callHandler(Handler> handler
public GrpcServer callHandler(MethodDescriptor methodDesc, Handler> handler) {
if (handler != null) {
- methodCallHandlers.put(methodDesc.getFullMethodName(), new MethodCallHandler<>(methodDesc, GrpcMessageDecoder.unmarshaller(methodDesc.getRequestMarshaller()), GrpcMessageEncoder.marshaller(methodDesc.getResponseMarshaller()), handler));
+ MethodCallHandler methodCallHandler = new MethodCallHandler<>(methodDesc, GrpcMessageDecoder.unmarshaller(methodDesc.getRequestMarshaller()), GrpcMessageEncoder.marshaller(methodDesc.getResponseMarshaller()), handler);
+ methodCallHandlers.put(methodDesc.getFullMethodName(), methodCallHandler);
+ for (Router router : routers) {
+ addToRouter(router, methodCallHandler);
+ }
} else {
- methodCallHandlers.remove(methodDesc.getFullMethodName());
+ MethodCallHandler, ?> methodCallHandler = methodCallHandlers.remove(methodDesc.getFullMethodName());
+ if (methodCallHandler != null) {
+ removeFromRouter(methodCallHandler);
+ }
}
return this;
}
+ private void removeFromRouter(MethodCallHandler, ?> methodCallHandler) {
+ for (Route route : methodCallHandler.routes) {
+ route.remove();
+ }
+ methodCallHandler.routes.clear();
+ }
+
+ @Override
+ public Handler routeHandler() {
+ return ctx -> handle(ctx.request(), ctx);
+ }
+
+ @Override
+ public GrpcServer mount(Router router) {
+ routers.add(router);
+ for (MethodCallHandler, ?> methodCallHandler : methodCallHandlers.values()) {
+ addToRouter(router, methodCallHandler);
+ }
+ return this;
+ }
+
+ private void addToRouter(Router router, MethodCallHandler, ?> methodCallHandler) {
+ Route route = router.route(HttpMethod.POST, '/' + methodCallHandler.def.getFullMethodName())
+ .consumes("application/grpc")
+ .handler(this.routeHandler());
+ methodCallHandler.routes.add(route);
+ }
+
private static class MethodCallHandler implements Handler> {
final MethodDescriptor def;
final GrpcMessageDecoder messageDecoder;
final GrpcMessageEncoder messageEncoder;
final Handler> handler;
+ final List routes = new ArrayList<>();
MethodCallHandler(MethodDescriptor def, GrpcMessageDecoder messageDecoder, GrpcMessageEncoder messageEncoder, Handler> handler) {
this.def = def;
diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java
index 4c2e5720..6583a012 100644
--- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java
+++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java
@@ -10,12 +10,13 @@
*/
package io.vertx.grpc.server.impl;
-import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.impl.HttpServerRequestInternal;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.web.RoutingContext;
import io.vertx.grpc.common.CodecException;
import io.vertx.grpc.common.GrpcMessageDecoder;
import io.vertx.grpc.common.GrpcMessageEncoder;
@@ -31,12 +32,14 @@
public class GrpcServerRequestImpl extends GrpcReadStreamBase, Req> implements GrpcServerRequest {
final HttpServerRequest httpRequest;
+ final RoutingContext routingContext;
final GrpcServerResponse response;
private GrpcMethodCall methodCall;
- public GrpcServerRequestImpl(HttpServerRequest httpRequest, GrpcMessageDecoder messageDecoder, GrpcMessageEncoder messageEncoder, GrpcMethodCall methodCall) {
+ public GrpcServerRequestImpl(HttpServerRequest httpRequest, RoutingContext routingContext, GrpcMessageDecoder messageDecoder, GrpcMessageEncoder messageEncoder, GrpcMethodCall methodCall) {
super(((HttpServerRequestInternal) httpRequest).context(), httpRequest, httpRequest.headers().get("grpc-encoding"), messageDecoder);
this.httpRequest = httpRequest;
+ this.routingContext = routingContext;
this.response = new GrpcServerResponseImpl<>(this, httpRequest.response(), messageEncoder);
this.methodCall = methodCall;
}
@@ -65,6 +68,11 @@ public String methodName() {
return methodCall.methodName();
}
+ @Override
+ public RoutingContext routingContext() {
+ return this.routingContext;
+ }
+
@Override
public GrpcServerRequest handler(Handler handler) {
if (handler != null) {
diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java
index dfb5ff00..b50b1d02 100644
--- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java
+++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java
@@ -102,6 +102,11 @@ public Future write(Resp message) {
return writeMessage(encoder.encode(message));
}
+ @Override
+ public void write(Resp resp, Handler> handler) {
+ write(resp).onComplete(handler);
+ }
+
@Override
public Future end(Resp message) {
return endMessage(encoder.encode(message));
@@ -121,6 +126,11 @@ public Future end() {
return writeMessage(null, true);
}
+ @Override
+ public void end(Handler> handler) {
+ end().onComplete(handler);
+ }
+
@Override
public GrpcServerResponse setWriteQueueMaxSize(int maxSize) {
httpResponse.setWriteQueueMaxSize(maxSize);