diff --git a/vertx-core/src/main/java/io/vertx/core/http/HttpClientRequest.java b/vertx-core/src/main/java/io/vertx/core/http/HttpClientRequest.java index 2c9b01bc966..c56389137a0 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/HttpClientRequest.java +++ b/vertx-core/src/main/java/io/vertx/core/http/HttpClientRequest.java @@ -18,6 +18,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.net.HostAndPort; import io.vertx.core.net.NetSocket; +import io.vertx.core.streams.Pipe; import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; @@ -370,6 +371,8 @@ default Future send(Buffer body) { *

If the {@link HttpHeaders#CONTENT_LENGTH} is set then the request assumes this is the * length of the {stream}, otherwise the request will set a chunked {@link HttpHeaders#CONTENT_ENCODING}. * + *

When the {@code body} stream fails, the request is reset with the CANCEL {@literal 0x8} error code.f

+ * * @return a future notified when the HTTP response is available */ default Future send(ReadStream body) { @@ -377,7 +380,11 @@ default Future send(ReadStream body) { if (headers == null || !headers.contains(HttpHeaders.CONTENT_LENGTH)) { setChunked(true); } - body.pipeTo(this); + Future result = body + .pipe() + .endOnFailure(false) + .to(this); + result.onFailure(err -> reset(0x8, err)); // CANCEL return response(); } diff --git a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java index 1c59c71e4ca..2ea294568c4 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/HttpTest.java @@ -6048,6 +6048,72 @@ private void testClientRequestWithLargeBodyInSmallChunks(boolean chunked, BiFunc await(); } + @Test + public void testClientRequestSendFailure() throws Exception { + waitFor(2); + CompletableFuture chunkContinuation = new CompletableFuture<>(); + server.requestHandler(request -> { + request.handler(chunk -> { + chunkContinuation.complete(null); + }); + request.exceptionHandler(err -> { + if (request.version() == HttpVersion.HTTP_2) { + assertEquals(StreamResetException.class, err.getClass()); + StreamResetException sre = (StreamResetException) err; + assertEquals(0x8, sre.getCode()); + } else { + assertEquals(HttpClosedException.class, err.getClass()); + } + complete(); + }); + request.endHandler(v -> { + fail(); + }); + }); + startServer(testAddress); + Throwable failure = new Throwable(); + client.request(requestOptions).compose(request -> request.send(new ReadStream<>() { + Handler exceptionHandler; + @Override + public ReadStream exceptionHandler(@Nullable Handler handler) { + exceptionHandler = handler; + return this; + } + @Override + public ReadStream handler(@Nullable Handler handler) { + if (handler != null) { + vertx.runOnContext(v1 -> { + handler.handle(Buffer.buffer("chunk-1")); + chunkContinuation.whenComplete((v2, err) -> { + exceptionHandler.handle(failure); + }); + }); + } + return this; + } + @Override + public ReadStream pause() { + return this; + } + @Override + public ReadStream resume() { + return this; + } + @Override + public ReadStream fetch(long amount) { + return this; + } + @Override + public ReadStream endHandler(@Nullable Handler endHandler) { + return this; + } + })).onComplete(onFailure(err -> { + assertSame(failure, err.getCause()); + complete(); + })); + await(); + } + @Test public void testClientRequestFlowControlDifferentEventLoops() throws Exception { Promise resume = Promise.promise();