Skip to content

Commit cbb5087

Browse files
committed
Reset the request when failure happens when send an HttpClientRequest stream.
Motivation: HttpClientRequest#send(ReadStream<Buffer>) pipes the stream to the request with the default behavior that is to end the stream. Instead it should signal the error and reset the stream.
1 parent a4eaffd commit cbb5087

File tree

2 files changed

+74
-1
lines changed

2 files changed

+74
-1
lines changed

vertx-core/src/main/java/io/vertx/core/http/HttpClientRequest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.vertx.core.buffer.Buffer;
1919
import io.vertx.core.net.HostAndPort;
2020
import io.vertx.core.net.NetSocket;
21+
import io.vertx.core.streams.Pipe;
2122
import io.vertx.core.streams.ReadStream;
2223
import io.vertx.core.streams.WriteStream;
2324

@@ -370,14 +371,20 @@ default Future<HttpClientResponse> send(Buffer body) {
370371
* <p> If the {@link HttpHeaders#CONTENT_LENGTH} is set then the request assumes this is the
371372
* length of the {stream}, otherwise the request will set a chunked {@link HttpHeaders#CONTENT_ENCODING}.
372373
*
374+
* <p>When the {@code body} stream fails, the request is reset with the CANCEL {@literal 0x8} error code.f</p>
375+
*
373376
* @return a future notified when the HTTP response is available
374377
*/
375378
default Future<HttpClientResponse> send(ReadStream<Buffer> body) {
376379
MultiMap headers = headers();
377380
if (headers == null || !headers.contains(HttpHeaders.CONTENT_LENGTH)) {
378381
setChunked(true);
379382
}
380-
body.pipeTo(this);
383+
Future<Void> result = body
384+
.pipe()
385+
.endOnFailure(false)
386+
.to(this);
387+
result.onFailure(err -> reset(0x8, err)); // CANCEL
381388
return response();
382389
}
383390

vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6048,6 +6048,72 @@ private void testClientRequestWithLargeBodyInSmallChunks(boolean chunked, BiFunc
60486048
await();
60496049
}
60506050

6051+
@Test
6052+
public void testClientRequestSendFailure() throws Exception {
6053+
waitFor(2);
6054+
CompletableFuture<Void> chunkContinuation = new CompletableFuture<>();
6055+
server.requestHandler(request -> {
6056+
request.handler(chunk -> {
6057+
chunkContinuation.complete(null);
6058+
});
6059+
request.exceptionHandler(err -> {
6060+
if (request.version() == HttpVersion.HTTP_2) {
6061+
assertEquals(StreamResetException.class, err.getClass());
6062+
StreamResetException sre = (StreamResetException) err;
6063+
assertEquals(0x8, sre.getCode());
6064+
} else {
6065+
assertEquals(HttpClosedException.class, err.getClass());
6066+
}
6067+
complete();
6068+
});
6069+
request.endHandler(v -> {
6070+
fail();
6071+
});
6072+
});
6073+
startServer(testAddress);
6074+
Throwable failure = new Throwable();
6075+
client.request(requestOptions).compose(request -> request.send(new ReadStream<>() {
6076+
Handler<Throwable> exceptionHandler;
6077+
@Override
6078+
public ReadStream<Buffer> exceptionHandler(@Nullable Handler<Throwable> handler) {
6079+
exceptionHandler = handler;
6080+
return this;
6081+
}
6082+
@Override
6083+
public ReadStream<Buffer> handler(@Nullable Handler<Buffer> handler) {
6084+
if (handler != null) {
6085+
vertx.runOnContext(v1 -> {
6086+
handler.handle(Buffer.buffer("chunk-1"));
6087+
chunkContinuation.whenComplete((v2, err) -> {
6088+
exceptionHandler.handle(failure);
6089+
});
6090+
});
6091+
}
6092+
return this;
6093+
}
6094+
@Override
6095+
public ReadStream<Buffer> pause() {
6096+
return this;
6097+
}
6098+
@Override
6099+
public ReadStream<Buffer> resume() {
6100+
return this;
6101+
}
6102+
@Override
6103+
public ReadStream<Buffer> fetch(long amount) {
6104+
return this;
6105+
}
6106+
@Override
6107+
public ReadStream<Buffer> endHandler(@Nullable Handler<Void> endHandler) {
6108+
return this;
6109+
}
6110+
})).onComplete(onFailure(err -> {
6111+
assertSame(failure, err.getCause());
6112+
complete();
6113+
}));
6114+
await();
6115+
}
6116+
60516117
@Test
60526118
public void testClientRequestFlowControlDifferentEventLoops() throws Exception {
60536119
Promise<Void> resume = Promise.promise();

0 commit comments

Comments
 (0)