88import software .amazon .awssdk .crt .http .HttpRequestBodyStream ;
99import software .amazon .awssdk .crt .s3 .*;
1010
11- import java .nio .ByteBuffer ;
11+ import java .lang .foreign .Arena ;
12+ import java .lang .foreign .MemorySegment ;
13+ import java .lang .foreign .ValueLayout ;
1214import java .nio .charset .StandardCharsets ;
1315import java .nio .file .Path ;
1416import java .util .ArrayList ;
1719
1820/**
1921 * A single transfer task (upload or download) executed via the FFM-backed
20- * aws-crt-java S3Client. The logic is identical to CRTJavaTask; the FFM
21- * internals are transparent at this level since aws-crt-java exposes the same
22- * public API regardless of whether JNI or FFM is used under the hood.
23- *
24- * If the FFM branch of aws-crt-java changes the HttpRequestBodyStream
25- * sendRequestBody() signature (e.g. ByteBuffer -> MemorySegment), update
26- * UploadFromRamStream below accordingly.
22+ * aws-crt-java S3Client.
23+ * <p>
24+ * This task uses {@link S3MetaRequestOptions#withUseFFM(boolean) useFFM=true}
25+ * so that:
26+ * <ul>
27+ * <li><b>Downloads:</b> response body chunks are delivered as
28+ * {@link MemorySegment} views of native memory (zero-copy, no
29+ * {@code byte[]} allocation).</li>
30+ * <li><b>Uploads:</b> the upload stream writes directly into the native
31+ * buffer via {@link MemorySegment} (no {@code DirectByteBuffer} wrapper
32+ * object allocation).</li>
33+ * </ul>
2734 */
2835class FFMJavaTask implements S3MetaRequestResponseHandler {
2936
@@ -43,6 +50,9 @@ class FFMJavaTask implements S3MetaRequestResponseHandler {
4350
4451 options .withResponseHandler (this );
4552
53+ // Enable FFM mode: zero-copy downloads, direct-write uploads
54+ options .withUseFFM (true );
55+
4656 String httpMethod ;
4757 String httpPath = "/" + config .key ;
4858 HttpRequestBodyStream requestUploadStream = null ;
@@ -100,11 +110,21 @@ void waitUntilDone() {
100110 }
101111 }
102112
113+ /**
114+ * FFM download path: body chunk delivered as a zero-copy {@link MemorySegment}
115+ * view of native memory. The benchmark discards the data, so we just return 0.
116+ */
117+ @ Override
118+ public int onResponseBody (MemorySegment bodyBytesIn , long objectRangeStart , long objectRangeEnd ) {
119+ // Benchmark discards downloaded data — no backpressure increment needed.
120+ return 0 ;
121+ }
122+
103123 @ Override
104124 public void onFinished (S3FinishedResponseContext context ) {
105125 if (context .getErrorCode () != 0 ) {
106126 // FFMJavaTask failed. Report error and kill program...
107- System .err .printf ("FFMJavaTask[%d] failed. actions :%s key:%s error_code:%s/ n" ,
127+ System .err .printf ("FFMJavaTask[%d] failed. action :%s key:%s error_code:%s% n" ,
108128 taskI , config .action , config .key , CRT .awsErrorName (context .getErrorCode ()));
109129
110130 if (context .getResponseStatus () != 0 ) {
@@ -128,6 +148,14 @@ public void onFinished(S3FinishedResponseContext context) {
128148 }
129149 }
130150
151+ /**
152+ * FFM upload stream: writes random data directly into the native buffer via
153+ * {@link MemorySegment}, avoiding the {@code DirectByteBuffer} wrapper object
154+ * that the JNI path allocates on every call.
155+ * <p>
156+ * Returns the number of bytes written. Returning {@code 0} signals
157+ * end-of-stream to the native layer.
158+ */
131159 static class UploadFromRamStream implements HttpRequestBodyStream {
132160 final long size ;
133161 long bytesWritten ;
@@ -138,21 +166,45 @@ static class UploadFromRamStream implements HttpRequestBodyStream {
138166 this .size = size ;
139167 }
140168
169+ /**
170+ * FFM path: write directly into the native buffer at {@code address}.
171+ * Returns bytes written; 0 signals end-of-stream.
172+ */
141173 @ Override
142- public boolean sendRequestBody (ByteBuffer dstBuf ) {
143- /*
144- * `randomData` is just a buffer of random data whose length may not equal
145- * `size`. We'll send its contents repeatedly until size bytes have been
146- * uploaded. We do this, so we can upload huge objects without actually
147- * allocating a huge buffer.
148- */
149- while (bytesWritten < size && dstBuf .remaining () > 0 ) {
150- int amtToTransfer = (int ) Math .min (size - bytesWritten , dstBuf .remaining ());
151- amtToTransfer = Math .min (amtToTransfer , randomData .length );
152- dstBuf .put (randomData , 0 , amtToTransfer );
153- bytesWritten += amtToTransfer ;
174+ public int sendRequestBody (long address , long length ) {
175+ long remaining = size - bytesWritten ;
176+ if (remaining <= 0 ) {
177+ return 0 ; // end-of-stream
178+ }
179+
180+ long toWrite = Math .min (remaining , length );
181+
182+ // Wrap the native destination buffer as a MemorySegment (zero-copy).
183+ MemorySegment dest = MemorySegment .ofAddress (address )
184+ .reinterpret (toWrite , Arena .ofAuto (), null );
185+
186+ // Copy from randomData (looping) into the native segment.
187+ long written = 0 ;
188+ while (written < toWrite ) {
189+ int chunk = (int ) Math .min (toWrite - written , randomData .length );
190+ MemorySegment .copy (MemorySegment .ofArray (randomData ), ValueLayout .JAVA_BYTE , 0 ,
191+ dest , ValueLayout .JAVA_BYTE , written , chunk );
192+ written += chunk ;
154193 }
155- return bytesWritten == size ;
194+
195+ bytesWritten += written ;
196+ return (int ) written ;
197+ }
198+
199+ @ Override
200+ public boolean resetPosition () {
201+ bytesWritten = 0 ;
202+ return true ;
203+ }
204+
205+ @ Override
206+ public long getLength () {
207+ return size ;
156208 }
157209 }
158210}
0 commit comments