diff --git a/src/main/java/software/amazon/awssdk/crt/http/Http1StreamManager.java b/src/main/java/software/amazon/awssdk/crt/http/Http1StreamManager.java index 7868b27c5..0059e2289 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/Http1StreamManager.java +++ b/src/main/java/software/amazon/awssdk/crt/http/Http1StreamManager.java @@ -49,6 +49,21 @@ public CompletableFuture acquireStream(HttpRequest request, return this.acquireStream((HttpRequestBase) request, streamHandler); } + /** + * Request an HTTP/1.1 HttpStream from StreamManager. + * + * @param request HttpRequest. The Request to make to the Server. + * @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop + * @param useManualDataWrites A boolean variable to signal that body will be streamed using async writes. + * @return A future for a HttpStream that will be completed when the stream is + * acquired. + * @throws CrtRuntimeException Exception happens from acquiring stream. + */ + public CompletableFuture acquireStream(HttpRequest request, + HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) { + return this.acquireStream((HttpRequestBase) request, streamHandler, useManualDataWrites); + } + /** * Request an HTTP/1.1 HttpStream from StreamManager. * @@ -60,6 +75,21 @@ public CompletableFuture acquireStream(HttpRequest request, */ public CompletableFuture acquireStream(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler) { + return this.acquireStream(request, streamHandler, false); // overloading to ensure backward-compatibility + } + + /** + * Request an HTTP/1.1 HttpStream from StreamManager. + * + * @param request HttpRequestBase. The Request to make to the Server. + * @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop + * @param useManualDataWrites A boolean variable to signal that body will be streamed using async writes. + * @return A future for a HttpStream that will be completed when the stream is + * acquired. + * @throws CrtRuntimeException Exception happens from acquiring stream. + */ + public CompletableFuture acquireStream(HttpRequestBase request, + HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) { CompletableFuture completionFuture = new CompletableFuture<>(); HttpClientConnectionManager connManager = this.connectionManager; @@ -91,7 +121,7 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) { /* Release the connection back */ connManager.releaseConnection(conn); } - }); + }, useManualDataWrites); completionFuture.complete((HttpStream) stream); /* Active the stream for user */ try { diff --git a/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java index bcd86461d..5e959622c 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java +++ b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java @@ -151,17 +151,48 @@ private Http2StreamManager(Http2StreamManagerOptions options) { public CompletableFuture acquireStream(Http2Request request, HttpStreamBaseResponseHandler streamHandler) { - return this.acquireStream((HttpRequestBase) request, streamHandler); + return this.acquireStream((HttpRequestBase) request, streamHandler, false); + } + + /** + * Request a Http2Stream from StreamManager. + * + * @param request The Request to make to the Server. + * @param streamHandler The Stream Handler to be called from the Native + * EventLoop + * @param useManualDataWrites When {@code true}, request body data is provided via + * {@link HttpStreamBase#writeData} instead of from the + * request's {@link HttpRequestBodyStream}. See + * {@link Http2ClientConnection#makeRequest(HttpRequestBase, HttpStreamBaseResponseHandler, boolean)} + * for caveats on combining a body stream with manual writes. + * @return A future for a Http2Stream that will be completed when the stream is + * acquired. + */ + public CompletableFuture acquireStream(Http2Request request, + HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) { + + return this.acquireStream((HttpRequestBase) request, streamHandler, useManualDataWrites); } public CompletableFuture acquireStream(HttpRequest request, HttpStreamBaseResponseHandler streamHandler) { - return this.acquireStream((HttpRequestBase) request, streamHandler); + return this.acquireStream((HttpRequestBase) request, streamHandler, false); + } + + public CompletableFuture acquireStream(HttpRequest request, + HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) { + + return this.acquireStream((HttpRequestBase) request, streamHandler, useManualDataWrites); } public CompletableFuture acquireStream(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler) { + return this.acquireStream(request, streamHandler, false); + } + + public CompletableFuture acquireStream(HttpRequestBase request, + HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) { CompletableFuture completionFuture = new CompletableFuture<>(); AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null); if (isNull()) { @@ -174,7 +205,8 @@ public CompletableFuture acquireStream(HttpRequestBase request, request.marshalForJni(), request.getBodyStream(), new HttpStreamResponseHandlerNativeAdapter(streamHandler), - acquireStreamCompleted); + acquireStreamCompleted, + useManualDataWrites); } catch (CrtRuntimeException ex) { completionFuture.completeExceptionally(ex); } @@ -278,7 +310,8 @@ private static native void http2StreamManagerAcquireStream(long stream_manager, byte[] marshalledRequest, HttpRequestBodyStream bodyStream, HttpStreamResponseHandlerNativeAdapter responseHandler, - AsyncCallback completedCallback) throws CrtRuntimeException; + AsyncCallback completedCallback, + boolean useManualDataWrites) throws CrtRuntimeException; private static native HttpManagerMetrics http2StreamManagerFetchMetrics(long stream_manager) throws CrtRuntimeException; } diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java index d816bfdb5..ee9dfbfd7 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java @@ -55,7 +55,27 @@ public CompletableFuture acquireStream(HttpRequestBase request, return this.h2StreamManager.acquireStream(request, streamHandler) .thenApply(stream -> (HttpStreamBase) stream); } else { - return this.h1StreamManager.acquireStream(request, streamHandler) + return this.h1StreamManager.acquireStream(request, streamHandler, false) + .thenApply(stream -> (HttpStreamBase) stream); + } + } + + /** + * Request an HttpStream from StreamManager. + * + * @param request HttpRequestBase. The Request to make to the Server. + * @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop + * @param useManualDataWrites A boolean variable to signal that body will be streamed using async writes. + * @return A future for a HttpStreamBase that will be completed when the stream is + * acquired. + */ + public CompletableFuture acquireStream(HttpRequestBase request, + HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) { + if (this.h2StreamManager != null) { + return this.h2StreamManager.acquireStream(request, streamHandler, useManualDataWrites) + .thenApply(stream -> (HttpStreamBase) stream); + } else { + return this.h1StreamManager.acquireStream(request, streamHandler, useManualDataWrites) .thenApply(stream -> (HttpStreamBase) stream); } } diff --git a/src/native/http2_stream_manager.c b/src/native/http2_stream_manager.c index af86f2bcc..4a31b7933 100644 --- a/src/native/http2_stream_manager.c +++ b/src/native/http2_stream_manager.c @@ -365,7 +365,8 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_h jbyteArray marshalled_request, jobject jni_http_request_body_stream, jobject jni_http_response_callback_handler, - jobject java_async_callback) { + jobject java_async_callback, + jboolean jni_use_manual_data_writes) { (void)jni_class; aws_cache_jni_ids(env); @@ -411,6 +412,12 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_h .on_complete = aws_java_http_stream_on_stream_complete_fn, .on_destroy = aws_java_http_stream_on_stream_destroy_fn, .user_data = stream_binding, + /* aws_http2_stream_manager only forwards http2_use_manual_data_writes (not the + * unified use_manual_data_writes) through to aws_http_connection_make_request. + * Set the H2-specific flag here so manual writes actually take effect on the + * stream-manager path. Revisit when aws-c-http's http2_stream_manager.c is + * updated to forward use_manual_data_writes. */ + .http2_use_manual_data_writes = jni_use_manual_data_writes, }; struct aws_allocator *allocator = aws_jni_get_allocator(); diff --git a/src/native/http_request_response.c b/src/native/http_request_response.c index b2bb63690..19efa84ab 100644 --- a/src/native/http_request_response.c +++ b/src/native/http_request_response.c @@ -611,7 +611,15 @@ JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStream_httpStrea struct http_stream_write_data_callback_data { struct http_stream_binding *stream_cb_data; - struct aws_byte_buf data_buf; + /* + * Global ref to the Java byte[] passed in by the caller, and the aws_byte_cursor acquired + * from it via aws_jni_byte_cursor_from_jbyteArray_acquire. Held on callback_data so the + * JNI-side bytes stay valid for the full async write, and released from + * s_cleanup_write_data_callback_data once the write completes (or fails synchronously). + * Avoids copying the payload into a native aws_byte_buf before sending. + */ + jbyteArray data_array; + struct aws_byte_cursor data_cur; struct aws_input_stream *data_stream; jobject completion_callback; }; @@ -622,7 +630,11 @@ static void s_cleanup_write_data_callback_data( if (callback_data->data_stream) { aws_input_stream_destroy(callback_data->data_stream); } - aws_byte_buf_clean_up(&callback_data->data_buf); + if (callback_data->data_array) { + /* Release the JNI pin/copy (needs the jbyteArray + original cursor), then drop our global ref. */ + aws_jni_byte_cursor_from_jbyteArray_release(env, callback_data->data_array, callback_data->data_cur); + (*env)->DeleteGlobalRef(env, callback_data->data_array); + } (*env)->DeleteGlobalRef(env, callback_data->completion_callback); aws_mem_release(aws_jni_get_allocator(), callback_data); } @@ -676,12 +688,19 @@ JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS }; if (data != NULL) { - struct aws_byte_cursor data_cur = aws_jni_byte_cursor_from_jbyteArray_acquire(env, data); - aws_byte_buf_init_copy_from_cursor(&callback_data->data_buf, aws_jni_get_allocator(), data_cur); - aws_jni_byte_cursor_from_jbyteArray_release(env, data, data_cur); - - data_cur = aws_byte_cursor_from_buf(&callback_data->data_buf); - callback_data->data_stream = aws_input_stream_new_from_cursor(aws_jni_get_allocator(), &data_cur); + /* + * Promote to a global ref so it survives beyond this JNI frame — the async write + * completion fires on the event-loop thread after this function returns. + * Acquire the cursor once, store both on callback_data, and let + * s_cleanup_write_data_callback_data release them when the write finishes. + * aws_input_stream_new_from_cursor copies the {ptr,len} cursor struct into its own + * state, but does NOT copy the underlying bytes, so this path avoids the payload copy. + */ + callback_data->data_array = (*env)->NewGlobalRef(env, data); + callback_data->data_cur = aws_jni_byte_cursor_from_jbyteArray_acquire(env, data); + + callback_data->data_stream = + aws_input_stream_new_from_cursor(aws_jni_get_allocator(), &callback_data->data_cur); options.data = callback_data->data_stream; } diff --git a/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java b/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java index f9af855c0..b13f07ef3 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/WriteDataTest.java @@ -10,10 +10,17 @@ import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.CrtResource; import software.amazon.awssdk.crt.http.*; +import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.io.TlsContext; +import software.amazon.awssdk.crt.io.TlsContextOptions; import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.lang.ref.WeakReference; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -23,20 +30,52 @@ public class WriteDataTest extends HttpRequestResponseFixture { private final static int H1_TLS_PORT = 8082; private final static int H2_TLS_PORT = 3443; + /** + * Build an {@link HttpStreamManager} suitable for the localhost mock server + * (self-signed cert -> verify peer disabled). Mirrors the setup used by + * the stream-manager and connection-pool tests for localhost. + */ + private HttpStreamManager createLocalhostStreamManager(URI uri, HttpVersion expectedVersion) { + try (EventLoopGroup eventLoopGroup = new EventLoopGroup(1); + HostResolver resolver = new HostResolver(eventLoopGroup); + ClientBootstrap bootstrap = new ClientBootstrap(eventLoopGroup, resolver); + SocketOptions sockOpts = new SocketOptions(); + TlsContextOptions tlsOpts = (expectedVersion == HttpVersion.HTTP_2 + ? TlsContextOptions.createDefaultClient().withAlpnList("h2") + : TlsContextOptions.createDefaultClient().withAlpnList("http/1.1")) + .withVerifyPeer(false); + TlsContext tlsContext = createHttpClientTlsContext(tlsOpts)) { + HttpClientConnectionManagerOptions h1Options = new HttpClientConnectionManagerOptions() + .withClientBootstrap(bootstrap) + .withSocketOptions(sockOpts) + .withTlsContext(tlsContext) + .withUri(uri) + .withMaxConnections(1); + Http2StreamManagerOptions h2Options = new Http2StreamManagerOptions() + .withConnectionManagerOptions(h1Options); + HttpStreamManagerOptions options = new HttpStreamManagerOptions() + .withHTTP1ConnectionManagerOptions(h1Options) + .withHTTP2StreamManagerOptions(h2Options) + .withExpectedProtocol(expectedVersion); + return HttpStreamManager.create(options); + } + } + @Test public void testHttp2WriteData() throws Exception { skipIfAndroid(); skipIfLocalhostUnavailable(); URI uri = new URI(HOST + ":" + H2_TLS_PORT); - byte[] payload = "hello from writeData".getBytes(StandardCharsets.UTF_8); + String expectedBody = "hello from writeData"; + int expectedLen = expectedBody.getBytes(StandardCharsets.UTF_8).length; HttpHeader[] headers = new HttpHeader[]{ new HttpHeader(":method", "PUT"), new HttpHeader(":path", "/echo"), new HttpHeader(":scheme", "https"), new HttpHeader(":authority", uri.getHost()), - new HttpHeader("content-length", Integer.toString(payload.length)), + new HttpHeader("content-length", Integer.toString(expectedLen)), }; Http2Request request = new Http2Request(headers, null); @@ -72,7 +111,33 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) { try (Http2Stream stream = conn.makeRequest(request, streamHandler, true)) { stream.activate(); - stream.writeData(payload, true).get(5, TimeUnit.SECONDS); + + /* + * Issue the write from a helper that: + * 1) allocates the byte[] locally, so once the helper returns the + * caller's stack has no reference to it, + * 2) captures only a WeakReference to the array in its inner callback + * (not the byte[] itself) so the lambda doesn't accidentally become + * a Java-side GC root, + * 3) performs the "is the native GlobalRef still holding the array?" + * assertion INSIDE the write-completion callback -- which is + * guaranteed by the native code to run BEFORE native cleanup + * (Release + DeleteGlobalRef) in s_write_data_complete. + * + * If NewGlobalRef on callback_data were missing or dropped early, the + * array would be unreachable from Java-land at the moment the callback + * fires, System.gc() inside the callback would clear the WeakReference, + * and the helper would complete the returned future exceptionally. + * + * Caveat: GetByteArrayElements may pin the array, and on some JVMs a + * pin can also keep the object GC-reachable. A pass here therefore + * proves "native side is keeping the array alive somehow" rather than + * strictly "GlobalRef is what is keeping it alive". Combined with the + * echo-body-matches assertion below (which would fail on any data + * corruption from premature release), this covers the interesting cases. + */ + CompletableFuture writeFuture = issueWriteAndAssertReachable(stream, expectedBody); + writeFuture.get(5, TimeUnit.SECONDS); reqCompleted.get(60, TimeUnit.SECONDS); } } @@ -82,15 +147,59 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) { Assert.assertEquals(200, response.statusCode); // /echo returns JSON: {"body": "", "bytes": } String body = response.getBody(); - Assert.assertTrue("Response should contain sent body: " + body, - body.contains("\"body\": \"hello from writeData\"")); + Assert.assertTrue("Response should contain sent body intact: " + body, + body.contains("\"body\": \"" + expectedBody + "\"")); Assert.assertTrue("Response should contain byte count: " + body, - body.contains("\"bytes\": " + payload.length)); + body.contains("\"bytes\": " + expectedLen)); shutdownComplete.get(60, TimeUnit.SECONDS); CrtResource.waitForNoResources(); } + /** + * Allocates the payload locally and issues writeData. Inside the write-completion + * callback (which runs on the event-loop thread BEFORE native cleanup -- see + * s_write_data_complete in http_request_response.c), forces GC and asserts that a + * WeakReference to the payload is still live. A pass means the native side is keeping + * the array reachable from the JVM's point of view for the duration of the async write, + * which is what the NewGlobalRef stored on callback_data is there to guarantee. + * + * The lambda captures {@code weak} and {@code future}, never {@code payload} directly, + * so the lambda itself does not become a Java-side strong root for the array. Once this + * method returns, the local {@code payload} is out of scope and the only thing + * reachable from Java-land is whatever the native layer holds. + */ + private CompletableFuture issueWriteAndAssertReachable(HttpStreamBase stream, String body) { + byte[] payload = body.getBytes(StandardCharsets.UTF_8); + WeakReference weak = new WeakReference<>(payload); + CompletableFuture future = new CompletableFuture<>(); + + stream.writeData(payload, true, (errorCode) -> { + try { + /* We're inside CallVoidMethod on the event-loop thread; native cleanup has + * not yet run, so the native GlobalRef is still holding the array. Trigger + * a collection attempt and verify the WeakReference is still live. */ + System.gc(); + if (weak.get() == null) { + future.completeExceptionally(new AssertionError( + "byte[] was reclaimed while native should still hold a GlobalRef on it " + + "(NewGlobalRef on callback_data missing or dropped early?)")); + return; + } + if (errorCode != 0) { + future.completeExceptionally(new RuntimeException( + "writeData failed with errorCode=" + errorCode)); + } else { + future.complete(null); + } + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + + return future; + } + @Test public void testHttp2WriteDataEndStreamOnly() throws Exception { skipIfAndroid(); @@ -162,11 +271,12 @@ public void testHttp1WriteData() throws Exception { skipIfLocalhostUnavailable(); URI uri = new URI(HOST + ":" + H1_TLS_PORT); - byte[] payload = "hello from writeData h1".getBytes(StandardCharsets.UTF_8); + String expectedBody = "hello from writeData h1"; + int expectedLen = expectedBody.getBytes(StandardCharsets.UTF_8).length; HttpHeader[] headers = new HttpHeader[]{ new HttpHeader("Host", uri.getHost()), - new HttpHeader("Content-Length", Integer.toString(payload.length)), + new HttpHeader("Content-Length", Integer.toString(expectedLen)), }; HttpRequest request = new HttpRequest("PUT", "/echo", headers, null); @@ -202,7 +312,16 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) { // Use the unified makeRequest with useManualDataWrites=true try (HttpStreamBase stream = conn.makeRequest(request, streamHandler, true)) { stream.activate(); - stream.writeData(payload, true).get(5, TimeUnit.SECONDS); + + /* + * Same weak-ref-inside-write-callback assertion as testHttp2WriteData -- + * proves that the H1 writeData path also keeps the byte[] reachable + * from the JVM for the duration of the async write. Helper is shared + * (takes HttpStreamBase), so this exercises the exact same JNI code path + * in http_request_response.c (httpStreamBaseWriteData) as H2. + */ + CompletableFuture writeFuture = issueWriteAndAssertReachable(stream, expectedBody); + writeFuture.get(5, TimeUnit.SECONDS); reqCompleted.get(60, TimeUnit.SECONDS); } } @@ -213,7 +332,7 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) { // H1 /echo returns JSON: {"data": ""} String body = response.getBody(); Assert.assertTrue("Response should contain sent data: " + body, - body.contains("\"data\": \"hello from writeData h1\"")); + body.contains("\"data\": \"" + expectedBody + "\"")); shutdownComplete.get(60, TimeUnit.SECONDS); CrtResource.waitForNoResources(); @@ -326,4 +445,135 @@ public void onResponseComplete(HttpStream stream, int errorCode) {} } } } + + /** + * Smoke test: stream acquired from {@link HttpStreamManager} for HTTP/2 + * correctly threads {@code useManualDataWrites=true} through to the stream + * and allows a simple "hello world" body to be sent via writeData(). + */ + @Test + public void testHttp2StreamManagerWriteData() throws Exception { + skipIfAndroid(); + skipIfLocalhostUnavailable(); + + URI uri = new URI(HOST + ":" + H2_TLS_PORT); + byte[] payload = "hello world".getBytes(StandardCharsets.UTF_8); + + HttpHeader[] headers = new HttpHeader[] { + new HttpHeader(":method", "PUT"), + new HttpHeader(":path", "/echo"), + new HttpHeader(":scheme", "https"), + new HttpHeader(":authority", uri.getHost()), + new HttpHeader("content-length", Integer.toString(payload.length)), + }; + Http2Request request = new Http2Request(headers, null); + + CompletableFuture reqCompleted = new CompletableFuture<>(); + TestHttpResponse response = new TestHttpResponse(); + + CompletableFuture shutdownComplete; + try (HttpStreamManager streamManager = createLocalhostStreamManager(uri, HttpVersion.HTTP_2)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + + HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + response.statusCode = responseStatusCode; + response.headers.addAll(Arrays.asList(nextHeaders)); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + response.bodyBuffer.put(bodyBytesIn); + return bodyBytesIn.length; + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + response.onCompleteErrorCode = errorCode; + reqCompleted.complete(null); + } + }; + + try (HttpStreamBase stream = streamManager.acquireStream(request, streamHandler, true) + .get(60, TimeUnit.SECONDS)) { + stream.writeData(payload, true).get(5, TimeUnit.SECONDS); + reqCompleted.get(60, TimeUnit.SECONDS); + } + } + + Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode); + Assert.assertEquals(200, response.statusCode); + String body = response.getBody(); + Assert.assertTrue("Response should contain sent body: " + body, + body.contains("\"body\": \"hello world\"")); + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.waitForNoResources(); + } + + /** + * Smoke test: stream acquired from {@link HttpStreamManager} for HTTP/1.1 + * correctly threads {@code useManualDataWrites=true} through to the stream + * and allows a simple "hello world" body to be sent via writeData(). + */ + @Test + public void testHttp1StreamManagerWriteData() throws Exception { + skipIfAndroid(); + skipIfLocalhostUnavailable(); + + URI uri = new URI(HOST + ":" + H1_TLS_PORT); + byte[] payload = "hello world".getBytes(StandardCharsets.UTF_8); + + HttpHeader[] headers = new HttpHeader[] { + new HttpHeader("Host", uri.getHost()), + new HttpHeader("Content-Length", Integer.toString(payload.length)), + }; + HttpRequest request = new HttpRequest("PUT", "/echo", headers, null); + + CompletableFuture reqCompleted = new CompletableFuture<>(); + TestHttpResponse response = new TestHttpResponse(); + + CompletableFuture shutdownComplete; + try (HttpStreamManager streamManager = createLocalhostStreamManager(uri, HttpVersion.HTTP_1_1)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + + HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + response.statusCode = responseStatusCode; + response.headers.addAll(Arrays.asList(nextHeaders)); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + response.bodyBuffer.put(bodyBytesIn); + return bodyBytesIn.length; + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + response.onCompleteErrorCode = errorCode; + reqCompleted.complete(null); + } + }; + + try (HttpStreamBase stream = streamManager.acquireStream(request, streamHandler, true) + .get(60, TimeUnit.SECONDS)) { + stream.writeData(payload, true).get(5, TimeUnit.SECONDS); + reqCompleted.get(60, TimeUnit.SECONDS); + } + } + + Assert.assertEquals(CRT.AWS_CRT_SUCCESS, response.onCompleteErrorCode); + Assert.assertEquals(200, response.statusCode); + String body = response.getBody(); + Assert.assertTrue("Response should contain sent data: " + body, + body.contains("\"data\": \"hello world\"")); + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.waitForNoResources(); + } }