Skip to content

Commit c7e0940

Browse files
authored
HTTP/2 write stream support (#635)
1 parent 519388e commit c7e0940

12 files changed

+442
-82
lines changed

.builder/actions/aws_crt_python.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def run(self, env):
2929

3030
# Enable S3 tests
3131
env.shell.setenv('AWS_TEST_S3', '1')
32+
env.shell.setenv('AWS_TEST_LOCALHOST', '1')
3233

3334
actions = [
3435
[self.python, '--version'],

awscrt/http.py

Lines changed: 141 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,26 @@ def new(cls,
112112
If successful, the Future will contain a new :class:`HttpClientConnection`.
113113
Otherwise, it will contain an exception.
114114
"""
115+
return HttpClientConnection._new_common(
116+
host_name,
117+
port,
118+
bootstrap,
119+
socket_options,
120+
tls_connection_options,
121+
proxy_options)
122+
123+
@staticmethod
124+
def _new_common(
125+
host_name,
126+
port,
127+
bootstrap=None,
128+
socket_options=None,
129+
tls_connection_options=None,
130+
proxy_options=None,
131+
expected_version=None):
132+
"""
133+
Initialize the generic part of the HttpClientConnection class.
134+
"""
115135
assert isinstance(bootstrap, ClientBootstrap) or bootstrap is None
116136
assert isinstance(host_name, str)
117137
assert isinstance(port, int)
@@ -120,44 +140,30 @@ def new(cls,
120140
assert isinstance(proxy_options, HttpProxyOptions) or proxy_options is None
121141

122142
future = Future()
143+
123144
try:
124145
if not socket_options:
125146
socket_options = SocketOptions()
126147

127148
if not bootstrap:
128149
bootstrap = ClientBootstrap.get_or_create_static_default()
129150

130-
connection = cls()
131-
connection._host_name = host_name
132-
connection._port = port
133-
134-
def on_connection_setup(binding, error_code, http_version):
135-
if error_code == 0:
136-
connection._binding = binding
137-
connection._version = HttpVersion(http_version)
138-
future.set_result(connection)
139-
else:
140-
future.set_exception(awscrt.exceptions.from_code(error_code))
141-
142-
# on_shutdown MUST NOT reference the connection itself, just the shutdown_future within it.
143-
# Otherwise we create a circular reference that prevents the connection from getting GC'd.
144-
shutdown_future = connection.shutdown_future
145-
146-
def on_shutdown(error_code):
147-
if error_code:
148-
shutdown_future.set_exception(awscrt.exceptions.from_code(error_code))
149-
else:
150-
shutdown_future.set_result(None)
151+
connection_core = _HttpClientConnectionCore(
152+
host_name,
153+
port,
154+
bootstrap=bootstrap,
155+
tls_connection_options=tls_connection_options,
156+
connect_future=future,
157+
expected_version=expected_version)
151158

152159
_awscrt.http_client_connection_new(
153160
bootstrap,
154-
on_connection_setup,
155-
on_shutdown,
156161
host_name,
157162
port,
158163
socket_options,
159164
tls_connection_options,
160-
proxy_options)
165+
proxy_options,
166+
connection_core)
161167

162168
except Exception as e:
163169
future.set_exception(e)
@@ -219,6 +225,33 @@ def request(self, request, on_response=None, on_body=None):
219225
return HttpClientStream(self, request, on_response, on_body)
220226

221227

228+
class Http2ClientConnection(HttpClientConnection):
229+
"""
230+
HTTP/2 client connection.
231+
232+
This class extends HttpClientConnection with HTTP/2 specific functionality.
233+
"""
234+
@classmethod
235+
def new(cls,
236+
host_name,
237+
port,
238+
bootstrap=None,
239+
socket_options=None,
240+
tls_connection_options=None,
241+
proxy_options=None):
242+
return HttpClientConnection._new_common(
243+
host_name,
244+
port,
245+
bootstrap,
246+
socket_options,
247+
tls_connection_options,
248+
proxy_options,
249+
HttpVersion.Http2)
250+
251+
def request(self, request, on_response=None, on_body=None, manual_write=False):
252+
return Http2ClientStream(self, request, on_response, on_body, manual_write)
253+
254+
222255
class HttpStreamBase(NativeResource):
223256
"""Base for HTTP stream classes"""
224257
__slots__ = ('_connection', '_completion_future', '_on_body_cb')
@@ -258,9 +291,12 @@ class HttpClientStream(HttpStreamBase):
258291
completes. If the exchange fails to complete, the Future will
259292
contain an exception indicating why it failed.
260293
"""
261-
__slots__ = ('_response_status_code', '_on_response_cb', '_on_body_cb', '_request')
294+
__slots__ = ('_response_status_code', '_on_response_cb', '_on_body_cb', '_request', '_version')
262295

263296
def __init__(self, connection, request, on_response=None, on_body=None):
297+
self._generic_init(connection, request, on_response, on_body)
298+
299+
def _generic_init(self, connection, request, on_response=None, on_body=None, http2_manual_write=False):
264300
assert isinstance(connection, HttpClientConnection)
265301
assert isinstance(request, HttpRequest)
266302
assert callable(on_response) or on_response is None
@@ -273,8 +309,14 @@ def __init__(self, connection, request, on_response=None, on_body=None):
273309

274310
# keep HttpRequest alive until stream completes
275311
self._request = request
312+
self._version = connection.version
276313

277-
self._binding = _awscrt.http_client_stream_new(self, connection, request)
314+
self._binding = _awscrt.http_client_stream_new(self, connection, request, http2_manual_write)
315+
316+
@property
317+
def version(self):
318+
"""HttpVersion: Protocol used by this stream"""
319+
return self._version
278320

279321
@property
280322
def response_status_code(self):
@@ -307,6 +349,24 @@ def _on_complete(self, error_code):
307349
self._completion_future.set_exception(awscrt.exceptions.from_code(error_code))
308350

309351

352+
class Http2ClientStream(HttpClientStream):
353+
def __init__(self, connection, request, on_response=None, on_body=None, manual_write=False):
354+
super()._generic_init(connection, request, on_response, on_body, manual_write)
355+
356+
def write_data(self, data_stream, end_stream=False):
357+
future = Future()
358+
body_stream = InputStream.wrap(data_stream, allow_none=True)
359+
360+
def on_write_complete(error_code):
361+
if error_code:
362+
future.set_exception(awscrt.exceptions.from_code(error_code))
363+
else:
364+
future.set_result(None)
365+
366+
_awscrt.http2_client_stream_write_data(self, body_stream, end_stream, on_write_complete)
367+
return future
368+
369+
310370
class HttpMessageBase(NativeResource):
311371
"""
312372
Base for HttpRequest and HttpResponse classes.
@@ -625,3 +685,58 @@ def __init__(self,
625685
self.auth_username = auth_username
626686
self.auth_password = auth_password
627687
self.connection_type = connection_type
688+
689+
690+
class _HttpClientConnectionCore:
691+
'''
692+
Private class to keep all the related Python object alive until C land clean up for HttpClientConnection
693+
'''
694+
695+
def __init__(
696+
self,
697+
host_name,
698+
port,
699+
bootstrap=None,
700+
tls_connection_options=None,
701+
connect_future=None,
702+
expected_version=None):
703+
self._shutdown_future = None
704+
self._host_name = host_name
705+
self._port = port
706+
self._bootstrap = bootstrap
707+
self._tls_connection_options = tls_connection_options
708+
self._connect_future = connect_future
709+
self._expected_version = expected_version
710+
711+
def _on_connection_setup(self, binding, error_code, http_version):
712+
if self._expected_version and self._expected_version != http_version:
713+
# unexpected protocol version
714+
# AWS_ERROR_HTTP_UNSUPPORTED_PROTOCOL
715+
self._connect_future.set_exception(awscrt.exceptions.from_code(2060))
716+
return
717+
if error_code != 0:
718+
self._connect_future.set_exception(awscrt.exceptions.from_code(error_code))
719+
return
720+
if http_version == HttpVersion.Http2:
721+
connection = Http2ClientConnection()
722+
else:
723+
connection = HttpClientConnection()
724+
725+
connection._host_name = self._host_name
726+
connection._port = self._port
727+
728+
connection._binding = binding
729+
connection._version = HttpVersion(http_version)
730+
self._shutdown_future = connection.shutdown_future
731+
self._connect_future.set_result(connection)
732+
# release reference to the future, as it points to connection which creates a cycle reference.
733+
self._connect_future = None
734+
735+
def _on_shutdown(self, error_code):
736+
if self._shutdown_future is None:
737+
# connection failed, ignore shutdown
738+
return
739+
if error_code:
740+
self._shutdown_future.set_exception(awscrt.exceptions.from_code(error_code))
741+
else:
742+
self._shutdown_future.set_result(None)

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ dev = [
3535
"build>=1.2.2", # for building wheels
3636
"sphinx>=7.2.6,<7.3; python_version >= '3.9'", # for building docs
3737
"websockets>=13.1", # for tests
38+
"h2", # for tests
3839
]

source/http.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ PyObject *aws_py_http_connection_is_open(PyObject *self, PyObject *args);
3737
PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args);
3838

3939
PyObject *aws_py_http_client_stream_new(PyObject *self, PyObject *args);
40-
4140
PyObject *aws_py_http_client_stream_activate(PyObject *self, PyObject *args);
4241

42+
PyObject *aws_py_http2_client_stream_write_data(PyObject *self, PyObject *args);
43+
4344
/* Create capsule around new request-style aws_http_message struct */
4445
PyObject *aws_py_http_message_new_request(PyObject *self, PyObject *args);
4546

0 commit comments

Comments
 (0)