-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathHttp1StreamManager.java
More file actions
162 lines (144 loc) · 7.13 KB
/
Http1StreamManager.java
File metadata and controls
162 lines (144 loc) · 7.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
package software.amazon.awssdk.crt.http;
import software.amazon.awssdk.crt.CrtRuntimeException;
import java.util.concurrent.CompletableFuture;
/**
* Manages a pool of HTTP/1.1 streams. Creates and manages HTTP/1.1 connections
* under the hood: for each request it acquires a connection from an
* HttpClientConnectionManager, makes the request on that connection, and
* returns the connection to the manager once the request finishes.
*/
public class Http1StreamManager implements AutoCloseable {
    /** Connection manager that every call on this class delegates to; assigned once in the constructor. */
    private final HttpClientConnectionManager connectionManager;

    /**
     * Factory function for Http1StreamManager instances
     *
     * @param options the connection manager options used to configure the connection manager under the hood
     * @return a new instance of an Http1StreamManager
     */
    public static Http1StreamManager create(HttpClientConnectionManagerOptions options) {
        return new Http1StreamManager(options);
    }

    /**
     * Creates the underlying connection manager from the given options.
     * Private: instances are obtained through {@link #create(HttpClientConnectionManagerOptions)}.
     *
     * @param options options forwarded to {@code HttpClientConnectionManager.create}
     */
    private Http1StreamManager(HttpClientConnectionManagerOptions options) {
        this.connectionManager = HttpClientConnectionManager.create(options);
    }

    /**
     * @return the shutdown-complete future of the underlying connection manager
     */
    public CompletableFuture<Void> getShutdownCompleteFuture() {
        return this.connectionManager.getShutdownCompleteFuture();
    }

    /**
     * Request an HTTP/1.1 HttpStream from StreamManager.
     *
     * @param request HttpRequest. The Request to make to the Server.
     * @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
     * @return A future for a HttpStream that will be completed when the stream is
     *         acquired.
     * @throws CrtRuntimeException Exception happens from acquiring stream.
     */
    public CompletableFuture<HttpStream> acquireStream(HttpRequest request,
            HttpStreamBaseResponseHandler streamHandler) {
        // The cast selects the HttpRequestBase overload instead of recursing into this one.
        return this.acquireStream((HttpRequestBase) request, streamHandler);
    }

    /**
     * Request an HTTP/1.1 HttpStream from StreamManager.
     *
     * @param request HttpRequest. The Request to make to the Server.
     * @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
     * @param useManualDataWrites A boolean variable to signal that body will be streamed using async writes.
     * @return A future for a HttpStream that will be completed when the stream is
     *         acquired.
     * @throws CrtRuntimeException Exception happens from acquiring stream.
     */
    public CompletableFuture<HttpStream> acquireStream(HttpRequest request,
            HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) {
        // The cast selects the HttpRequestBase overload instead of recursing into this one.
        return this.acquireStream((HttpRequestBase) request, streamHandler, useManualDataWrites);
    }

    /**
     * Request an HTTP/1.1 HttpStream from StreamManager.
     *
     * @param request HttpRequestBase. The Request to make to the Server.
     * @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
     * @return A future for a HttpStream that will be completed when the stream is
     *         acquired.
     * @throws CrtRuntimeException Exception happens from acquiring stream.
     */
    public CompletableFuture<HttpStream> acquireStream(HttpRequestBase request,
            HttpStreamBaseResponseHandler streamHandler) {
        return this.acquireStream(request, streamHandler, false); // overloading to ensure backward-compatibility
    }

    /**
     * Request an HTTP/1.1 HttpStream from StreamManager.
     *
     * <p>A connection is acquired from the underlying connection manager, the request is made on it
     * and the resulting stream is activated. The returned future is completed with the stream
     * <em>before</em> activation. If acquiring the connection or making the request fails, the future
     * is completed exceptionally instead. If activation fails with a {@link CrtRuntimeException},
     * {@code streamHandler.onResponseComplete} is invoked with that exception's error code and the
     * connection is released; the future stays completed with the stream.
     *
     * <p>The connection is released back to the connection manager when the response completes,
     * even if {@code streamHandler.onResponseComplete} throws.
     *
     * @param request HttpRequestBase. The Request to make to the Server.
     * @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
     * @param useManualDataWrites A boolean variable to signal that body will be streamed using async writes.
     * @return A future for a HttpStream that will be completed when the stream is
     *         acquired.
     * @throws CrtRuntimeException Exception happens from acquiring stream.
     */
    public CompletableFuture<HttpStream> acquireStream(HttpRequestBase request,
            HttpStreamBaseResponseHandler streamHandler, boolean useManualDataWrites) {
        CompletableFuture<HttpStream> completionFuture = new CompletableFuture<>();
        HttpClientConnectionManager connManager = this.connectionManager;
        connManager.acquireConnection().whenComplete((conn, throwable) -> {
            if (throwable != null) {
                /* No connection was handed out, so there is nothing to release. */
                completionFuture.completeExceptionally(throwable);
                return;
            }
            try {
                /* Wrap the user's handler so the connection is handed back once the response completes. */
                HttpStreamBase stream = conn.makeRequest(request, new HttpStreamBaseResponseHandler() {
                    @Override
                    public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
                            HttpHeader[] nextHeaders) {
                        streamHandler.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders);
                    }

                    @Override
                    public void onResponseHeadersDone(HttpStreamBase stream, int blockType) {
                        streamHandler.onResponseHeadersDone(stream, blockType);
                    }

                    @Override
                    public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
                        return streamHandler.onResponseBody(stream, bodyBytesIn);
                    }

                    @Override
                    public void onResponseComplete(HttpStreamBase stream, int errorCode) {
                        try {
                            streamHandler.onResponseComplete(stream, errorCode);
                        } finally {
                            /*
                             * Release the connection back even when the user's callback throws;
                             * otherwise the connection would never return to the pool.
                             */
                            connManager.releaseConnection(conn);
                        }
                    }
                }, useManualDataWrites);
                completionFuture.complete((HttpStream) stream);
                /* Activate the stream for the user */
                try {
                    stream.activate();
                } catch (CrtRuntimeException e) {
                    /*
                     * If activate failed, complete callback will not be invoked, so notify the user
                     * directly. No finally here: if the user's callback throws, the outer catch below
                     * performs the release, and a finally would release the connection twice.
                     */
                    streamHandler.onResponseComplete(stream, e.errorCode);
                    /* Release the connection back */
                    connManager.releaseConnection(conn);
                }
            } catch (Exception ex) {
                connManager.releaseConnection(conn);
                /* No-op when the future was already completed with the stream above. */
                completionFuture.completeExceptionally(ex);
            }
        });
        return completionFuture;
    }

    /**
     * @return concurrency metrics for the current manager
     */
    public HttpManagerMetrics getManagerMetrics() {
        return this.connectionManager.getManagerMetrics();
    }

    /**
     * @return maximum number of connections this manager will pool
     */
    public int getMaxConnections() {
        return this.connectionManager.getMaxConnections();
    }

    /**
     * Closes the underlying connection manager.
     */
    @Override
    public void close() {
        this.connectionManager.close();
    }
}