Skip to content

Commit afca224

Browse files
authored
Merge branch 'main' into compression
2 parents 26e3f8d + 99a34a1 commit afca224

File tree

12 files changed

+323
-105
lines changed

12 files changed

+323
-105
lines changed

vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ public GrpcClientRequestImpl(HttpClientRequest httpRequest,
105105
});
106106
}
107107

108+
@Override
109+
protected Future<Void> sendHead() {
110+
return httpRequest.sendHead();
111+
}
112+
108113
@Override
109114
public GrpcClientRequest<Req, Resp> serviceName(ServiceName serviceName) {
110115
this.serviceName = serviceName;
@@ -161,7 +166,7 @@ public GrpcClientRequest<Req, Resp> idleTimeout(long timeout) {
161166
}
162167

163168
@Override
164-
protected void setHeaders(String contentType, MultiMap headers, boolean isEnd) {
169+
protected void setHeaders(String contentType, MultiMap headers) {
165170
ServiceName serviceName = this.serviceName;
166171
String methodName = this.methodName;
167172
if (serviceName == null) {

vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcWriteStreamBase.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,12 @@ public final Future<Void> end() {
186186
return writeMessage(null, true);
187187
}
188188

189-
protected abstract void setHeaders(String contentType, MultiMap headers, boolean isEnd);
189+
protected abstract void setHeaders(String contentType, MultiMap headers);
190190
protected abstract void setTrailers(MultiMap trailers);
191+
191192
protected abstract Future<Void> sendMessage(Buffer message, boolean compressed);
192193
protected abstract Future<Void> sendEnd();
194+
protected abstract Future<Void> sendHead();
193195

194196
protected String contentType(WireFormat wireFormat) {
195197
if (wireFormat != null) {
@@ -206,16 +208,17 @@ protected String contentType(WireFormat wireFormat) {
206208
return mediaType;
207209
}
208210

209-
private Future<Void> writeMessage(GrpcMessage message, boolean end) {
211+
public final Future<Void> writeHead() {
212+
return writeMessage(null, false);
213+
}
214+
215+
protected Future<Void> writeMessage(GrpcMessage message, boolean end) {
210216
if (error != null) {
211217
throw new IllegalStateException("The stream is failed: " + error);
212218
}
213219
if (trailersSent) {
214220
throw new IllegalStateException("The stream has been closed");
215221
}
216-
if (message == null && !end) {
217-
throw new IllegalStateException();
218-
}
219222
if (message != null) {
220223
if (format == null) {
221224
format = message.format();
@@ -284,19 +287,21 @@ private Future<Void> writeMessage(GrpcMessage message, boolean end) {
284287
if (!headersSent) {
285288
headersSent = true;
286289
String contentType = contentType(format);
287-
setHeaders(contentType, headers, end);
290+
setHeaders(contentType, headers);
288291
}
289292
if (end) {
290-
if (!trailersSent) {
291-
trailersSent = true;
292-
}
293+
trailersSent = true;
293294
if (payload != null) {
294295
sendMessage(payload, compressed);
295296
}
296297
setTrailers(trailers);
297298
return sendEnd();
298299
} else {
299-
return sendMessage(payload, compressed);
300+
if (message != null) {
301+
return sendMessage(payload, compressed);
302+
} else {
303+
return sendHead();
304+
}
300305
}
301306
}
302307
}

vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerResponse.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ public interface GrpcServerResponse<Req, Resp> extends GrpcWriteStream<Resp> {
6262
@Override
6363
GrpcServerResponse<Req, Resp> drainHandler(@Nullable Handler<Void> handler);
6464

65+
/**
66+
* Send the response headers.
67+
*
68+
* @return a future notified by the success or failure of the write
69+
*/
70+
Future<Void> writeHead();
71+
6572
default Future<Void> send(Resp item) {
6673
return end(item);
6774
}

vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerResponseImpl.java

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ public abstract class GrpcServerResponseImpl<Req, Resp> extends GrpcWriteStreamB
3333

3434
private final GrpcServerRequestImpl<Req, Resp> request;
3535
private final HttpServerResponse httpResponse;
36-
private final GrpcProtocol protocol;
3736
private GrpcStatus status = GrpcStatus.OK;
3837
private String statusMessage;
3938
private boolean trailersOnly;
@@ -49,7 +48,6 @@ public GrpcServerResponseImpl(ContextInternal context,
4948
super(context, protocol.mediaType(), httpResponse, encoder, compressors, decompressors);
5049
this.request = request;
5150
this.httpResponse = httpResponse;
52-
this.protocol = protocol;
5351
}
5452

5553
public GrpcServerResponse<Req, Resp> status(GrpcStatus status) {
@@ -104,6 +102,7 @@ public void fail(Throwable failure) {
104102
end();
105103
}
106104

105+
// TODO : remove this
107106
public boolean isTrailersOnly() {
108107
return trailersOnly;
109108
}
@@ -118,11 +117,13 @@ protected void sendCancel() {
118117
.onSuccess(v -> handleError(GrpcError.CANCELLED));
119118
}
120119

121-
protected void setHeaders(String contentType, MultiMap grpcHeaders, boolean isEnd) {
122-
trailersOnly = status != GrpcStatus.OK && isEnd;
120+
protected void setHeaders(String contentType, MultiMap grpcHeaders) {
123121
MultiMap httpHeaders = httpResponse.headers();
124122
httpHeaders.set("content-type", contentType);
125123
encodeGrpcHeaders(grpcHeaders, httpHeaders);
124+
if (trailersOnly) {
125+
encodeGrpcStatus(httpHeaders);
126+
}
126127
}
127128

128129
protected void encodeGrpcHeaders(MultiMap grpcHeaders, MultiMap httpHeaders) {
@@ -134,25 +135,40 @@ protected void encodeGrpcHeaders(MultiMap grpcHeaders, MultiMap httpHeaders) {
134135
}
135136

136137
protected void setTrailers(MultiMap grpcTrailers) {
138+
MultiMap httpTrailers;
139+
if (trailersOnly) {
140+
httpTrailers = httpResponse.headers();
141+
} else {
142+
httpTrailers = httpResponse.trailers();
143+
}
144+
encodeGrpcTrailers(grpcTrailers, httpTrailers);
145+
encodeGrpcStatus(httpTrailers);
137146
}
138147

139148
protected void encodeGrpcTrailers(MultiMap grpcTrailers, MultiMap httpTrailers) {
140-
MultiMap httpHeaders = httpResponse.headers();
141149
if (grpcTrailers != null && !grpcTrailers.isEmpty()) {
142-
for (Map.Entry<String, String> trailer : grpcTrailers) {
143-
httpTrailers.add(trailer.getKey(), trailer.getValue());
150+
for (Map.Entry<String, String> header : grpcTrailers) {
151+
httpTrailers.add(header.getKey(), header.getValue());
144152
}
145153
}
146-
if (!httpHeaders.contains(GrpcHeaderNames.GRPC_STATUS)) {
147-
httpTrailers.set(GrpcHeaderNames.GRPC_STATUS, status.toString());
154+
}
155+
156+
/**
157+
* Encode grpc status and status message in the specified {@code entries} map.
158+
*
159+
* @param entries the map updated with grpc specific headers
160+
*/
161+
protected void encodeGrpcStatus(MultiMap entries) {
162+
if (!entries.contains(GrpcHeaderNames.GRPC_STATUS)) {
163+
entries.set(GrpcHeaderNames.GRPC_STATUS, status.toString());
148164
}
149165
if (status != GrpcStatus.OK) {
150166
String msg = statusMessage;
151-
if (msg != null && !httpHeaders.contains(GrpcHeaderNames.GRPC_MESSAGE)) {
152-
httpTrailers.set(GrpcHeaderNames.GRPC_MESSAGE, UrlEscapers.urlFragmentEscaper().escape(msg));
167+
if (msg != null && !entries.contains(GrpcHeaderNames.GRPC_MESSAGE)) {
168+
entries.set(GrpcHeaderNames.GRPC_MESSAGE, UrlEscapers.urlFragmentEscaper().escape(msg));
153169
}
154170
} else {
155-
httpTrailers.remove(GrpcHeaderNames.GRPC_MESSAGE);
171+
entries.remove(GrpcHeaderNames.GRPC_MESSAGE);
156172
}
157173
}
158174

@@ -166,6 +182,11 @@ protected Future<Void> sendEnd() {
166182
return httpResponse.end();
167183
}
168184

185+
@Override
186+
protected Future<Void> sendHead() {
187+
return httpResponse.writeHead();
188+
}
189+
169190
protected Buffer encodeMessage(Buffer message, boolean compressed, boolean trailer) {
170191
return GrpcMessageImpl.encode(message, compressed, trailer);
171192
}
@@ -179,4 +200,15 @@ private static GrpcStatus mapStatus(Throwable t) {
179200
return GrpcStatus.UNKNOWN;
180201
}
181202
}
203+
204+
private boolean headersSent;
205+
206+
@Override
207+
protected Future<Void> writeMessage(GrpcMessage message, boolean end) {
208+
if (!headersSent) {
209+
headersSent = true;
210+
trailersOnly = status != GrpcStatus.OK && end;
211+
}
212+
return super.writeMessage(message, end);
213+
}
182214
}

vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/Http2GrpcServerResponse.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323

2424
public class Http2GrpcServerResponse<Req, Resp> extends GrpcServerResponseImpl<Req, Resp> {
2525

26-
private final HttpServerResponse httpResponse;
27-
2826
public Http2GrpcServerResponse(ContextInternal context,
2927
GrpcServerRequestImpl<Req, Resp> request,
3028
GrpcProtocol protocol,
@@ -33,8 +31,6 @@ public Http2GrpcServerResponse(ContextInternal context,
3331
Map<String, GrpcCompressor> compressors,
3432
Map<String, GrpcDecompressor> decompressors) {
3533
super(context, request, protocol, httpResponse, encoder, compressors, decompressors);
36-
37-
this.httpResponse = httpResponse;
3834
}
3935

4036
@Override
@@ -43,15 +39,4 @@ protected void encodeGrpcHeaders(MultiMap grpcHeaders, MultiMap httpHeaders) {
4339
httpHeaders.set(GrpcHeaderNames.GRPC_ENCODING, encoding);
4440
httpHeaders.set(GrpcHeaderNames.GRPC_ACCEPT_ENCODING, String.join(",", decompressors.keySet()));
4541
}
46-
47-
@Override
48-
protected void setTrailers(MultiMap grpcTrailers) {
49-
MultiMap httpTrailers;
50-
if (isTrailersOnly()) {
51-
httpTrailers = httpResponse.headers();
52-
} else {
53-
httpTrailers = httpResponse.trailers();
54-
}
55-
encodeGrpcTrailers(grpcTrailers, httpTrailers);
56-
}
5742
}

vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/WebGrpcServerResponse.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package io.vertx.grpc.server.impl;
1212

1313
import io.netty.handler.codec.base64.Base64;
14+
import io.vertx.core.Future;
1415
import io.vertx.core.MultiMap;
1516
import io.vertx.core.buffer.Buffer;
1617
import io.vertx.core.http.HttpHeaders;
@@ -30,6 +31,7 @@ public class WebGrpcServerResponse<Req, Resp> extends GrpcServerResponseImpl<Req
3031

3132
private final GrpcProtocol protocol;
3233
private final HttpServerResponse httpResponse;
34+
private Buffer trailers;
3335

3436
public WebGrpcServerResponse(ContextInternal context,
3537
GrpcServerRequestImpl<Req, Resp> request,
@@ -44,6 +46,18 @@ public WebGrpcServerResponse(ContextInternal context,
4446
this.httpResponse = httpResponse;
4547
}
4648

49+
private void appendToTrailers(MultiMap entries) {
50+
if (trailers == null) {
51+
trailers = Buffer.buffer();
52+
}
53+
for (Map.Entry<String, String> trailer : entries) {
54+
trailers.appendString(trailer.getKey())
55+
.appendByte((byte) ':')
56+
.appendString(trailer.getValue())
57+
.appendString("\r\n");
58+
}
59+
}
60+
4761
@Override
4862
protected Buffer encodeMessage(Buffer message, boolean compressed, boolean trailer) {
4963
message = super.encodeMessage(message, compressed, trailer);
@@ -54,27 +68,31 @@ protected Buffer encodeMessage(Buffer message, boolean compressed, boolean trail
5468
}
5569

5670
@Override
57-
protected void setHeaders(String contentType, MultiMap grpcHeaders, boolean isEnd) {
58-
super.setHeaders(contentType, grpcHeaders, isEnd);
59-
// We need chunked to get response trailers to be sent
71+
protected void setHeaders(String contentType, MultiMap grpcHeaders) {
6072
httpResponse.setChunked(!isTrailersOnly());
73+
super.setHeaders(contentType, grpcHeaders);
6174
}
6275

6376
@Override
6477
protected void setTrailers(MultiMap grpcTrailers) {
6578
if (isTrailersOnly()) {
6679
encodeGrpcTrailers(grpcTrailers, httpResponse.headers());
6780
} else {
68-
MultiMap httpTrailers = HttpHeaders.headers();
69-
encodeGrpcTrailers(grpcTrailers, httpTrailers);
70-
Buffer buffer = Buffer.buffer();
71-
for (Map.Entry<String, String> trailer : httpTrailers) {
72-
buffer.appendString(trailer.getKey())
73-
.appendByte((byte) ':')
74-
.appendString(trailer.getValue())
75-
.appendString("\r\n");
76-
}
77-
httpResponse.write(encodeMessage(buffer, false, true));
81+
MultiMap buffer = HttpHeaders.headers();
82+
super.encodeGrpcStatus(buffer);
83+
appendToTrailers(buffer);
84+
appendToTrailers(grpcTrailers);
85+
}
86+
}
87+
88+
@Override
89+
protected Future<Void> sendEnd() {
90+
if (trailers != null) {
91+
Future<Void> ret = httpResponse.end(encodeMessage(trailers, false, true));
92+
trailers = null;
93+
return ret;
94+
} else {
95+
return httpResponse.end();
7896
}
7997
}
8098
}

0 commit comments

Comments
 (0)