66import uuid
77import warnings
88from collections import deque
9- from collections .abc import Iterator , Mapping , Sequence
9+ from collections .abc import AsyncIterator , Iterator , Mapping , Sequence
1010from types import TracebackType
1111from typing import TYPE_CHECKING , Any , Union , cast
1212from urllib .parse import parse_qsl , unquote , urlencode
@@ -313,7 +313,10 @@ async def read(self, *, decode: bool = False) -> bytes:
313313 while not self ._at_eof :
314314 data .extend (await self .read_chunk (self .chunk_size ))
315315 if decode :
316- return await self .decode_async (data )
316+ decoded_data = bytearray ()
317+ async for d in self .decode_iter (data ):
318+ decoded_data .extend (d )
319+ return decoded_data
317320 return data
318321
319322 async def read_chunk (self , size : int = chunk_size ) -> bytes :
@@ -508,16 +511,16 @@ def decode(self, data: bytes) -> bytes:
508511 Decodes data according the specified Content-Encoding
509512 or Content-Transfer-Encoding headers value.
510513
511- Note: For large payloads, consider using decode_async () instead
514+ Note: For large payloads, consider using decode_iter () instead
512515 to avoid blocking the event loop during decompression.
513516 """
514517 data = self ._apply_content_transfer_decoding (data )
515518 if self ._needs_content_decoding ():
516519 return self ._decode_content (data )
517520 return data
518521
519- async def decode_async (self , data : bytes ) -> bytes :
520- """Decodes data asynchronously .
522+ async def decode_iter (self , data : bytes ) -> AsyncIterator [ bytes ] :
523+ """Async generator that yields decoded data chunks .
521524
522525 Decodes data according the specified Content-Encoding
523526 or Content-Transfer-Encoding headers value.
@@ -527,8 +530,10 @@ async def decode_async(self, data: bytes) -> bytes:
527530 """
528531 data = self ._apply_content_transfer_decoding (data )
529532 if self ._needs_content_decoding ():
530- return await self ._decode_content_async (data )
531- return data
533+ async for d in self ._decode_content_async (data ):
534+ yield d
535+ else :
536+ yield data
532537
533538 def _decode_content (self , data : bytes ) -> bytes :
534539 encoding = self .headers .get (CONTENT_ENCODING , "" ).lower ()
@@ -542,17 +547,18 @@ def _decode_content(self, data: bytes) -> bytes:
542547
543548 raise RuntimeError (f"unknown content encoding: { encoding } " )
544549
545- async def _decode_content_async (self , data : bytes ) -> bytes :
550+ async def _decode_content_async (self , data : bytes ) -> AsyncIterator [ bytes ] :
546551 encoding = self .headers .get (CONTENT_ENCODING , "" ).lower ()
547552 if encoding == "identity" :
548- return data
549- if encoding in {"deflate" , "gzip" }:
550- return await ZLibDecompressor (
553+ yield data
554+ elif encoding in {"deflate" , "gzip" }:
555+ d = ZLibDecompressor (
551556 encoding = encoding ,
552557 suppress_deflate_header = True ,
553- ).decompress (data , max_length = self ._max_decompress_size )
554-
555- raise RuntimeError (f"unknown content encoding: { encoding } " )
558+ )
559+ yield await d .decompress (data , max_length = self ._max_decompress_size )
560+ else :
561+ raise RuntimeError (f"unknown content encoding: { encoding } " )
556562
557563 def _decode_content_transfer (self , data : bytes ) -> bytes :
558564 encoding = self .headers .get (CONTENT_TRANSFER_ENCODING , "" ).lower ()
@@ -623,10 +629,9 @@ async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> byt
623629
624630 async def write (self , writer : AbstractStreamWriter ) -> None :
625631 field = self ._value
626- chunk = await field .read_chunk (size = 2 ** 16 )
627- while chunk :
628- await writer .write (await field .decode_async (chunk ))
629- chunk = await field .read_chunk (size = 2 ** 16 )
632+ while chunk := await field .read_chunk (size = 2 ** 18 ):
633+ async for d in field .decode_iter (chunk ):
634+ await writer .write (d )
630635
631636
632637class MultipartReader :
0 commit comments