Replies: 2 comments 4 replies
-
|
This is what I came up with. It works™️, but it reads a private property:
import asyncio
import os
import typing as t
import anyio
from aiohttp import Payload, hdrs
from aiohttp.abc import AbstractStreamWriter
from aiohttp.payload import READ_SIZE, guess_filename, register_payload
# Resolve the async-iteration aliases differently for static analysis and for
# runtime: the subscripted ``[bytes]`` forms are only built while type
# checking, the plain ABCs are used when the module actually runs.
if t.TYPE_CHECKING:
    from typing import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    from collections.abc import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable

# Strong references to the executor futures that close files in the
# background; a future is added when the close is scheduled and removed by
# its own done-callback, so it cannot be garbage collected mid-flight.
_CLOSE_FUTURES: t.Set[asyncio.Future[None]] = set()
# Implementation is mostly derived from IOBasePayload
class AnyIOAsyncFilePayload(Payload):
    """aiohttp payload that streams the content of an ``anyio.AsyncFile``.

    The payload records the file position at which it is first used. ``size``
    is always computed relative to that position, and every full read
    (``write_with_length``, ``as_bytes``) seeks back to it first, so the same
    payload instance can be sent more than once.
    """

    _value: anyio.AsyncFile
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: t.Optional[int] = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing

    def __init__(
        self,
        value: anyio.AsyncFile[t.Any],
        disposition: str = "attachment",
        *args: t.Any,
        **kwargs: t.Any,
    ) -> None:
        """
        Wrap *value* and derive a filename / Content-Disposition for it.

        Args:
            value: The async file whose content is sent.
            disposition: Content-Disposition type to set when a filename is
                known and the header is not already present.
            *args: Forwarded to ``Payload.__init__``.
            **kwargs: Forwarded to ``Payload.__init__``; ``filename`` is
                filled in via ``guess_filename(value)`` when not supplied.
        """
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            # Never overwrite a Content-Disposition the caller already set.
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    async def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object."""
        if self._start_position is None:
            # First use: remember where the content starts.
            try:
                self._start_position = await self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
            return

        # Subsequent use: rewind to the remembered position.
        try:
            await self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    async def _read_and_available_len(
        self, remaining_content_len: t.Optional[int]
    ) -> tuple[t.Optional[int], bytes]:
        """
        Rewind the file, then return its remaining size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in
                this operation. If None (or 0), READ_SIZE is used as the limit.

        Returns:
            A tuple containing:
            - The size reported by ``self.size`` (None if it cannot be determined)
            - The first chunk of bytes read from the file object

        The chunk length is capped by READ_SIZE, by the known size and by
        ``remaining_content_len``. Note that ``self.size`` is evaluated
        synchronously here; only the read itself is awaited.
        """
        await self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, await self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )

    async def _read(self, remaining_content_len: t.Optional[int]) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None (or 0), READ_SIZE is used as the chunk size.

        Returns:
            A chunk of bytes read from the file object.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made.
        """
        return await self._value.read(remaining_content_len or READ_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> t.Optional[int]:
        """
        Size of the payload in bytes, measured from the initial position.

        The start position is stored on first access. Without it, once the
        payload has been read the file position would sit at EOF and the
        computed size would be 0; with it, the same value is returned on every
        later use (e.g. when the same instance is sent again on a redirect).

        ``tell()`` and ``os.fstat()`` are called synchronously on the wrapped
        file object.

        Returns None if the size cannot be determined (``AttributeError`` or
        ``OSError`` from the wrapped file, e.g. for unseekable streams).
        """
        try:
            # Use the public ``wrapped`` accessor of anyio.AsyncFile rather
            # than reaching into its private ``_fp`` attribute.
            raw_file = self._value.wrapped
            if self._start_position is None:
                self._start_position = raw_file.tell()
            # Total size from the start position, so Content-Length stays
            # correct even after the file has been read.
            return os.fstat(raw_file.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Delegates to write_with_length() with no length limit.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: t.Optional[int]
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Behaviour:
        1. The file is rewound to its start position before the first read
        2. Content is read and written in chunks of at most READ_SIZE bytes
        3. Writing stops when either:
           - All available file content has been written (when size is known)
           - The specified content_length has been reached
           - A read returns no data

        This method does not close the file; closing is done by close().
        """
        total_written_len = 0
        remaining_content_len = content_length

        # Get initial data and available length
        available_len, chunk = await self._read_and_available_len(remaining_content_len)

        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read next chunk
            chunk = await self._read(
                min(READ_SIZE, remaining_content_len)
                if remaining_content_len is not None
                else READ_SIZE
            )

    def _should_stop_writing(
        self,
        available_len: t.Optional[int],
        total_written_len: int,
        remaining_content_len: t.Optional[int],
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for
                content-length limited responses (None if unlimited)

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when size is known)
            - Having written all requested content (when content-length is specified)
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Schedule closing of the wrapped file without awaiting it.

        Does nothing if the payload is already marked as consumed; otherwise
        marks it consumed and hands the wrapped file's ``close`` to the
        default executor.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Skip if already consumed
        if self._consumed:
            return
        self._consumed = True  # Mark as consumed to prevent further writes

        # Schedule file closing without awaiting to prevent cancellation issues.
        # The synchronous close() of the wrapped file is what runs in the
        # executor, obtained through the public ``wrapped`` accessor.
        loop = asyncio.get_running_loop()
        close_future = loop.run_in_executor(None, self._value.wrapped.close)

        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Not supported for this payload.

        Reading the file requires awaiting, which this synchronous method
        cannot do, so it never returns.

        Raises:
            TypeError: Always.
        """
        raise TypeError("Unable to decode. This method has to be async.")

    async def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        await self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(await self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Rewinds to the start position and reads the whole file content.
        ``encoding`` and ``errors`` are accepted for interface compatibility
        and are not used.
        """
        return await self._read_all()
# Register the payload class for anyio.AsyncFile values in aiohttp's payload registry.
register_payload(AnyIOAsyncFilePayload, anyio.AsyncFile)
Beta Was this translation helpful? Give feedback.
-
|
If using the file directly, I think the actual writes are non-blocking, because we already pass to the executor. We use blockbuster in our tests, so we get test failures anytime blocking code is introduced, so you can be pretty confident that our code will remain non-blocking in future. There's also an existing example with aiofiles at:
As you've already noticed, it is possible to register new payloads to the registry, thus it is extremely difficult to provide any type safety. |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
aiohttp's `ClientSession` supports uploading `Any` type of content, which, from the typing point of view, doesn't tell us much.
However, I enabled the Ruff flake8-async (ASYNC) linters, and specifically blocking-open-call-in-async-function (ASYNC230) complained about the blocking `open()` call in an async function. So I thought "hmm, that sounds reasonable, let's check out the alternatives". I tried `async with aiofiles.open` first, then rolled back, and tried `async with await anyio.open_file` (as suggested by the lint) instead. Both resulted in a behavior change, thankfully caught by HTTP traffic snapshot tests: `Content-Type` became `application/octet-stream` for all files, and the `Content-Length` header got replaced by `Transfer-Encoding: chunked`.
Apparently, under the hood
`aiohttp` has special treatment for the built-in blocking `open()` file objects, or rather for anything that inherits from / implements `io.IOBase`, calling their `os.fstat(self._value.fileno()).st_size` on a threadpool, in order to generate the `Content-Length` header in advance.
This magic breaks with third-party libraries, whose objects do not inherit from `io.IOBase`. Both `aiofiles.open` and `anyio.open_file` to `aiohttp` look like a generic `AsyncIterablePayload` with the only API surface being `__aiter__` and `anext`, so it's not even trying:
The questions here are:
`run_in_executor` behind the scene? `anyio` moved them to an executor thread as well.
`aiohttp` with those specific libraries, or at least with `anyio`? Like, register in `PAYLOAD_REGISTRY` a subclass of `AsyncIterablePayload` specifically for the `anyio.AsyncFile` type. Where would such an adapter belong -- aiohttp itself, anyio itself, or an extra 3rd-party library?
Beta Was this translation helpful? Give feedback.
All reactions