Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ public CompletableFuture<HttpStream> acquireStream(HttpRequest request,
return this.acquireStream((HttpRequestBase) request, streamHandler);
}

/**
* Request an HTTP/1.1 HttpStream from StreamManager.
*
* @param request HttpRequest. The Request to make to the Server.
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
* @param useManualDataWrites A boolean variable to signal that body will be streamed using async writes.
* @return A future for a HttpStream that will be completed when the stream is
* acquired.
* @throws CrtRuntimeException Exception happens from acquiring stream.
*/
public CompletableFuture<HttpStream> acquireStream(HttpRequest request,
HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) {
return this.acquireStream((HttpRequestBase) request, streamHandler, useManualDataWrites);
}

/**
* Request an HTTP/1.1 HttpStream from StreamManager.
*
Expand All @@ -60,6 +75,21 @@ public CompletableFuture<HttpStream> acquireStream(HttpRequest request,
*/
public CompletableFuture<HttpStream> acquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler) {
return this.acquireStream(request, streamHandler, false); // overloading to ensure backward-compatibility
}

/**
* Request an HTTP/1.1 HttpStream from StreamManager.
*
* @param request HttpRequestBase. The Request to make to the Server.
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
* @param useManualDataWrites A boolean variable to signal that body will be streamed using async writes.
* @return A future for a HttpStream that will be completed when the stream is
* acquired.
* @throws CrtRuntimeException Exception happens from acquiring stream.
*/
public CompletableFuture<HttpStream> acquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) {
CompletableFuture<HttpStream> completionFuture = new CompletableFuture<>();
HttpClientConnectionManager connManager = this.connectionManager;

Expand Down Expand Up @@ -91,7 +121,7 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
/* Release the connection back */
connManager.releaseConnection(conn);
}
});
}, useManualDataWrites);
completionFuture.complete((HttpStream) stream);
/* Active the stream for user */
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,48 @@ private Http2StreamManager(Http2StreamManagerOptions options) {
public CompletableFuture<Http2Stream> acquireStream(Http2Request request,
HttpStreamBaseResponseHandler streamHandler) {

return this.acquireStream((HttpRequestBase) request, streamHandler);
return this.acquireStream((HttpRequestBase) request, streamHandler, false);
}

/**
* Request a Http2Stream from StreamManager.
*
* @param request The Request to make to the Server.
* @param streamHandler The Stream Handler to be called from the Native
* EventLoop
* @param useManualDataWrites When {@code true}, request body data is provided via
* {@link HttpStreamBase#writeData} instead of from the
* request's {@link HttpRequestBodyStream}. See
* {@link Http2ClientConnection#makeRequest(HttpRequestBase, HttpStreamBaseResponseHandler, boolean)}
* for caveats on combining a body stream with manual writes.
* @return A future for a Http2Stream that will be completed when the stream is
* acquired.
*/
public CompletableFuture<Http2Stream> acquireStream(Http2Request request,
HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) {

return this.acquireStream((HttpRequestBase) request, streamHandler, useManualDataWrites);
}

public CompletableFuture<Http2Stream> acquireStream(HttpRequest request,
HttpStreamBaseResponseHandler streamHandler) {

return this.acquireStream((HttpRequestBase) request, streamHandler);
return this.acquireStream((HttpRequestBase) request, streamHandler, false);
}

public CompletableFuture<Http2Stream> acquireStream(HttpRequest request,
HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) {

return this.acquireStream((HttpRequestBase) request, streamHandler, useManualDataWrites);
}

public CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler) {
return this.acquireStream(request, streamHandler, false);
}

public CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) {
CompletableFuture<Http2Stream> completionFuture = new CompletableFuture<>();
AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null);
if (isNull()) {
Expand All @@ -174,7 +205,8 @@ public CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
request.marshalForJni(),
request.getBodyStream(),
new HttpStreamResponseHandlerNativeAdapter(streamHandler),
acquireStreamCompleted);
acquireStreamCompleted,
useManualDataWrites);
} catch (CrtRuntimeException ex) {
completionFuture.completeExceptionally(ex);
}
Expand Down Expand Up @@ -278,7 +310,8 @@ private static native void http2StreamManagerAcquireStream(long stream_manager,
byte[] marshalledRequest,
HttpRequestBodyStream bodyStream,
HttpStreamResponseHandlerNativeAdapter responseHandler,
AsyncCallback completedCallback) throws CrtRuntimeException;
AsyncCallback completedCallback,
boolean useManualDataWrites) throws CrtRuntimeException;

private static native HttpManagerMetrics http2StreamManagerFetchMetrics(long stream_manager) throws CrtRuntimeException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,27 @@ public CompletableFuture<HttpStreamBase> acquireStream(HttpRequestBase request,
return this.h2StreamManager.acquireStream(request, streamHandler)
.thenApply(stream -> (HttpStreamBase) stream);
} else {
return this.h1StreamManager.acquireStream(request, streamHandler)
return this.h1StreamManager.acquireStream(request, streamHandler, false)
.thenApply(stream -> (HttpStreamBase) stream);
}
}

/**
* Request an HttpStream from StreamManager.
*
* @param request HttpRequestBase. The Request to make to the Server.
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
* @param useManualDataWrites A boolean variable to signal that body will be streamed using async writes.
* @return A future for a HttpStreamBase that will be completed when the stream is
* acquired.
*/
public CompletableFuture<HttpStreamBase> acquireStream(HttpRequestBase request,
HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) {
if (this.h2StreamManager != null) {
return this.h2StreamManager.acquireStream(request, streamHandler, useManualDataWrites)
.thenApply(stream -> (HttpStreamBase) stream);
} else {
return this.h1StreamManager.acquireStream(request, streamHandler, useManualDataWrites)
.thenApply(stream -> (HttpStreamBase) stream);
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/native/http2_stream_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_h
jbyteArray marshalled_request,
jobject jni_http_request_body_stream,
jobject jni_http_response_callback_handler,
jobject java_async_callback) {
jobject java_async_callback,
jboolean jni_use_manual_data_writes) {
(void)jni_class;
aws_cache_jni_ids(env);

Expand Down Expand Up @@ -411,6 +412,12 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_h
.on_complete = aws_java_http_stream_on_stream_complete_fn,
.on_destroy = aws_java_http_stream_on_stream_destroy_fn,
.user_data = stream_binding,
/* aws_http2_stream_manager only forwards http2_use_manual_data_writes (not the
* unified use_manual_data_writes) through to aws_http_connection_make_request.
* Set the H2-specific flag here so manual writes actually take effect on the
* stream-manager path. Revisit when aws-c-http's http2_stream_manager.c is
* updated to forward use_manual_data_writes. */
.http2_use_manual_data_writes = jni_use_manual_data_writes,
};

struct aws_allocator *allocator = aws_jni_get_allocator();
Expand Down
35 changes: 27 additions & 8 deletions src/native/http_request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,15 @@ JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStream_httpStrea

struct http_stream_write_data_callback_data {
struct http_stream_binding *stream_cb_data;
struct aws_byte_buf data_buf;
/*
* Global ref to the Java byte[] passed in by the caller, and the aws_byte_cursor acquired
* from it via aws_jni_byte_cursor_from_jbyteArray_acquire. Held on callback_data so the
* JNI-side bytes stay valid for the full async write, and released from
* s_cleanup_write_data_callback_data once the write completes (or fails synchronously).
* Avoids copying the payload into a native aws_byte_buf before sending.
*/
jbyteArray data_array;
struct aws_byte_cursor data_cur;
struct aws_input_stream *data_stream;
jobject completion_callback;
};
Expand All @@ -622,7 +630,11 @@ static void s_cleanup_write_data_callback_data(
if (callback_data->data_stream) {
aws_input_stream_destroy(callback_data->data_stream);
}
aws_byte_buf_clean_up(&callback_data->data_buf);
if (callback_data->data_array) {
/* Release the JNI pin/copy (needs the jbyteArray + original cursor), then drop our global ref. */
aws_jni_byte_cursor_from_jbyteArray_release(env, callback_data->data_array, callback_data->data_cur);
(*env)->DeleteGlobalRef(env, callback_data->data_array);
}
(*env)->DeleteGlobalRef(env, callback_data->completion_callback);
aws_mem_release(aws_jni_get_allocator(), callback_data);
}
Expand Down Expand Up @@ -676,12 +688,19 @@ JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS
};

if (data != NULL) {
struct aws_byte_cursor data_cur = aws_jni_byte_cursor_from_jbyteArray_acquire(env, data);
aws_byte_buf_init_copy_from_cursor(&callback_data->data_buf, aws_jni_get_allocator(), data_cur);
aws_jni_byte_cursor_from_jbyteArray_release(env, data, data_cur);

data_cur = aws_byte_cursor_from_buf(&callback_data->data_buf);
callback_data->data_stream = aws_input_stream_new_from_cursor(aws_jni_get_allocator(), &data_cur);
/*
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a test for this, if we don't already have one.
invoke the write data API with a local variable (Or somehow remove the reference to the object from java side). We should sill keep the data alive and works as expected since we keep the object referenced in C here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modified the existing happy path tests to create weak references and remove object, forced a gc and checked for the the weak reference to not be null.

* Promote to a global ref so it survives beyond this JNI frame — the async write
* completion fires on the event-loop thread after this function returns.
* Acquire the cursor once, store both on callback_data, and let
* s_cleanup_write_data_callback_data release them when the write finishes.
* aws_input_stream_new_from_cursor copies the {ptr,len} cursor struct into its own
* state, but does NOT copy the underlying bytes, so this path avoids the payload copy.
*/
callback_data->data_array = (*env)->NewGlobalRef(env, data);
callback_data->data_cur = aws_jni_byte_cursor_from_jbyteArray_acquire(env, data);

callback_data->data_stream =
aws_input_stream_new_from_cursor(aws_jni_get_allocator(), &callback_data->data_cur);
options.data = callback_data->data_stream;
}

Expand Down
Loading
Loading