Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,29 @@

<profiles>
<profile>
<!-- On JDK 9-21, compile at Java 8 level for broad compatibility. -->
<id>compile-for-java-8</id>
<activation>
<jdk>[9,)</jdk>
<jdk>[9,22)</jdk>
</activation>
<properties>
<maven.compiler.release>8</maven.compiler.release>
</properties>
</profile>
<profile>
<!--
On JDK 22+, compile at Java 22 level so that java.lang.foreign
(Foreign Function & Memory API) is available. This is required
for the FFM-backed S3 hot paths (useFFM=true).
-->
<id>compile-for-java-22</id>
<activation>
<jdk>[22,)</jdk>
</activation>
<properties>
<maven.compiler.release>22</maven.compiler.release>
</properties>
</profile>
<profile>
<id>continuous-integration</id>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package software.amazon.awssdk.crt.http;

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;

/**
Expand All @@ -29,6 +31,44 @@ default boolean sendRequestBody(ByteBuffer bodyBytesOut) {
return true;
}

/**
* FFM variant of {@link #sendRequestBody(ByteBuffer)}.
* <p>
* Called from native when the meta request was created with
* {@link software.amazon.awssdk.crt.s3.S3MetaRequestOptions#withUseFFM(boolean)
* useFFM=true}. The native layer passes the destination buffer as a raw
* pointer ({@code address}) and its capacity ({@code length}) as {@code long}
* primitives — no {@code DirectByteBuffer} wrapper object is allocated.
* <p>
* Implementations should write up to {@code length} bytes of request body
* data into the native buffer starting at {@code address} and return the
* number of bytes actually written. Returning {@code 0} signals that the
* body is complete (end-of-stream).
* <p>
* The default implementation bridges to the {@link #sendRequestBody(ByteBuffer)}
* overload via a {@link MemorySegment} → {@link ByteBuffer} view, so existing
* implementations that only override the {@code ByteBuffer} version continue
* to work correctly (at the cost of the {@code ByteBuffer} wrapper allocation
* that FFM mode is trying to avoid).
*
* @param address Raw native pointer to the start of the destination buffer.
* @param length Capacity of the destination buffer in bytes.
* @return Number of bytes written into the buffer, or {@code 0} when the
* body is fully consumed (end-of-stream).
*/
default int sendRequestBody(long address, long length) {
// Default: wrap the native buffer as a ByteBuffer and delegate to the
// existing overload so that implementations that only override the
// ByteBuffer version still work.
MemorySegment seg = MemorySegment.ofAddress(address)
.reinterpret(length, Arena.ofAuto(), null);
ByteBuffer buf = seg.asByteBuffer();
boolean done = sendRequestBody(buf);
// Return how many bytes were written (ByteBuffer.position() tracks this).
// If done==true and nothing was written, return 0 to signal end-of-stream.
return buf.position();
}

/**
* Called from native when the processing needs the stream to rewind itself back to its beginning.
* If the stream does not support rewinding or the rewind fails, false should be returned
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/software/amazon/awssdk/crt/s3/S3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ public S3MetaRequest makeMetaRequest(S3MetaRequestOptions options) {
fioOptionsSet,
shouldStream,
diskThroughputGbps,
directIo);
directIo,
options.getUseFFM());

metaRequest.setMetaRequestNativeHandle(metaRequestNativeHandle);

Expand Down Expand Up @@ -305,5 +306,6 @@ private static native long s3ClientMakeMetaRequest(long clientId, S3MetaRequest
boolean fioOptionsSet,
boolean shouldStream,
double diskThroughputGbps,
boolean directIo);
boolean directIo,
boolean useFFM);
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ private static Map<Integer, MetaRequestType> buildEnumMapping() {
private ResumeToken resumeToken;
private Long objectSizeHint;
private FileIoOptions fileIoOptions;
/**
* When true, the native layer uses FFM (Foreign Function & Memory) style
* callbacks that pass raw native pointers as {@code long} values instead of
* allocating JNI wrapper objects (byte[] for downloads, DirectByteBuffer for
* uploads). Requires Java 22+ and {@code --enable-native-access=ALL-UNNAMED}.
*/
private boolean useFFM = false;

public S3MetaRequestOptions withMetaRequestType(MetaRequestType metaRequestType) {
this.metaRequestType = metaRequestType;
Expand Down Expand Up @@ -472,4 +479,39 @@ public S3MetaRequestOptions withFileIoOptions(FileIoOptions fileIoOptions) {
public FileIoOptions getFileIoOptions() {
return fileIoOptions;
}

/**
* Enable FFM (Foreign Function &amp; Memory) mode for this meta request.
* <p>
* When {@code true}, the native layer passes raw native memory pointers as
* {@code long} values to Java callbacks instead of allocating JNI wrapper
* objects:
* <ul>
* <li><b>Downloads:</b> {@code onResponseBody} receives a
* {@code MemorySegment} view of native memory (zero-copy) instead of
* a heap-allocated {@code byte[]}.</li>
* <li><b>Uploads:</b> {@code sendRequestBody} receives a raw pointer and
* length as {@code long} values instead of a {@code DirectByteBuffer}
* wrapper object.</li>
* </ul>
* Requires Java 22+ and the JVM flag
* {@code --enable-native-access=ALL-UNNAMED}.
*
* @param useFFM {@code true} to use FFM callbacks, {@code false} (default)
* to use the standard JNI callbacks.
* @return this
*/
public S3MetaRequestOptions withUseFFM(boolean useFFM) {
this.useFFM = useFFM;
return this;
}

/**
* Returns whether FFM (Foreign Function &amp; Memory) mode is enabled.
*
* @return {@code true} if FFM mode is enabled, {@code false} otherwise.
*/
public boolean getUseFFM() {
return useFFM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package software.amazon.awssdk.crt.s3;

import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import software.amazon.awssdk.crt.http.HttpHeader;

Expand Down Expand Up @@ -50,6 +51,38 @@ default int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long o
return 0;
}

/**
* FFM variant of {@link #onResponseBody(ByteBuffer, long, long)}.
* <p>
* Invoked instead of the {@code ByteBuffer} overload when the meta request was
* created with {@link S3MetaRequestOptions#withUseFFM(boolean) useFFM=true}.
* The body data is delivered as a {@link MemorySegment} that is a zero-copy
* view directly into native (off-heap) memory — no {@code byte[]} is allocated
* and no data is copied.
* <p>
* <b>Do NOT</b> retain a reference to {@code bodyBytesIn} beyond the lifetime
* of this method call. The underlying native memory is only guaranteed to be
* valid for the duration of the callback.
* <p>
* The default implementation falls back to the {@code ByteBuffer} overload by
* copying the data, so existing implementations that only override the
* {@code ByteBuffer} version continue to work correctly.
*
* @param bodyBytesIn A zero-copy view of the native response body chunk.
* @param objectRangeStart Byte offset of the first byte in this chunk within
* the full S3 object.
* @param objectRangeEnd Past-the-end byte offset (i.e.
* {@code objectRangeStart + chunk length}).
* @return The number of bytes to increment the flow-control window by.
* Ignored when backpressure is disabled.
*/
default int onResponseBody(MemorySegment bodyBytesIn, long objectRangeStart, long objectRangeEnd) {
// Default: copy into a heap ByteBuffer and delegate to the ByteBuffer overload
// so that implementations that only override the ByteBuffer version still work.
ByteBuffer buf = ByteBuffer.wrap(bodyBytesIn.toArray(java.lang.foreign.ValueLayout.JAVA_BYTE));
return onResponseBody(buf, objectRangeStart, objectRangeEnd);
}

/**
* Invoked when the entire meta request execution is complete.
* @param context a wrapper object containing the following fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,54 @@
*/
package software.amazon.awssdk.crt.s3;

import software.amazon.awssdk.crt.http.HttpHeader;

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.ByteBuffer;

import software.amazon.awssdk.crt.http.HttpHeader;

class S3MetaRequestResponseHandlerNativeAdapter {
private S3MetaRequestResponseHandler responseHandler;

S3MetaRequestResponseHandlerNativeAdapter(S3MetaRequestResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}

/**
* Standard JNI path: native code allocates a {@code byte[]} containing a copy
* of the response body chunk and passes it here. Used when {@code useFFM=false}.
*/
int onResponseBody(byte[] bodyBytesIn, long objectRangeStart, long objectRangeEnd) {
return this.responseHandler.onResponseBody(ByteBuffer.wrap(bodyBytesIn), objectRangeStart, objectRangeEnd);
}

/**
* FFM path: native code passes the raw native pointer ({@code address}) and
* the chunk length ({@code length}) as {@code long} primitives — no heap
* allocation, no copy. Used when {@code useFFM=true}.
* <p>
* We wrap the native memory as a {@link MemorySegment} scoped to a
* {@link Arena#ofAuto() auto arena} so that the segment is valid for the
* duration of this call but cannot be retained beyond it (the native buffer
* is only guaranteed live for the callback's lifetime).
*
* @param address Raw native pointer to the start of the body chunk.
* @param length Number of bytes in the chunk.
* @param objectRangeStart Byte offset of the first byte within the S3 object.
* @return The number of bytes to increment the flow-control window by.
*/
int onResponseBodyFFM(long address, long length, long objectRangeStart) {
// Wrap the native pointer as a MemorySegment — zero copy.
// reinterpret() is required to give the segment a known size; the
// Arena.ofAuto() cleanup action is null because the native side owns
// the memory and will free it after the callback returns.
MemorySegment segment = MemorySegment.ofAddress(address)
.reinterpret(length, Arena.ofAuto(), null);
long objectRangeEnd = objectRangeStart + length;
return this.responseHandler.onResponseBody(segment, objectRangeStart, objectRangeEnd);
}

void onFinished(int errorCode, int responseStatus, byte[] errorPayload, String errorOperationName, int checksumAlgorithm, boolean didValidateChecksum, Throwable cause, final ByteBuffer headersBlob) {
HttpHeader[] errorHeaders = headersBlob == null ? null : HttpHeader.loadHeadersFromMarshalledHeadersBlob(headersBlob);
S3FinishedResponseContext context = new S3FinishedResponseContext(errorCode, responseStatus, errorPayload, errorOperationName, ChecksumAlgorithm.getEnumValueFromInteger(checksumAlgorithm), didValidateChecksum, cause, errorHeaders);
Expand Down
2 changes: 1 addition & 1 deletion src/native/aws_signing.c
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ void JNICALL Java_software_amazon_awssdk_crt_auth_signing_AwsSigner_awsSignerSig
AWS_FATAL_ASSERT(callback_data->java_original_chunk_body != NULL);

callback_data->chunk_body_stream = aws_input_stream_new_from_java_http_request_body_stream(
aws_jni_get_allocator(), env, java_chunk_body_stream);
aws_jni_get_allocator(), env, java_chunk_body_stream, false /* use_ffm: signing path uses JNI */);
if (callback_data->chunk_body_stream == NULL) {
aws_jni_throw_runtime_exception(env, "Error building chunk body stream");
goto on_error;
Expand Down
69 changes: 53 additions & 16 deletions src/native/http_request_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ struct aws_http_request_body_stream_impl {
jobject http_request_body_stream;
bool body_done;
bool is_valid;
/* When true, call sendRequestBody(long address, long length) -> int
* instead of sendRequestBody(ByteBuffer) -> boolean. */
bool use_ffm;
};

static int s_aws_input_stream_seek(struct aws_input_stream *stream, int64_t offset, enum aws_stream_seek_basis basis) {
Expand Down Expand Up @@ -94,21 +97,49 @@ static int s_aws_input_stream_read(struct aws_input_stream *stream, struct aws_b
}

size_t out_remaining = dest->capacity - dest->len;
int result = AWS_OP_SUCCESS;

jobject direct_buffer = aws_jni_direct_byte_buffer_from_raw_ptr(env, dest->buffer + dest->len, out_remaining);
if (impl->use_ffm) {
/*
* FFM path: pass the raw native pointer and capacity as jlong primitives.
* Java writes directly into the native buffer and returns the number of
* bytes written as an int. Returning 0 signals end-of-stream.
* No DirectByteBuffer wrapper object is allocated.
*/
jlong ptr = (jlong)(uintptr_t)(dest->buffer + dest->len);
jlong len = (jlong)out_remaining;

impl->body_done = (*env)->CallBooleanMethod(
env, impl->http_request_body_stream, http_request_body_stream_properties.send_outgoing_body, direct_buffer);
jint bytes_written = (*env)->CallIntMethod(
env, impl->http_request_body_stream, http_request_body_stream_properties.send_outgoing_body_ffm, ptr, len);

int result = AWS_OP_SUCCESS;
if (aws_jni_check_and_clear_exception(env)) {
result = aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
if (aws_jni_check_and_clear_exception(env)) {
result = aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
} else {
dest->len += (size_t)bytes_written;
/* 0 bytes written signals end-of-stream */
if (bytes_written == 0) {
impl->body_done = true;
}
}
} else {
size_t amt_written = aws_jni_byte_buffer_get_position(env, direct_buffer);
dest->len += amt_written;
}
/*
* JNI path (unchanged): wrap the native buffer as a DirectByteBuffer,
* call sendRequestBody(ByteBuffer), then read back position().
*/
jobject direct_buffer = aws_jni_direct_byte_buffer_from_raw_ptr(env, dest->buffer + dest->len, out_remaining);

(*env)->DeleteLocalRef(env, direct_buffer);
impl->body_done = (*env)->CallBooleanMethod(
env, impl->http_request_body_stream, http_request_body_stream_properties.send_outgoing_body, direct_buffer);

if (aws_jni_check_and_clear_exception(env)) {
result = aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
} else {
size_t amt_written = aws_jni_byte_buffer_get_position(env, direct_buffer);
dest->len += amt_written;
}

(*env)->DeleteLocalRef(env, direct_buffer);
}

aws_jni_release_thread_env(impl->jvm, &jvm_env_context);
/********** JNI ENV RELEASE **********/
Expand Down Expand Up @@ -188,7 +219,8 @@ static struct aws_input_stream_vtable s_aws_input_stream_vtable = {
struct aws_input_stream *aws_input_stream_new_from_java_http_request_body_stream(
struct aws_allocator *allocator,
JNIEnv *env,
jobject http_request_body_stream) {
jobject http_request_body_stream,
bool use_ffm) {
struct aws_http_request_body_stream_impl *impl =
aws_mem_calloc(allocator, 1, sizeof(struct aws_http_request_body_stream_impl));

Expand All @@ -200,6 +232,8 @@ struct aws_input_stream *aws_input_stream_new_from_java_http_request_body_stream
AWS_FATAL_ASSERT(jvmresult == 0);

impl->is_valid = true;
impl->use_ffm = use_ffm;

if (http_request_body_stream != NULL) {
impl->http_request_body_stream = (*env)->NewGlobalRef(env, http_request_body_stream);
if (impl->http_request_body_stream == NULL) {
Expand Down Expand Up @@ -370,7 +404,8 @@ int aws_apply_java_http_request_changes_to_native_request(
JNIEnv *env,
jbyteArray marshalled_request,
jobject jni_body_stream,
struct aws_http_message *message) {
struct aws_http_message *message,
bool use_ffm) {

/* come back to this when we decide we need to. */
(void)jni_body_stream;
Expand Down Expand Up @@ -400,8 +435,9 @@ int aws_apply_java_http_request_changes_to_native_request(
}

if (jni_body_stream) {
struct aws_input_stream *body_stream =
aws_input_stream_new_from_java_http_request_body_stream(aws_jni_get_allocator(), env, jni_body_stream);
/* Pass use_ffm through so the stream uses the correct callback path. */
struct aws_input_stream *body_stream = aws_input_stream_new_from_java_http_request_body_stream(
aws_jni_get_allocator(), env, jni_body_stream, use_ffm);

aws_http_message_set_body_stream(message, body_stream);
/* request controls the lifetime of body stream fully */
Expand Down Expand Up @@ -440,8 +476,9 @@ struct aws_http_message *aws_http_request_new_from_java_http_request(
}

if (jni_body_stream != NULL) {
struct aws_input_stream *body_stream =
aws_input_stream_new_from_java_http_request_body_stream(aws_jni_get_allocator(), env, jni_body_stream);
/* use_ffm=false: this function is used by non-S3 HTTP requests */
struct aws_input_stream *body_stream = aws_input_stream_new_from_java_http_request_body_stream(
aws_jni_get_allocator(), env, jni_body_stream, false);
if (body_stream == NULL) {
exception_message = "aws_fill_out_request: Error building body stream";
goto on_error;
Expand Down
Loading
Loading