From a9393cf3b98b5ce47289f51db348aa658799b272 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 7 May 2026 14:32:03 -0700 Subject: [PATCH 1/3] Add FFM (Foreign Function & Memory) hot-path support for S3 upload/download Adds a useFFM flag to S3MetaRequestOptions that activates zero-copy callbacks on the two performance-critical paths: Download (s_on_s3_meta_request_body_callback): - JNI path: allocates a byte[] and copies every downloaded chunk into it - FFM path: passes the raw native pointer + length as jlong primitives; Java wraps it as a MemorySegment (zero-copy) via onResponseBodyFFM() Upload (s_aws_input_stream_read): - JNI path: creates a DirectByteBuffer wrapper object per part, reads back position() via a second JNI call - FFM path: passes raw pointer + capacity as jlong primitives; Java writes directly into native memory and returns bytes-written as int Java-side changes: - S3MetaRequestOptions: add withUseFFM(boolean) / getUseFFM() - S3Client: pass useFFM to native s3ClientMakeMetaRequest() - S3MetaRequestResponseHandler: add onResponseBody(MemorySegment,...) overload - S3MetaRequestResponseHandlerNativeAdapter: add onResponseBodyFFM(long,long,long) - HttpRequestBodyStream: add sendRequestBody(long address, long length) -> int Native-side changes: - java_class_ids.h/c: register send_outgoing_body_ffm (JJ)I and onResponseBodyFFM (JJJ)I method IDs - s3_client.c: add use_ffm to callback struct; branch in body callback - http_request_utils.h/c: add use_ffm to stream impl; branch in read fn; propagate use_ffm through aws_apply_java_http_request_changes_to_native_request --- .../crt/http/HttpRequestBodyStream.java | 40 +++++++++++ .../amazon/awssdk/crt/s3/S3Client.java | 6 +- .../awssdk/crt/s3/S3MetaRequestOptions.java | 42 +++++++++++ .../crt/s3/S3MetaRequestResponseHandler.java | 33 +++++++++ ...taRequestResponseHandlerNativeAdapter.java | 37 +++++++++- src/native/http_request_utils.c | 70 ++++++++++++++----- src/native/http_request_utils.h | 13 +++- src/native/java_class_ids.c | 10 +++ src/native/java_class_ids.h | 6 +- src/native/s3_client.c | 70 +++++++++++++------ 10 files changed, 280 insertions(+), 47 deletions(-) diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpRequestBodyStream.java b/src/main/java/software/amazon/awssdk/crt/http/HttpRequestBodyStream.java index 137740796..f0d32a928 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpRequestBodyStream.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpRequestBodyStream.java @@ -5,6 +5,8 @@ package software.amazon.awssdk.crt.http; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; import java.nio.ByteBuffer; /** @@ -29,6 +31,44 @@ default boolean sendRequestBody(ByteBuffer bodyBytesOut) { return true; } + /** + * FFM variant of {@link #sendRequestBody(ByteBuffer)}. + *

+ * Called from native when the meta request was created with + * {@link software.amazon.awssdk.crt.s3.S3MetaRequestOptions#withUseFFM(boolean) + * useFFM=true}. The native layer passes the destination buffer as a raw + * pointer ({@code address}) and its capacity ({@code length}) as {@code long} + * primitives — no {@code DirectByteBuffer} wrapper object is allocated. + *

+ * Implementations should write up to {@code length} bytes of request body + * data into the native buffer starting at {@code address} and return the + * number of bytes actually written. Returning {@code 0} signals that the + * body is complete (end-of-stream). + *

+ * The default implementation bridges to the {@link #sendRequestBody(ByteBuffer)} + * overload via a {@link MemorySegment} → {@link ByteBuffer} view, so existing + * implementations that only override the {@code ByteBuffer} version continue + * to work correctly (at the cost of the {@code ByteBuffer} wrapper allocation + * that FFM mode is trying to avoid). + * + * @param address Raw native pointer to the start of the destination buffer. + * @param length Capacity of the destination buffer in bytes. + * @return Number of bytes written into the buffer, or {@code 0} when the + * body is fully consumed (end-of-stream). + */ + default int sendRequestBody(long address, long length) { + // Default: wrap the native buffer as a ByteBuffer and delegate to the + // existing overload so that implementations that only override the + // ByteBuffer version still work. + MemorySegment seg = MemorySegment.ofAddress(address) + .reinterpret(length, Arena.ofAuto(), null); + ByteBuffer buf = seg.asByteBuffer(); + boolean done = sendRequestBody(buf); + // Return how many bytes were written (ByteBuffer.position() tracks this). + // If done==true and nothing was written, return 0 to signal end-of-stream. + return buf.position(); + } + /** * Called from native when the processing needs the stream to rewind itself back to its beginning. * If the stream does not support rewinding or the rewind fails, false should be returned diff --git a/src/main/java/software/amazon/awssdk/crt/s3/S3Client.java b/src/main/java/software/amazon/awssdk/crt/s3/S3Client.java index ba46922d5..5d497ff7f 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/S3Client.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/S3Client.java @@ -226,7 +226,8 @@ public S3MetaRequest makeMetaRequest(S3MetaRequestOptions options) { fioOptionsSet, shouldStream, diskThroughputGbps, - directIo); + directIo, + options.getUseFFM()); metaRequest.setMetaRequestNativeHandle(metaRequestNativeHandle); @@ -305,5 +306,6 @@ private static native long s3ClientMakeMetaRequest(long clientId, S3MetaRequest boolean fioOptionsSet, boolean shouldStream, double diskThroughputGbps, - boolean directIo); + boolean directIo, + boolean useFFM); } diff --git a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestOptions.java b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestOptions.java index 26cb34ed5..f7845a59a 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestOptions.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestOptions.java @@ -96,6 +96,13 @@ private static Map buildEnumMapping() { private ResumeToken resumeToken; private Long objectSizeHint; private FileIoOptions fileIoOptions; + /** + * When true, the native layer uses FFM (Foreign Function & Memory) style + * callbacks that pass raw native pointers as {@code long} values instead of + * allocating JNI wrapper objects (byte[] for downloads, DirectByteBuffer for + * uploads). Requires Java 22+ and {@code --enable-native-access=ALL-UNNAMED}. + */ + private boolean useFFM = false; public S3MetaRequestOptions withMetaRequestType(MetaRequestType metaRequestType) { this.metaRequestType = metaRequestType; @@ -472,4 +479,39 @@ public S3MetaRequestOptions withFileIoOptions(FileIoOptions fileIoOptions) { public FileIoOptions getFileIoOptions() { return fileIoOptions; } + + /** + * Enable FFM (Foreign Function & Memory) mode for this meta request. + *

+ * When {@code true}, the native layer passes raw native memory pointers as + * {@code long} values to Java callbacks instead of allocating JNI wrapper + * objects: + *

+ * Requires Java 22+ and the JVM flag + * {@code --enable-native-access=ALL-UNNAMED}. + * + * @param useFFM {@code true} to use FFM callbacks, {@code false} (default) + * to use the standard JNI callbacks. + * @return this + */ + public S3MetaRequestOptions withUseFFM(boolean useFFM) { + this.useFFM = useFFM; + return this; + } + + /** + * Returns whether FFM (Foreign Function & Memory) mode is enabled. + * + * @return {@code true} if FFM mode is enabled, {@code false} otherwise. + */ + public boolean getUseFFM() { + return useFFM; + } } diff --git a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandler.java b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandler.java index 004dac919..45f3c3090 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandler.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandler.java @@ -4,6 +4,7 @@ */ package software.amazon.awssdk.crt.s3; +import java.lang.foreign.MemorySegment; import java.nio.ByteBuffer; import software.amazon.awssdk.crt.http.HttpHeader; @@ -50,6 +51,38 @@ default int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long o return 0; } + /** + * FFM variant of {@link #onResponseBody(ByteBuffer, long, long)}. + *

+ * Invoked instead of the {@code ByteBuffer} overload when the meta request was + * created with {@link S3MetaRequestOptions#withUseFFM(boolean) useFFM=true}. + * The body data is delivered as a {@link MemorySegment} that is a zero-copy + * view directly into native (off-heap) memory — no {@code byte[]} is allocated + * and no data is copied. + *

+ * Do NOT retain a reference to {@code bodyBytesIn} beyond the lifetime + * of this method call. The underlying native memory is only guaranteed to be + * valid for the duration of the callback. + *

+ * The default implementation falls back to the {@code ByteBuffer} overload by + * copying the data, so existing implementations that only override the + * {@code ByteBuffer} version continue to work correctly. + * + * @param bodyBytesIn A zero-copy view of the native response body chunk. + * @param objectRangeStart Byte offset of the first byte in this chunk within + * the full S3 object. + * @param objectRangeEnd Past-the-end byte offset (i.e. + * {@code objectRangeStart + chunk length}). + * @return The number of bytes to increment the flow-control window by. + * Ignored when backpressure is disabled. + */ + default int onResponseBody(MemorySegment bodyBytesIn, long objectRangeStart, long objectRangeEnd) { + // Default: copy into a heap ByteBuffer and delegate to the ByteBuffer overload + // so that implementations that only override the ByteBuffer version still work. + ByteBuffer buf = ByteBuffer.wrap(bodyBytesIn.toArray(java.lang.foreign.ValueLayout.JAVA_BYTE)); + return onResponseBody(buf, objectRangeStart, objectRangeEnd); + } + /** * Invoked when the entire meta request execution is complete. * @param context a wrapper object containing the following fields diff --git a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandlerNativeAdapter.java b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandlerNativeAdapter.java index 59818c1f5..0a3c0cebf 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandlerNativeAdapter.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandlerNativeAdapter.java @@ -4,10 +4,13 @@ */ package software.amazon.awssdk.crt.s3; -import software.amazon.awssdk.crt.http.HttpHeader; - +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; import java.nio.ByteBuffer; +import software.amazon.awssdk.crt.http.HttpHeader; + class S3MetaRequestResponseHandlerNativeAdapter { private S3MetaRequestResponseHandler responseHandler; @@ -15,10 +18,40 @@ class S3MetaRequestResponseHandlerNativeAdapter { this.responseHandler = responseHandler; } + /** + * Standard JNI path: native code allocates a {@code byte[]} containing a copy + * of the response body chunk and passes it here. Used when {@code useFFM=false}. + */ int onResponseBody(byte[] bodyBytesIn, long objectRangeStart, long objectRangeEnd) { return this.responseHandler.onResponseBody(ByteBuffer.wrap(bodyBytesIn), objectRangeStart, objectRangeEnd); } + /** + * FFM path: native code passes the raw native pointer ({@code address}) and + * the chunk length ({@code length}) as {@code long} primitives — no heap + * allocation, no copy. Used when {@code useFFM=true}. + *

+ * We wrap the native memory as a {@link MemorySegment} scoped to a + * {@link Arena#ofAuto() auto arena} so that the segment is valid for the + * duration of this call but cannot be retained beyond it (the native buffer + * is only guaranteed live for the callback's lifetime). + * + * @param address Raw native pointer to the start of the body chunk. + * @param length Number of bytes in the chunk. + * @param objectRangeStart Byte offset of the first byte within the S3 object. + * @return The number of bytes to increment the flow-control window by. + */ + int onResponseBodyFFM(long address, long length, long objectRangeStart) { + // Wrap the native pointer as a MemorySegment — zero copy. + // reinterpret() is required to give the segment a known size; the + // Arena.ofAuto() cleanup action is null because the native side owns + // the memory and will free it after the callback returns. + MemorySegment segment = MemorySegment.ofAddress(address) + .reinterpret(length, Arena.ofAuto(), null); + long objectRangeEnd = objectRangeStart + length; + return this.responseHandler.onResponseBody(segment, objectRangeStart, objectRangeEnd); + } + void onFinished(int errorCode, int responseStatus, byte[] errorPayload, String errorOperationName, int checksumAlgorithm, boolean didValidateChecksum, Throwable cause, final ByteBuffer headersBlob) { HttpHeader[] errorHeaders = headersBlob == null ? null : HttpHeader.loadHeadersFromMarshalledHeadersBlob(headersBlob); S3FinishedResponseContext context = new S3FinishedResponseContext(errorCode, responseStatus, errorPayload, errorOperationName, ChecksumAlgorithm.getEnumValueFromInteger(checksumAlgorithm), didValidateChecksum, cause, errorHeaders); diff --git a/src/native/http_request_utils.c b/src/native/http_request_utils.c index e3274a22b..64ada0611 100644 --- a/src/native/http_request_utils.c +++ b/src/native/http_request_utils.c @@ -24,6 +24,9 @@ struct aws_http_request_body_stream_impl { jobject http_request_body_stream; bool body_done; bool is_valid; + /* When true, call sendRequestBody(long address, long length) -> int + * instead of sendRequestBody(ByteBuffer) -> boolean. */ + bool use_ffm; }; static int s_aws_input_stream_seek(struct aws_input_stream *stream, int64_t offset, enum aws_stream_seek_basis basis) { @@ -94,21 +97,49 @@ static int s_aws_input_stream_read(struct aws_input_stream *stream, struct aws_b } size_t out_remaining = dest->capacity - dest->len; + int result = AWS_OP_SUCCESS; - jobject direct_buffer = aws_jni_direct_byte_buffer_from_raw_ptr(env, dest->buffer + dest->len, out_remaining); + if (impl->use_ffm) { + /* + * FFM path: pass the raw native pointer and capacity as jlong primitives. + * Java writes directly into the native buffer and returns the number of + * bytes written as an int. Returning 0 signals end-of-stream. + * No DirectByteBuffer wrapper object is allocated. + */ + jlong ptr = (jlong)(uintptr_t)(dest->buffer + dest->len); + jlong len = (jlong)out_remaining; - impl->body_done = (*env)->CallBooleanMethod( - env, impl->http_request_body_stream, http_request_body_stream_properties.send_outgoing_body, direct_buffer); + jint bytes_written = (*env)->CallIntMethod( + env, impl->http_request_body_stream, http_request_body_stream_properties.send_outgoing_body_ffm, ptr, len); - int result = AWS_OP_SUCCESS; - if (aws_jni_check_and_clear_exception(env)) { - result = aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE); + if (aws_jni_check_and_clear_exception(env)) { + result = aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE); + } else { + dest->len += (size_t)bytes_written; + /* 0 bytes written signals end-of-stream */ + if (bytes_written == 0) { + impl->body_done = true; + } + } } else { - size_t amt_written = aws_jni_byte_buffer_get_position(env, direct_buffer); - dest->len += amt_written; - } + /* + * JNI path (unchanged): wrap the native buffer as a DirectByteBuffer, + * call sendRequestBody(ByteBuffer), then read back position(). + */ + jobject direct_buffer = aws_jni_direct_byte_buffer_from_raw_ptr(env, dest->buffer + dest->len, out_remaining); - (*env)->DeleteLocalRef(env, direct_buffer); + impl->body_done = (*env)->CallBooleanMethod( + env, impl->http_request_body_stream, http_request_body_stream_properties.send_outgoing_body, direct_buffer); + + if (aws_jni_check_and_clear_exception(env)) { + result = aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE); + } else { + size_t amt_written = aws_jni_byte_buffer_get_position(env, direct_buffer); + dest->len += amt_written; + } + + (*env)->DeleteLocalRef(env, direct_buffer); + } aws_jni_release_thread_env(impl->jvm, &jvm_env_context); /********** JNI ENV RELEASE **********/ @@ -188,7 +219,8 @@ static struct aws_input_stream_vtable s_aws_input_stream_vtable = { struct aws_input_stream *aws_input_stream_new_from_java_http_request_body_stream( struct aws_allocator *allocator, JNIEnv *env, - jobject http_request_body_stream) { + jobject http_request_body_stream, + bool use_ffm) { struct aws_http_request_body_stream_impl *impl = aws_mem_calloc(allocator, 1, sizeof(struct aws_http_request_body_stream_impl)); @@ -200,6 +232,8 @@ struct aws_input_stream *aws_input_stream_new_from_java_http_request_body_stream AWS_FATAL_ASSERT(jvmresult == 0); impl->is_valid = true; + impl->use_ffm = use_ffm; + if (http_request_body_stream != NULL) { impl->http_request_body_stream = (*env)->NewGlobalRef(env, http_request_body_stream); if (impl->http_request_body_stream == NULL) { @@ -370,7 +404,8 @@ int aws_apply_java_http_request_changes_to_native_request( JNIEnv *env, jbyteArray marshalled_request, jobject jni_body_stream, - struct aws_http_message *message) { + struct aws_http_message *message, + bool use_ffm) { /* come back to this when we decide we need to. */ (void)jni_body_stream; @@ -400,8 +435,10 @@ int aws_apply_java_http_request_changes_to_native_request( } if (jni_body_stream) { - struct aws_input_stream *body_stream = - aws_input_stream_new_from_java_http_request_body_stream(aws_jni_get_allocator(), env, jni_body_stream); + /* use_ffm=false here: this path is used by non-S3 HTTP requests that + * don't go through the FFM meta-request path. */ + struct aws_input_stream *body_stream = aws_input_stream_new_from_java_http_request_body_stream( + aws_jni_get_allocator(), env, jni_body_stream, false); aws_http_message_set_body_stream(message, body_stream); /* request controls the lifetime of body stream fully */ @@ -440,8 +477,9 @@ struct aws_http_message *aws_http_request_new_from_java_http_request( } if (jni_body_stream != NULL) { - struct aws_input_stream *body_stream = - aws_input_stream_new_from_java_http_request_body_stream(aws_jni_get_allocator(), env, jni_body_stream); + /* use_ffm=false: this function is used by non-S3 HTTP requests */ + struct aws_input_stream *body_stream = aws_input_stream_new_from_java_http_request_body_stream( + aws_jni_get_allocator(), env, jni_body_stream, false); if (body_stream == NULL) { exception_message = "aws_fill_out_request: Error building body stream"; goto on_error; diff --git a/src/native/http_request_utils.h b/src/native/http_request_utils.h index 220d58982..36d6bffde 100644 --- a/src/native/http_request_utils.h +++ b/src/native/http_request_utils.h @@ -16,10 +16,18 @@ struct aws_http_headers; struct aws_http_message; struct aws_input_stream; +/** + * Create a native aws_input_stream that reads from a Java HttpRequestBodyStream. + * + * @param use_ffm When true, the stream will call the FFM-style + * sendRequestBody(long, long) -> int method instead of the + * JNI-style sendRequestBody(ByteBuffer) -> boolean method. + */ struct aws_input_stream *aws_input_stream_new_from_java_http_request_body_stream( struct aws_allocator *allocator, JNIEnv *env, - jobject http_request_body_stream); + jobject http_request_body_stream, + bool use_ffm); struct aws_http_message *aws_http_request_new_from_java_http_request( JNIEnv *env, @@ -40,7 +48,8 @@ int aws_apply_java_http_request_changes_to_native_request( JNIEnv *env, jbyteArray marshalled_request, jobject jni_body_stream, - struct aws_http_message *message); + struct aws_http_message *message, + bool use_ffm); /* if this fails a java exception has been set. */ jobject aws_java_http_request_from_native(JNIEnv *env, struct aws_http_message *message, jobject request_body_stream); diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index 765b6a983..0da87f49e 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -18,6 +18,11 @@ static void s_cache_http_request_body_stream(JNIEnv *env) { (*env)->GetMethodID(env, cls, "sendRequestBody", "(Ljava/nio/ByteBuffer;)Z"); AWS_FATAL_ASSERT(http_request_body_stream_properties.send_outgoing_body); + /* FFM path: sendRequestBody(long address, long length) -> int bytes_written */ + http_request_body_stream_properties.send_outgoing_body_ffm = + (*env)->GetMethodID(env, cls, "sendRequestBody", "(JJ)I"); + AWS_FATAL_ASSERT(http_request_body_stream_properties.send_outgoing_body_ffm); + http_request_body_stream_properties.reset_position = (*env)->GetMethodID(env, cls, "resetPosition", "()Z"); AWS_FATAL_ASSERT(http_request_body_stream_properties.reset_position); @@ -726,6 +731,11 @@ static void s_cache_s3_meta_request_response_handler_native_adapter_properties(J (*env)->GetMethodID(env, cls, "onResponseBody", "([BJJ)I"); AWS_FATAL_ASSERT(s3_meta_request_response_handler_native_adapter_properties.onResponseBody); + /* FFM path: onResponseBodyFFM(long address, long length, long rangeStart) -> int */ + s3_meta_request_response_handler_native_adapter_properties.onResponseBodyFFM = + (*env)->GetMethodID(env, cls, "onResponseBodyFFM", "(JJJ)I"); + AWS_FATAL_ASSERT(s3_meta_request_response_handler_native_adapter_properties.onResponseBodyFFM); + s3_meta_request_response_handler_native_adapter_properties.onFinished = (*env)->GetMethodID( env, cls, "onFinished", "(II[BLjava/lang/String;IZLjava/lang/Throwable;Ljava/nio/ByteBuffer;)V"); AWS_FATAL_ASSERT(s3_meta_request_response_handler_native_adapter_properties.onFinished); diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index d1da8565e..21e5c703c 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -10,7 +10,8 @@ /* HttpRequestBodyStream */ struct java_http_request_body_stream_properties { - jmethodID send_outgoing_body; + jmethodID send_outgoing_body; /* sendRequestBody(Ljava/nio/ByteBuffer;)Z - JNI path */ + jmethodID send_outgoing_body_ffm; /* sendRequestBody(JJ)I - FFM path */ jmethodID reset_position; jmethodID get_length; }; @@ -327,7 +328,8 @@ extern struct java_s3_meta_request_properties s3_meta_request_properties; /* S3MetaRequestResponseHandlerNativeAdapter */ struct java_s3_meta_request_response_handler_native_adapter_properties { - jmethodID onResponseBody; + jmethodID onResponseBody; /* onResponseBody([BJJ)I - JNI path: byte[], rangeStart, rangeEnd */ + jmethodID onResponseBodyFFM; /* onResponseBodyFFM(JJJ)I - FFM path: address, length, rangeStart */ jmethodID onFinished; jmethodID onResponseHeaders; jmethodID onProgress; diff --git a/src/native/s3_client.c b/src/native/s3_client.c index ccaae2fdc..f3b08dfff 100644 --- a/src/native/s3_client.c +++ b/src/native/s3_client.c @@ -50,6 +50,9 @@ struct s3_client_make_meta_request_callback_data { struct aws_input_stream *input_stream; struct aws_signing_config_data signing_config_data; jthrowable java_exception; + /* When true, use FFM-style callbacks (raw pointer + length as jlong) + * instead of allocating JNI wrapper objects (byte[] / DirectByteBuffer). */ + bool use_ffm; }; static void s_on_s3_client_shutdown_complete_callback(void *user_data); @@ -606,8 +609,6 @@ static int s_on_s3_meta_request_body_callback( const struct aws_byte_cursor *body, uint64_t range_start, void *user_data) { - (void)body; - (void)range_start; int return_value = AWS_OP_ERR; uint64_t range_end = range_start + body->len; @@ -623,25 +624,45 @@ static int s_on_s3_meta_request_body_callback( return AWS_OP_ERR; } - jobject jni_payload = aws_jni_byte_array_from_cursor(env, body); - if (jni_payload == NULL) { - /* JVM is out of memory, but native code can still have memory available, handle it and don't crash. */ - aws_jni_check_and_clear_exception(env); - aws_jni_release_thread_env(callback_data->jvm, &jvm_env_context); - /********** JNI ENV RELEASE **********/ - return aws_raise_error(AWS_ERROR_JAVA_CRT_JVM_OUT_OF_MEMORY); - } - jint body_response_result = 0; if (callback_data->java_s3_meta_request_response_handler_native_adapter != NULL) { - body_response_result = (*env)->CallIntMethod( - env, - callback_data->java_s3_meta_request_response_handler_native_adapter, - s3_meta_request_response_handler_native_adapter_properties.onResponseBody, - jni_payload, - range_start, - range_end); + if (callback_data->use_ffm) { + /* + * FFM path: pass the raw native pointer and length as jlong primitives. + * No byte[] allocation, no memory copy. + * Java wraps this as a MemorySegment (zero-copy) in onResponseBodyFFM(). + */ + body_response_result = (*env)->CallIntMethod( + env, + callback_data->java_s3_meta_request_response_handler_native_adapter, + s3_meta_request_response_handler_native_adapter_properties.onResponseBodyFFM, + (jlong)(uintptr_t)body->ptr, /* raw native pointer */ + (jlong)body->len, /* chunk length in bytes */ + (jlong)range_start); /* byte offset within the S3 object */ + } else { + /* + * JNI path (unchanged): copy the body bytes into a new Java byte[]. + */ + jobject jni_payload = aws_jni_byte_array_from_cursor(env, body); + if (jni_payload == NULL) { + /* JVM is out of memory, but native code can still have memory available. */ + aws_jni_check_and_clear_exception(env); + aws_jni_release_thread_env(callback_data->jvm, &jvm_env_context); + /********** JNI ENV RELEASE **********/ + return aws_raise_error(AWS_ERROR_JAVA_CRT_JVM_OUT_OF_MEMORY); + } + + body_response_result = (*env)->CallIntMethod( + env, + callback_data->java_s3_meta_request_response_handler_native_adapter, + s3_meta_request_response_handler_native_adapter_properties.onResponseBody, + jni_payload, + range_start, + range_end); + + (*env)->DeleteLocalRef(env, jni_payload); + } if (aws_jni_get_and_clear_exception(env, &(callback_data->java_exception))) { AWS_LOGF_ERROR( @@ -660,8 +681,6 @@ static int s_on_s3_meta_request_body_callback( return_value = AWS_OP_SUCCESS; cleanup: - (*env)->DeleteLocalRef(env, jni_payload); - aws_jni_release_thread_env(callback_data->jvm, &jvm_env_context); /********** JNI ENV RELEASE **********/ @@ -1284,7 +1303,8 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_s3_S3Client_s3ClientMake jboolean fio_options_set, jboolean should_stream, jdouble disk_throughput_gbps, - jboolean direct_io) { + jboolean direct_io, + jboolean use_ffm) { (void)jni_class; aws_cache_jni_ids(env); @@ -1327,12 +1347,16 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_s3_S3Client_s3ClientMake (*env)->NewGlobalRef(env, java_response_handler_jobject); AWS_FATAL_ASSERT(callback_data->java_s3_meta_request_response_handler_native_adapter != NULL); + /* Store the FFM flag so body/upload callbacks can branch on it */ + callback_data->use_ffm = (bool)use_ffm; + request_message = aws_http_message_new_request(allocator); AWS_FATAL_ASSERT(request_message); AWS_FATAL_ASSERT( - AWS_OP_SUCCESS == aws_apply_java_http_request_changes_to_native_request( - env, jni_marshalled_message_data, jni_http_request_body_stream, request_message)); + AWS_OP_SUCCESS == + aws_apply_java_http_request_changes_to_native_request( + env, jni_marshalled_message_data, jni_http_request_body_stream, request_message, callback_data->use_ffm)); if (jni_operation_name) { operation_name = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_operation_name); From dc7dcbdad9cf46cbfdfecd3d7c8e440168b71419 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Fri, 8 May 2026 09:20:38 -0700 Subject: [PATCH 2/3] Fix build errors: update all callers of changed function signatures Three callers were missing the new use_ffm parameter: - aws_signing.c: aws_input_stream_new_from_java_http_request_body_stream() -> add false (signing path always uses JNI) - mqtt_connection.c: aws_apply_java_http_request_changes_to_native_request() -> add false (MQTT path always uses JNI) - mqtt5_client.c: aws_apply_java_http_request_changes_to_native_request() -> add false (MQTT5 path always uses JNI) Also fix the unused-parameter warning in http_request_utils.c: - aws_apply_java_http_request_changes_to_native_request() now passes use_ffm through to aws_input_stream_new_from_java_http_request_body_stream() instead of hardcoding false, making the parameter actually used. --- src/native/aws_signing.c | 2 +- src/native/http_request_utils.c | 5 ++--- src/native/mqtt5_client.c | 2 +- src/native/mqtt_connection.c | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/native/aws_signing.c b/src/native/aws_signing.c index 9e1a2b6a7..5e874170e 100644 --- a/src/native/aws_signing.c +++ b/src/native/aws_signing.c @@ -494,7 +494,7 @@ void JNICALL Java_software_amazon_awssdk_crt_auth_signing_AwsSigner_awsSignerSig AWS_FATAL_ASSERT(callback_data->java_original_chunk_body != NULL); callback_data->chunk_body_stream = aws_input_stream_new_from_java_http_request_body_stream( - aws_jni_get_allocator(), env, java_chunk_body_stream); + aws_jni_get_allocator(), env, java_chunk_body_stream, false /* use_ffm: signing path uses JNI */); if (callback_data->chunk_body_stream == NULL) { aws_jni_throw_runtime_exception(env, "Error building chunk body stream"); goto on_error; diff --git a/src/native/http_request_utils.c b/src/native/http_request_utils.c index 64ada0611..a0d3a95cd 100644 --- a/src/native/http_request_utils.c +++ b/src/native/http_request_utils.c @@ -435,10 +435,9 @@ int aws_apply_java_http_request_changes_to_native_request( } if (jni_body_stream) { - /* use_ffm=false here: this path is used by non-S3 HTTP requests that - * don't go through the FFM meta-request path. */ + /* Pass use_ffm through so the stream uses the correct callback path. */ struct aws_input_stream *body_stream = aws_input_stream_new_from_java_http_request_body_stream( - aws_jni_get_allocator(), env, jni_body_stream, false); + aws_jni_get_allocator(), env, jni_body_stream, use_ffm); aws_http_message_set_body_stream(message, body_stream); /* request controls the lifetime of body stream fully */ diff --git a/src/native/mqtt5_client.c b/src/native/mqtt5_client.c index f5018a1b8..84b6844a1 100644 --- a/src/native/mqtt5_client.c +++ b/src/native/mqtt5_client.c @@ -1650,7 +1650,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt5_Mqtt5Client_mqtt5Cl } if (aws_apply_java_http_request_changes_to_native_request( - env, jni_marshalled_request, NULL, ws_handshake->http_request)) { + env, jni_marshalled_request, NULL, ws_handshake->http_request, false /* use_ffm: MQTT5 path uses JNI */)) { error_code = aws_last_error(); goto done; } diff --git a/src/native/mqtt_connection.c b/src/native/mqtt_connection.c index 2d68f1d99..c35c12b5d 100644 --- a/src/native/mqtt_connection.c +++ b/src/native/mqtt_connection.c @@ -1290,7 +1290,7 @@ void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClien } if (aws_apply_java_http_request_changes_to_native_request( - env, jni_marshalled_request, NULL, ws_handshake->http_request)) { + env, jni_marshalled_request, NULL, ws_handshake->http_request, false /* use_ffm: MQTT path uses JNI */)) { error_code = aws_last_error(); goto done; } From b7e5c9073ba98bc66dfce2ff9b295a45d1c15b9b Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Fri, 8 May 2026 09:31:39 -0700 Subject: [PATCH 3/3] need to compile using java 22 for FFM --- pom.xml | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 81e7d78a6..2fb755428 100644 --- a/pom.xml +++ b/pom.xml @@ -49,14 +49,29 @@ + compile-for-java-8 - [9,) + [9,22) 8 + + + compile-for-java-22 + + [22,) + + + 22 + + continuous-integration