99# SPDX-License-Identifier: Apache-2.0.
1010
1111import asyncio
12+ import io
13+ import _awscrt
1214from concurrent .futures import Future
1315import awscrt .exceptions
14- from typing import List , Tuple , Optional , Union , Callable , Any
16+ from typing import List , Tuple , Optional , Union , Callable , Any , AsyncIterator
1517from awscrt .http import (
16- HttpClientConnection , HttpRequest , HttpClientStream , HttpProxyOptions ,
18+ HttpClientConnectionBase , HttpRequest , HttClientStreamBase , HttpProxyOptions ,
1719 Http2Setting , HttpVersion , Http2ClientStream
1820)
1921from awscrt .io import (
2224from collections import deque
2325
2426
25- class HttpClientConnectionAsync (HttpClientConnection ):
27+ class HttpClientConnectionAsync (HttpClientConnectionBase ):
2628 """
2729 An async HTTP client connection.
2830
2931 Use `HttpClientConnectionAsync.new()` to establish a new connection.
3032 """
31- __slots__ = ('_host_name' , '_port' )
3233
3334 @classmethod
3435 async def new (cls ,
@@ -62,7 +63,7 @@ async def new(cls,
6263 Returns:
6364 HttpClientConnectionAsync: A new HTTP client connection.
6465 """
65- future = HttpClientConnection ._generic_new (
66+ future = cls ._generic_new (
6667 host_name ,
6768 port ,
6869 bootstrap ,
@@ -81,8 +82,8 @@ async def close(self) -> None:
8182 Returns:
8283 None: When shutdown is complete.
8384 """
84- close_future = super (). close ( )
85- await asyncio .wrap_future (close_future )
85+ _awscrt . http_connection_close ( self . _binding )
86+ await asyncio .wrap_future (self . shutdown_future )
8687
8788 def request (self ,
8889 request : 'HttpRequest' ,
@@ -100,7 +101,7 @@ def request(self,
100101 return HttpClientStreamAsync (self , request , loop )
101102
102103
103- class Http2ClientConnectionAsync (HttpClientConnectionAsync ):
104+ class Http2ClientConnectionAsync (HttpClientConnectionBase ):
104105 """
105106 An async HTTP/2 client connection.
106107
@@ -133,7 +134,7 @@ async def new(cls,
133134
134135 * `settings` (List[Http2Setting]): List of settings that were changed.
135136 """
136- future = HttpClientConnection ._generic_new (
137+ future = cls ._generic_new (
137138 host_name ,
138139 port ,
139140 bootstrap ,
@@ -146,6 +147,18 @@ async def new(cls,
146147 asyncio_connection = True )
147148 return await asyncio .wrap_future (future )
148149
150+ async def close (self ) -> None :
151+ """Close the connection asynchronously.
152+
153+ Shutdown is asynchronous. This call has no effect if the connection is already
154+ closing.
155+
156+ Returns:
157+ None: When shutdown is complete.
158+ """
159+ _awscrt .http_connection_close (self ._binding )
160+ await asyncio .wrap_future (self .shutdown_future )
161+
149162 def request (self ,
150163 request : 'HttpRequest' ,
151164 manual_write : bool = False ,
@@ -164,23 +177,7 @@ def request(self,
164177 return Http2ClientStreamAsync (self , request , manual_write , loop )
165178
166179
167- class HttpClientStreamAsync (HttpClientStream ):
168- """Async HTTP stream that sends a request and receives a response.
169-
170- Create an HttpClientStreamAsync with `HttpClientConnectionAsync.request()`.
171-
172- Attributes:
173- connection (HttpClientConnectionAsync): This stream's connection.
174-
175- completion_future (asyncio.Future): Future that will contain
176- the response status code (int) when the request/response exchange
177- completes. If the exchange fails to complete, the Future will
178- contain an exception indicating why it failed.
179-
180- Notes:
181- All async method on a stream (await stream.next(), etc.) must be performed in the
182- thread that owns the event loop used to create the stream
183- """
180+ class HttpClientStreamAsyncBase (HttClientStreamBase ):
184181 __slots__ = (
185182 '_response_status_future' ,
186183 '_response_headers_future' ,
@@ -191,18 +188,6 @@ class HttpClientStreamAsync(HttpClientStream):
191188 '_status_code' ,
192189 '_loop' )
193190
194- def __init__ (self , connection : HttpClientConnectionAsync , request : HttpRequest ,
195- loop : Optional [asyncio .AbstractEventLoop ] = None ) -> None :
196- """Initialize an HTTP client stream.
197-
198- Args:
199- connection (HttpClientConnectionAsync): The connection to send the request on.
200- request (HttpRequest): The HTTP request to send.
201- loop (Optional[asyncio.AbstractEventLoop]): Event loop to use for async operations.
202- If None, the current event loop is used.
203- """
204- self ._init_common (connection , request , loop = loop )
205-
206191 def _init_common (self , connection : HttpClientConnectionAsync ,
207192 request : HttpRequest ,
208193 http2_manual_write : bool = False ,
@@ -231,7 +216,7 @@ def _init_common(self, connection: HttpClientConnectionAsync,
231216 self ._status_code = None
232217
233218 # Activate the stream immediately
234- self . activate ( )
219+ _awscrt . http_client_stream_activate ( self )
235220
236221 def _on_response (self , status_code : int , name_value_pairs : List [Tuple [str , str ]]) -> None :
237222 self ._status_code = status_code
@@ -311,7 +296,38 @@ async def wait_for_completion(self) -> int:
311296 return await self ._completion_future
312297
313298
314- class Http2ClientStreamAsync (HttpClientStreamAsync , Http2ClientStream ):
299+ class HttpClientStreamAsync (HttpClientStreamAsyncBase ):
300+ """Async HTTP stream that sends a request and receives a response.
301+
302+ Create an HttpClientStreamAsync with `HttpClientConnectionAsync.request()`.
303+
304+ Attributes:
305+ connection (HttpClientConnectionAsync): This stream's connection.
306+
307+ completion_future (asyncio.Future): Future that will contain
308+ the response status code (int) when the request/response exchange
309+ completes. If the exchange fails to complete, the Future will
310+ contain an exception indicating why it failed.
311+
312+ Notes:
313+ All async method on a stream (await stream.next(), etc.) must be performed in the
314+ thread that owns the event loop used to create the stream
315+ """
316+
317+ def __init__ (self , connection : HttpClientConnectionAsync , request : HttpRequest ,
318+ loop : Optional [asyncio .AbstractEventLoop ] = None ) -> None :
319+ """Initialize an HTTP client stream.
320+
321+ Args:
322+ connection (HttpClientConnectionAsync): The connection to send the request on.
323+ request (HttpRequest): The HTTP request to send.
324+ loop (Optional[asyncio.AbstractEventLoop]): Event loop to use for async operations.
325+ If None, the current event loop is used.
326+ """
327+ super ()._init_common (connection , request , loop = loop )
328+
329+
330+ class Http2ClientStreamAsync (HttpClientStreamAsyncBase ):
315331 """HTTP/2 stream that sends a request and receives a response.
316332
317333 Create an Http2ClientStreamAsync with `Http2ClientConnectionAsync.request()`.
@@ -321,17 +337,31 @@ def __init__(self, connection: HttpClientConnectionAsync, request: HttpRequest,
321337 loop : Optional [asyncio .AbstractEventLoop ] = None ) -> None :
322338 super ()._init_common (connection , request , http2_manual_write = manual_write , loop = loop )
323339
324- async def write_data_async (self ,
325- data_stream : Union [InputStream , Any ],
326- end_stream : bool = False ) -> None :
340+ async def write_data (self ,
341+ data_stream : Union [InputStream , Any ],
342+ end_stream : bool = False ) -> None :
327343 """Write data to the stream asynchronously.
328344
329345 Args:
330- data_stream (Union[InputStream, Any]): Data to write.
346+ data_stream (AsyncIterator[bytes]): Async iterator that yields bytes to write.
347+ Can be None to write an empty body, which is useful to finalize a request
348+ with end_stream=True.
331349 end_stream (bool): Whether this is the last data to write.
332350
333351 Returns:
334352 None: When the write completes.
335353 """
336- future = self .write_data (data_stream , end_stream )
354+ future : Future = Future ()
355+ body_stream : InputStream = InputStream .wrap (data_stream , allow_none = True )
356+
357+ def on_write_complete (error_code : int ) -> None :
358+ if future .cancelled ():
359+ # the future was cancelled, so we don't need to set the result or exception
360+ return
361+ if error_code :
362+ future .set_exception (awscrt .exceptions .from_code (error_code ))
363+ else :
364+ future .set_result (None )
365+
366+ _awscrt .http2_client_stream_write_data (self , body_stream , end_stream , on_write_complete )
337367 await asyncio .wrap_future (future )
0 commit comments