2222from io import BytesIO
2323from concurrent .futures import Future
2424from typing import List , Tuple , Optional , Callable , AsyncIterator
25+ import threading
2526
2627
2728class AIOHttpClientConnectionUnified (HttpClientConnectionBase ):
@@ -328,7 +329,8 @@ class AIOHttpClientStreamUnified(HttpClientStreamBase):
328329 '_completion_future' ,
329330 '_stream_completed' ,
330331 '_status_code' ,
331- '_loop' )
332+ '_loop' ,
333+ '_deque_lock' )
332334
333335 def __init__ (self ,
334336 connection : AIOHttpClientConnection ,
@@ -347,8 +349,8 @@ def __init__(self,
347349 raise TypeError ("loop must be an instance of asyncio.AbstractEventLoop" )
348350 self ._loop = loop
349351
350- # deque is thread-safe for appending and popping, so that we don't need
351- # locks to handle the callbacks from the C thread
352+ # Lock to protect check-then-act sequences on deques for thread safety in free-threaded Python
353+ self . _deque_lock = threading . Lock ()
352354 self ._chunk_futures = deque ()
353355 self ._received_chunks = deque ()
354356 self ._stream_completed = False
@@ -373,12 +375,16 @@ def _on_response(self, status_code: int, name_value_pairs: List[Tuple[str, str]]
373375 self ._response_headers_future .set_result (name_value_pairs )
374376
375377 def _on_body (self , chunk : bytes ) -> None :
376- """Process body chunk on the correct event loop thread."""
377- if self ._chunk_futures :
378- future = self ._chunk_futures .popleft ()
379- future .set_result (chunk )
380- else :
381- self ._received_chunks .append (chunk )
378+ """Process body chunk - called from C thread."""
379+ with self ._deque_lock :
380+ if self ._chunk_futures :
381+ future = self ._chunk_futures .popleft ()
382+ else :
383+ self ._received_chunks .append (chunk )
384+ return
385+
386+ # Set result outside lock (Future is thread-safe)
387+ future .set_result (chunk )
382388
383389 def _on_complete (self , error_code : int ) -> None :
384390 """Set the completion status of the stream."""
@@ -387,10 +393,14 @@ def _on_complete(self, error_code: int) -> None:
387393 else :
388394 self ._completion_future .set_exception (awscrt .exceptions .from_code (error_code ))
389395
390- # Resolve all pending chunk futures with an empty string to indicate end of stream
391- while self ._chunk_futures :
392- future = self ._chunk_futures .popleft ()
393- future .set_result ("" )
396+ # Resolve all pending chunk futures with lock protection
397+ with self ._deque_lock :
398+ pending_futures = list (self ._chunk_futures )
399+ self ._chunk_futures .clear ()
400+
401+ # Set results outside lock (Future is thread-safe)
402+ for future in pending_futures :
403+ future .set_result (b"" )
394404
395405 async def _set_request_body_generator (self , body_iterator : AsyncIterator [bytes ]):
396406 ...
@@ -418,14 +428,17 @@ async def get_next_response_chunk(self) -> bytes:
418428 bytes: The next chunk of data from the response body.
419429 Returns empty bytes when the stream is completed and no more chunks are left.
420430 """
421- if self ._received_chunks :
422- return self ._received_chunks .popleft ()
423- elif self ._completion_future .done ():
424- return b""
425- else :
426- future = Future ()
427- self ._chunk_futures .append (future )
428- return await asyncio .wrap_future (future , loop = self ._loop )
431+ with self ._deque_lock :
432+ if self ._received_chunks :
433+ return self ._received_chunks .popleft ()
434+ elif self ._completion_future .done ():
435+ return b""
436+ else :
437+ future = Future ()
438+ self ._chunk_futures .append (future )
439+
440+ # Await outside lock
441+ return await asyncio .wrap_future (future , loop = self ._loop )
429442
430443 async def wait_for_completion (self ) -> int :
431444 """Wait asynchronously for the stream to complete.
0 commit comments