Skip to content

Update CRT sync HTTP client to use caller thread to read from input stream#7027

Open
zoewangg wants to merge 9 commits into
masterfrom
zoewang/crtSyncPullPump
Open

Update CRT sync HTTP client to use caller thread to read from input stream#7027
zoewangg wants to merge 9 commits into
masterfrom
zoewang/crtSyncPullPump

Conversation

@zoewangg

@zoewangg zoewangg commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Motivation and Context

AwsCrtHttpClient (sync) deadlocks when a customer pipes a streaming InputStream from one request into the body of another request that shares the same CRT event loop. The most common pattern: GetObject returns a ResponseInputStream whose bytes are delivered by event-loop thread T1; the customer passes that stream as the body of PutObject on the same client; CRT's pull callback runs inputStream.read() on T1, which blocks waiting for bytes that T1 itself is supposed to deliver. The thread blocks on its own work and the request hangs indefinitely.

The root cause is architectural: in master, the synchronous CRT pull callback sendRequestBody(ByteBuffer) invokes inputStream.read() directly on the CRT event-loop thread. Any blocking InputStream that depends on the same event loop reaches this state.

Modifications

Move the blocking inputStream.read() off the CRT event-loop thread by introducing a producer/consumer hand-off:

  • The caller's thread (already blocked on responseFuture.join()) runs a producer loop that reads the body into a bounded chunk pool.
  • The CRT event-loop thread drains chunks via a non-blocking pollDrain consumer that returns 0 when no data is ready, allowing CRT to reschedule via aws_channel_schedule_task_now.

New internal types under software.amazon.awssdk.http.crt.internal.request:

  • BodyChunkPipe — bounded SPSC pipe with OPEN → EOF/ERROR/ABORTED state machine, lazy chunk allocation gated on connection acquisition.
  • Chunk — reusable byte-buffer wrapper with package-private accessors.
  • PipeBackedRequestBodyStreamHttpRequestBodyStream impl whose sendRequestBody is non-blocking.
  • SyncRequestBodyPump — caller-thread producer loop.
          acquireForFill()              publish()
    free ──────────────────▶ producer ──────────▶ ready
     ▲                                               │
     │                                               │ pollDrain()
     └───────────────── recycle() ◀──────────────────┘
                (consumer finished draining it)

Wiring changes:

  • CrtRequestExecutor#execute returns a Result(responseFuture, pump, streamFuture) exposing the stream future for the caller-thread gate.
  • CrtHttpRequest#call() now waits for stream acquisition before invoking pump.pump(), and registers a responseFuture.whenComplete hook that aborts the pump on exceptional completion to wake a parked producer if CRT fails the request mid-pump.

CrtRequestInputStreamAdapter is removed; replaced by the pipe-backed flow.

Tuning: pool depth = 4 chunks × 128 KB. Earlier perf experiments showed depth=2 starves the consumer (high futex/wake cost) and 16 KB chunks don't amortize the per-chunk coordination cost. Lazy allocation means queued-for-connection requests don't pin body buffers.

Async client path is unchanged.

Testing

Added unit tests and integ tests

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)

Checklist

License

  • I confirm that this pull request can be released under the Apache 2 license

zoewangg added 3 commits June 10, 2026 11:06
Move blocking InputStream.read() off the CRT event-loop thread to fix
a deadlock when a PUT body is sourced from a GET InputStream that
shares the same event loop.

Caller thread runs a producer loop reading the body into a bounded chunk
pool; CRT's pull callback drains via non-blocking pollDrain. Pool is
allocated lazily after stream acquisition (depth=4 chunks, 128KB each)
to avoid pinning heap on requests queued for a connection.
…Pump

# Conflicts:
#	http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java
* per-request footprint minimal until the producer actually starts pumping (i.e., until after the
* CRT stream has been acquired).
*/
Chunk acquireForFill() throws InterruptedException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why default visibility?

* CRT stream has been acquired).
*/
Chunk acquireForFill() throws InterruptedException {
synchronized (freeLock) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than using locking, could we use something like the ConcurrentLinkedDeque?

return totalBytesConsumed;
}
}
int n = Math.min(dst.remaining(), pendingDrain.len() - pendingDrain.pos());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto on this - I think this may need to be a long

Comment on lines +234 to +235
c.pos(0);
c.len(0);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor - maybe a clear or reset method on the chunk?

* @return number of bytes drained, or {@code -1} on EOF with no remaining data.
* @throws RuntimeException if the pipe is in ERROR or ABORTED state with no remaining data.
*/
int pollDrain(ByteBuffer dst) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be a long instead of int here.

totalBytesConsumed += n;
if (pendingDrain.pos() >= pendingDrain.len()) {
// The chunk has been fully copied into dst, so we return it to the free deque
// (and notify the producer in case it was waiting). This is what bounds the pool:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this notify the producer? Is it because of the freeLock.notifyAll();?

// pipe was aborted while we were waiting; stop without signaling EOF.
return;
}
int read;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto on int->long here (probably)

public void pump() throws IOException {
try (InputStream in = contentStreamProvider.newStream()) {
while (true) {
Chunk chunk = pipe.acquireForFill();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to check my own understanding - this will block waiting for a chunk and so is what provides back pressure right?

Comment on lines +67 to +68
Thread.currentThread().interrupt();
pipe.abort();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be totally off here, but I think generally the current thread interrupt should be after the other handling (pipe.abort()).

zoewangg added 6 commits June 10, 2026 20:15
When assertFailsWithinTimeBound times out waiting for the request future,
print a full thread dump (via ThreadMXBean.dumpAllThreads with locked
monitors and synchronizers) to stderr before throwing the AssertionError.

Surefire captures the forked-JVM stderr to surefire-reports, so the dump
survives Catapult's report-export step. Search the per-class -output.txt
for the marker "=== THREAD DUMP ===".

Adds checkstyle-suppressions entries for LongRunningRequestTestSupport so
the java.lang.management imports and System.err usage do not trip
NonJavaBaseModuleCheck and the System-console Regexp rule.
Routes the hang-time thread dump through Logger.error so it lands in the
SDK's configured appender alongside the lifecycle logs, instead of racing
with CRT native stderr writes (the surefire "Corrupted channel by directly
writing to native stream" warning). Drops the now-unneeded Regexp
suppression for LongRunningRequestTestSupport; keeps the
NonJavaBaseModuleCheck suppression because java.lang.management is still
used by dumpAllThreads().
- Add Chunk.reset() and use it in BodyChunkPipe.recycle so the position+length
  reset is a single named call.
- Swap the order in SyncRequestBodyPump's InterruptedException handler so
  cleanup (pipe.abort()) runs before re-asserting the thread interrupt,
  matching the conventional "do work, then re-interrupt last" idiom.
Gates crtResponseHandler.onAcquireStream(stream) on throwable == null in
both CrtRequestExecutor and CrtAsyncRequestExecutor. On the failure path,
streamBase is null and onAcquireStream(null) is wrong even though it
happens to be a no-op today: passing null to a method that expects a real
stream is a contract violation. Knowingly diverges from origin/master,
which has the same issue.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants