Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT HTTP Client",
"contributor": "",
"description": "Fixed a potential deadlock in `AwsCrtHttpClient` that could occur when the request body `InputStream` blocked waiting for data on the CRT event loop thread. This could happen when a blocking stream (e.g., a `BufferedInputStream` wrapping a `ResponseInputStream`) was used as a request body and the read depended on the same event loop thread to deliver data. Request body writing now happens on the caller thread."
}
26 changes: 26 additions & 0 deletions http-clients/aws-crt-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,32 @@
</archive>
</configuration>
</plugin>
<!-- The Reactive Streams TCK tests are based on TestNG. See http://maven.apache.org/surefire/maven-surefire-plugin/examples/testng.html#Running_TestNG_and_JUnit_Tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven.surefire.version}</version>
<configuration>
<properties>
<property>
<name>junit</name>
<value>false</value>
</property>
</properties>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit-platform</artifactId>
<version>${maven.surefire.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-testng</artifactId>
<version>${maven.surefire.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import static software.amazon.awssdk.http.HttpMetric.HTTP_CLIENT_NAME;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
Expand All @@ -35,6 +38,8 @@
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
import software.amazon.awssdk.http.crt.internal.CrtStreamHandler;
import software.amazon.awssdk.http.crt.internal.CrtUtils;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CompletableFutureUtils;

Expand Down Expand Up @@ -107,6 +112,8 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
}

private static final class CrtHttpRequest implements ExecutableHttpRequest {
private static final int WRITE_BUFFER_SIZE = 16 * 1024;

private final CrtRequestContext context;
private volatile CompletableFuture<SdkHttpFullResponse> responseFuture;

Expand All @@ -119,7 +126,10 @@ public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();

try {
responseFuture = new CrtRequestExecutor().execute(context);
CrtRequestExecutor.Result result = new CrtRequestExecutor().execute(context);
responseFuture = result.responseFuture();
writeRequestBody(result.streamHandler());

SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
builder.response(response);
builder.responseBody(response.content().orElse(null));
Expand All @@ -140,6 +150,10 @@ public HttpExecuteResponse call() throws IOException {
}

if (cause instanceof HttpException) {
Throwable wrapped = CrtUtils.wrapCrtException(cause);
if (wrapped instanceof IOException) {
throw (IOException) wrapped;
}
throw (HttpException) cause;
}

Expand All @@ -151,6 +165,24 @@ public HttpExecuteResponse call() throws IOException {
}
}

private void writeRequestBody(CrtStreamHandler streamHandler) throws IOException {
ContentStreamProvider provider = context.sdkRequest().contentStreamProvider().orElse(null);
if (provider == null) {
return;
}

streamHandler.waitForStream();
try (InputStream inputStream = provider.newStream()) {
byte[] buf = new byte[WRITE_BUFFER_SIZE];
int read;
while ((read = inputStream.read(buf, 0, buf.length)) >= 0) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query : I see caller of writeRequestBody catches CompletionException ? How is streamHandler.closeConnection() called in the case when IOException is thrown by inputStream.read() also writeRequestBody function signature also says it throws IOException)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! Will fix

byte[] chunk = read == buf.length ? buf : Arrays.copyOf(buf, read);
CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(chunk, false));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we run a throughput/performance benchmark comparing before and after these changes to see if there's any impact?

}
CompletableFutureUtils.joinInterruptibly(streamHandler.writeData(null, true));
}
}

@Override
public void abort() {
if (responseFuture != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,32 @@
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestBodyPublisherSubscriber;
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.NoOpMetricCollector;
import software.amazon.awssdk.utils.Logger;

@SdkInternalApi
public final class CrtAsyncRequestExecutor {

private static final Logger log = Logger.loggerFor(CrtAsyncRequestExecutor.class);

public CompletableFuture<Void> execute(CrtAsyncRequestContext executionContext) {
AsyncExecuteRequest asyncRequest = executionContext.sdkRequest();
CompletableFuture<Void> requestFuture = createAsyncExecutionFuture(asyncRequest);
ResponseHandlerErrorNotifier errorNotifier = new ResponseHandlerErrorNotifier(asyncRequest.responseHandler());

try {
doExecute(executionContext, asyncRequest, requestFuture);
doExecute(executionContext, asyncRequest, requestFuture, errorNotifier);
} catch (Throwable t) {
reportAsyncFailure(t, requestFuture, asyncRequest.responseHandler());
reportAsyncFailure(t, requestFuture, errorNotifier);
}

return requestFuture;
}

private void doExecute(CrtAsyncRequestContext executionContext,
AsyncExecuteRequest asyncRequest,
CompletableFuture<Void> requestFuture) {
CompletableFuture<Void> requestFuture,
ResponseHandlerErrorNotifier errorNotifier) {
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);

Expand All @@ -67,24 +66,48 @@ private void doExecute(CrtAsyncRequestContext executionContext,

HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext);

CompletableFuture<HttpStreamBase> streamFuture = new CompletableFuture<>();
CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture);

HttpStreamBaseResponseHandler crtResponseHandler =
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler());
CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler(), streamHandler, errorNotifier);

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
CrtRequestBodyPublisherSubscriber bodySubscriber =
new CrtRequestBodyPublisherSubscriber(streamHandler, requestFuture, errorNotifier);

long finalAcquireStartTime = acquireStartTime;

streamFuture.whenComplete((stream, throwable) -> {
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, true)
.handle((stream, throwable) -> {
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}

if (throwable != null) {
handleAcquireFailure(throwable, streamFuture, requestFuture, errorNotifier);
return null;
}
try {
stream.activate();
streamFuture.complete(stream);
asyncRequest.requestContentPublisher().subscribe(bodySubscriber);
} catch (Throwable t) {
handleAcquireFailure(t, streamFuture, requestFuture, errorNotifier);
}
return null;
}).exceptionally(t -> {
handleAcquireFailure(t, streamFuture, requestFuture, errorNotifier);
return null;
});
}

if (throwable != null) {
Throwable toThrow = wrapCrtException(throwable);
reportAsyncFailure(toThrow, requestFuture, asyncRequest.responseHandler());
}
});
private void handleAcquireFailure(Throwable t,
CompletableFuture<HttpStreamBase> streamFuture,
CompletableFuture<Void> requestFuture,
ResponseHandlerErrorNotifier errorNotifier) {
Throwable toThrow = wrapCrtException(t);
streamFuture.completeExceptionally(toThrow);
reportAsyncFailure(toThrow, requestFuture, errorNotifier);
}

/**
Expand All @@ -109,18 +132,10 @@ private CompletableFuture<Void> createAsyncExecutionFuture(AsyncExecuteRequest r
return future;
}

/**
* Notify the provided response handler and future of the failure.
*/
private void reportAsyncFailure(Throwable cause,
CompletableFuture<Void> executeFuture,
SdkAsyncHttpResponseHandler responseHandler) {
try {
responseHandler.onError(cause);
} catch (Exception e) {
log.error(() -> "SdkAsyncHttpResponseHandler " + responseHandler + " threw an exception in onError. It will be "
+ "ignored.", e);
}
ResponseHandlerErrorNotifier errorNotifier) {
errorNotifier.tryNotifyError(cause);
executeFuture.completeExceptionally(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler;
Expand All @@ -32,49 +31,91 @@
@SdkInternalApi
public final class CrtRequestExecutor {

public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext) {
CompletableFuture<SdkHttpFullResponse> requestFuture = new CompletableFuture<>();
public Result execute(CrtRequestContext executionContext) {
CompletableFuture<SdkHttpFullResponse> responseFuture = new CompletableFuture<>();
CompletableFuture<HttpStreamBase> streamFuture = new CompletableFuture<>();
CrtStreamHandler streamHandler = new CrtStreamHandler(streamFuture);

try {
doExecute(executionContext, requestFuture);
doExecute(executionContext, responseFuture, streamHandler, streamFuture);
} catch (Throwable t) {
requestFuture.completeExceptionally(t);
// Fail streamFuture too so any caller blocked in waitForStream() unblocks.
streamFuture.completeExceptionally(t);
responseFuture.completeExceptionally(t);
}

return requestFuture;
return new Result(responseFuture, streamHandler);
}

private void doExecute(CrtRequestContext executionContext, CompletableFuture<SdkHttpFullResponse> requestFuture) {
private void doExecute(CrtRequestContext executionContext,
CompletableFuture<SdkHttpFullResponse> responseFuture,
CrtStreamHandler streamHandler,
CompletableFuture<HttpStreamBase> streamFuture) {
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);

long acquireStartTime = 0;

if (shouldPublishMetrics) {
// go ahead and get acquireStartTime for the concurrency timer as early as possible,
// so it's as accurate as possible, but only do it in a branch since clock_gettime()
// results in a full sys call barrier (multiple mutexes and a hw interrupt).
acquireStartTime = System.nanoTime();
}

HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
InputStreamAdaptingHttpStreamResponseHandler crtResponseHandler =
new InputStreamAdaptingHttpStreamResponseHandler(responseFuture, streamHandler);

HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent();

long finalAcquireStartTime = acquireStartTime;

streamFuture.whenComplete((streamBase, throwable) -> {
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody)
.handle((streamBase, throwable) -> {
if (shouldPublishMetrics) {
reportMetrics(executionContext.streamManager(), metricCollector, finalAcquireStartTime);
}

if (throwable != null) {
handleAcquireFailure(throwable, streamFuture, responseFuture);
return null;
}
try {
streamBase.activate();
streamFuture.complete(streamBase);
} catch (Throwable t) {
handleAcquireFailure(t, streamFuture, responseFuture);
}
Comment thread
zoewangg marked this conversation as resolved.
return null;
}).exceptionally(t -> {
handleAcquireFailure(t, streamFuture, responseFuture);
return null;
});
}

private void handleAcquireFailure(Throwable t,
CompletableFuture<HttpStreamBase> streamFuture,
CompletableFuture<SdkHttpFullResponse> responseFuture) {
Throwable toThrow = wrapCrtException(t);
streamFuture.completeExceptionally(toThrow);
responseFuture.completeExceptionally(toThrow);
}

if (throwable != null) {
Throwable toThrow = wrapCrtException(throwable);
requestFuture.completeExceptionally(toThrow);
}
});
@SdkInternalApi
public static final class Result {
private final CompletableFuture<SdkHttpFullResponse> responseFuture;
private final CrtStreamHandler streamHandler;

private Result(CompletableFuture<SdkHttpFullResponse> responseFuture, CrtStreamHandler streamHandler) {
this.responseFuture = responseFuture;
this.streamHandler = streamHandler;
}

public CompletableFuture<SdkHttpFullResponse> responseFuture() {
return responseFuture;
}

public CrtStreamHandler streamHandler() {
return streamHandler;
}
}
}
Loading
Loading