Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/io/vertx/core/http/HttpServerResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ public interface HttpServerResponse extends WriteStream<Buffer> {
@Fluent
HttpServerResponse endHandler(@Nullable Handler<Void> handler);

/**
* Send the response headers.
*
* @return a future notified by the success or failure of the write
*/
Future<Void> writeHead();

/**
* Write a {@link String} to the response body, encoded using the encoding {@code enc}.
*
Expand Down
62 changes: 4 additions & 58 deletions src/main/java/io/vertx/core/http/impl/AssembledHttpResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,19 @@
*
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/
class AssembledHttpResponse implements io.netty.handler.codec.http.HttpResponse, HttpContent {
class AssembledHttpResponse extends VertxHttpResponse implements HttpContent {

private boolean head;
private HttpResponseStatus status;
private HttpVersion version;
private HttpHeaders headers;
private final ByteBuf content;
private DecoderResult result = DecoderResult.SUCCESS;

AssembledHttpResponse(boolean head, HttpVersion version, HttpResponseStatus status, HttpHeaders headers) {
this(head, version, status, headers, Unpooled.EMPTY_BUFFER);
}

AssembledHttpResponse(boolean head, HttpVersion version, HttpResponseStatus status, HttpHeaders headers, ByteBuf content) {
this.head = head;
this.status = status;
this.version = version;
this.headers = headers;
super(head, version, status, headers);
this.content = content;
}

boolean head() {
return head;
}

@Override
public HttpContent copy() {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -80,36 +68,14 @@ public AssembledHttpResponse retain(int increment) {
return this;
}

@Override
public HttpResponseStatus getStatus() {
return status;
}

@Override
public AssembledHttpResponse setStatus(HttpResponseStatus status) {
this.status = status;
return this;
return (AssembledHttpResponse) super.setStatus(status);
}

@Override
public AssembledHttpResponse setProtocolVersion(HttpVersion version) {
this.version = version;
return this;
}

@Override
public HttpVersion getProtocolVersion() {
return version;
}

@Override
public HttpVersion protocolVersion() {
return version;
}

@Override
public HttpResponseStatus status() {
return status;
return (AssembledHttpResponse) super.setProtocolVersion(version);
}

@Override
Expand All @@ -124,26 +90,6 @@ public AssembledHttpResponse touch(Object hint) {
return this;
}

@Override
public DecoderResult decoderResult() {
return result;
}

@Override
public HttpHeaders headers() {
return headers;
}

@Override
public DecoderResult getDecoderResult() {
return result;
}

@Override
public void setDecoderResult(DecoderResult result) {
this.result = result;
}

@Override
public ByteBuf content() {
return content;
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/vertx/core/http/impl/Http1xServerResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,25 @@ public HttpServerResponse endHandler(@Nullable Handler<Void> handler) {
}
}

@Override
public Future<Void> writeHead() {
PromiseInternal<Void> promise = context.promise();
synchronized (conn) {
if (headWritten) {
throw new IllegalStateException();
}
if (!headers.contains(HttpHeaders.TRANSFER_ENCODING) && !headers.contains(HttpHeaders.CONTENT_LENGTH)) {
throw new IllegalStateException("You must set the Content-Length header to be the total size of the message "
+ "body BEFORE sending any data if you are not using HTTP chunked encoding.");
}
VertxHttpResponse msg;
prepareHeaders(-1);
msg = new VertxHttpResponse(head, version, status, headers);
conn.writeToChannel(msg, promise);
}
return promise.future();
}

@Override
public Future<Void> write(Buffer chunk) {
PromiseInternal<Void> promise = context.promise();
Expand Down
23 changes: 16 additions & 7 deletions src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,14 @@ public HttpServerResponse writeContinue() {
}
}

@Override
public Future<Void> writeHead() {
synchronized (conn) {
checkHeadWritten();
}
return checkSendHeaders(false, true);
}

@Override
public Future<Void> writeEarlyHints(MultiMap headers) {
PromiseInternal<Void> promise = stream.context.promise();
Expand Down Expand Up @@ -431,7 +439,7 @@ Future<NetSocket> netSocket() {
synchronized (conn) {
if (netSocket == null) {
status = HttpResponseStatus.OK;
if (!checkSendHeaders(false)) {
if (checkSendHeaders(false) == null) {
netSocket = stream.context.failedFuture("Response for CONNECT already sent");
} else {
HttpNetSocket ns = HttpNetSocket.netSocket(conn, stream.context, (ReadStream<Buffer>) stream.request, this);
Expand Down Expand Up @@ -464,7 +472,7 @@ void write(ByteBuf chunk, boolean end, Handler<AsyncResult<Void>> handler) {
if (end && !headWritten && needsContentLengthHeader()) {
headers().set(HttpHeaderNames.CONTENT_LENGTH, HttpUtils.positiveLongToString(chunk.readableBytes()));
}
boolean sent = checkSendHeaders(end && !hasBody && trailers == null, !hasBody);
boolean sent = checkSendHeaders(end && !hasBody && trailers == null, !hasBody) != null;
if (hasBody || (!sent && end)) {
stream.writeData(chunk, end && trailers == null, handler);
} else {
Expand Down Expand Up @@ -493,11 +501,11 @@ private boolean needsContentLengthHeader() {
return stream.method != HttpMethod.HEAD && status != HttpResponseStatus.NOT_MODIFIED && !headers.contains(HttpHeaderNames.CONTENT_LENGTH);
}

private boolean checkSendHeaders(boolean end) {
private Future<Void> checkSendHeaders(boolean end) {
return checkSendHeaders(end, true);
}

private boolean checkSendHeaders(boolean end, boolean checkFlush) {
private Future<Void> checkSendHeaders(boolean end, boolean checkFlush) {
if (!headWritten) {
if (headersEndHandler != null) {
headersEndHandler.handle(null);
Expand All @@ -507,10 +515,11 @@ private boolean checkSendHeaders(boolean end, boolean checkFlush) {
}
prepareHeaders();
headWritten = true;
stream.writeHeaders(headers, end, checkFlush, null);
return true;
Promise<Void> promise = stream.context.promise();
stream.writeHeaders(headers, end, checkFlush, promise);
return promise.future();
} else {
return false;
return null;
}
}

Expand Down
79 changes: 79 additions & 0 deletions src/main/java/io/vertx/core/http/impl/VertxHttpResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.vertx.core.http.impl;

import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

class VertxHttpResponse implements io.netty.handler.codec.http.HttpResponse {

private boolean head;
private HttpResponseStatus status;
private HttpVersion version;
private HttpHeaders headers;
private DecoderResult result = DecoderResult.SUCCESS;

VertxHttpResponse(boolean head, HttpVersion version, HttpResponseStatus status, HttpHeaders headers) {
this.head = head;
this.status = status;
this.version = version;
this.headers = headers;
}

boolean head() {
return head;
}

@Override
public HttpResponseStatus getStatus() {
return status;
}

@Override
public VertxHttpResponse setStatus(HttpResponseStatus status) {
this.status = status;
return this;
}

@Override
public VertxHttpResponse setProtocolVersion(HttpVersion version) {
this.version = version;
return this;
}

@Override
public HttpVersion getProtocolVersion() {
return version;
}

@Override
public HttpVersion protocolVersion() {
return version;
}

@Override
public HttpResponseStatus status() {
return status;
}

@Override
public DecoderResult decoderResult() {
return result;
}

@Override
public HttpHeaders headers() {
return headers;
}

@Override
public DecoderResult getDecoderResult() {
return result;
}

@Override
public void setDecoderResult(DecoderResult result) {
this.result = result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
protected boolean isContentAlwaysEmpty(HttpResponse msg) {
// In HttpServerCodec this is tracked via a FIFO queue of HttpMethod
// here we track it in the assembled response as we don't use HttpServerCodec
return (msg instanceof AssembledHttpResponse && ((AssembledHttpResponse) msg).head()) || super.isContentAlwaysEmpty(msg);
return (msg instanceof VertxHttpResponse && ((VertxHttpResponse) msg).head()) || super.isContentAlwaysEmpty(msg);
}
}
44 changes: 44 additions & 0 deletions src/test/java/io/vertx/core/http/HttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6916,4 +6916,48 @@ private void testConcurrentWrites(Function<HttpClientRequest, Future<HttpClientR
await();
assertEquals("msg1msg2", received.get());
}

@Test
public void testHttpServerResponseWriteHead() throws Exception {
waitFor(2);
AtomicReference<Runnable> continuation = new AtomicReference<>();
server.requestHandler(req -> {
HttpServerResponse resp = req.response();
continuation.set(() -> resp.end("body"));
try {
resp.writeHead().onComplete(onSuccess(v -> {
complete();
}));
assertEquals(HttpVersion.HTTP_2, req.version());
} catch (IllegalStateException ignore) {
resp
.setChunked(true)
.writeHead()
.onComplete(onSuccess(v -> {
complete();
}));
}
});
startServer(testAddress);
client.request(requestOptions).onComplete(onSuccess(req -> {
req.send().onComplete(onSuccess(response -> {
response.handler(chunk -> {
fail();
});
response.endHandler(v -> {
fail();
});
vertx.setTimer(200, id -> {
response.handler(null);
response.endHandler(null);
response.bodyHandler(body -> {
assertEquals("body", body.toString());
complete();
});
continuation.get().run();
});
}));
}));
await();
}
}
Loading