Add asyncio.Executor matching concurrent.futures.Executor #129769

Draft: wants to merge 14 commits into main
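
Based on the diff below, usage of the proposed API would look roughly like this on this PR's branch; the fetch coroutine and its inputs are illustrative, not part of the PR:

import asyncio

async def fetch(url):
    # Illustrative work coroutine standing in for real async work.
    await asyncio.sleep(0.1)
    return url.upper()

async def main():
    async with asyncio.Executor(max_workers=4) as executor:
        # submit() schedules a single call and returns an asyncio.Future.
        future = await executor.submit(fetch, "https://example.com")
        print(await future)

        # map() is an async generator yielding results in input order,
        # with buffering bounded by the number of workers.
        async for result in executor.map(fetch, ["a", "b", "c"]):
            print(result)

asyncio.run(main())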
2 changes: 2 additions & 0 deletions Lib/asyncio/__init__.py
@@ -9,6 +9,7 @@
from .coroutines import *
from .events import *
from .exceptions import *
from .executor import *
from .futures import *
from .graph import *
from .locks import *
@@ -27,6 +28,7 @@
coroutines.__all__ +
events.__all__ +
exceptions.__all__ +
executor.__all__ +
futures.__all__ +
graph.__all__ +
locks.__all__ +
226 changes: 226 additions & 0 deletions Lib/asyncio/executor.py
@@ -0,0 +1,226 @@
import time
from collections.abc import AsyncIterable, Awaitable, Iterable
from dataclasses import dataclass
from typing import Any, Protocol

from . import timeouts
from .exceptions import CancelledError
from .futures import Future
from .locks import Event
from .queues import Queue, QueueShutDown
from .tasks import FIRST_COMPLETED, Task, create_task, gather, wait

__all__ = (
"Executor",
)


class _WorkFunction[**P, R](Protocol):
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[R]:
...


@dataclass(frozen=True, slots=True)
class _WorkItem[**P, R]:
fn: _WorkFunction[P, R]
args: tuple[Any, ...]
kwargs: dict[Any, Any]
future: Future[R]


async def _azip(*iterables: Iterable | AsyncIterable) -> AsyncIterable[tuple]:
def _as_async_iterable[T](
iterable: Iterable[T] | AsyncIterable[T],
) -> AsyncIterable[T]:
async def _to_async_iterable(
iterable: Iterable[T],
) -> AsyncIterable[T]:
for item in iterable:
yield item

if isinstance(iterable, AsyncIterable):
return iterable
return _to_async_iterable(iterable)

async_iterables = [_as_async_iterable(iterable) for iterable in iterables]
iterators = [aiter(async_iterable) for async_iterable in async_iterables]
while True:
try:
items = [await anext(iterator) for iterator in iterators]
yield tuple(items)
except StopAsyncIteration:
break


async def _consume_cancelled_future(future):
try:
await future
except CancelledError:
pass


class Executor[**P, R]:
_input_queue: Queue[_WorkItem[P, R]]
_workers: list[Task]
_feeders: set[Task]
_shutdown: bool = False

def __init__(self, max_workers: int) -> None:
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")

self._input_queue = Queue(max_workers)
self._workers = [
create_task(self._worker())
for _ in range(max_workers)
]
self._feeders = set()

async def submit(
Contributor (review comment on submit()):
What do you think about context vars? Currently, workers run with a context that was implicitly copied when the Executor was created. How could I submit a function with the context I have at the moment of the submit() call?

Author:
While I have no strong opinions about context vars, I do see that they can be useful.

I suggest adding a context: Context | None = None parameter to submit() and map() that defaults to contextvars.copy_context() when None, and propagating the context in _WorkItem so that a worker uses it when executing fn. (A sketch of the mechanics follows the submit() method below.)

self,
fn: _WorkFunction[P, R],
/,
*args: P.args,
**kwargs: P.kwargs,
) -> Future[R]:
if self._shutdown:
raise RuntimeError("Cannot schedule new tasks after shutdown")

future = Future()
work_item = _WorkItem(fn, args, kwargs, future)
await self._input_queue.put(work_item)

return future
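
Here is a self-contained sketch of the mechanics the suggestion above relies on (the request_id variable and job coroutine are illustrative): capture the caller's context at submit time, then have the worker run the coroutine in it via create_task(..., context=...), which asyncio supports since Python 3.11:

import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id", default=None)

async def job():
    # Sees the value captured at submit time, not whatever the
    # caller's context holds by the time the worker runs the job.
    return request_id.get()

async def main():
    request_id.set("req-42")
    # What submit() would do: capture the caller's context now...
    ctx = contextvars.copy_context()
    request_id.set("changed-later")
    # ...store it in the _WorkItem, and let the worker execute the
    # coroutine inside that captured context.
    task = asyncio.create_task(job(), context=ctx)
    print(await task)  # prints req-42, not changed-later

asyncio.run(main())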

async def map(
self,
fn: _WorkFunction[P, R],
*iterables: Iterable | AsyncIterable,
timeout: float | None = None,
) -> AsyncIterable[R]:
if self._shutdown:
raise RuntimeError("Cannot schedule new tasks after shutdown")

end_time = None if timeout is None else time.monotonic() + timeout

inputs_stream = _azip(*iterables)
submitted_tasks = Queue[Future[R]]()
        # Bound how far the feeder can run ahead of the consumer:
        # at most one pending result per worker plus per queue slot.
        tasks_in_flight_limit = len(self._workers) + self._input_queue.maxsize
resume_feeding = Event()

feeder_task = create_task(self._feeder(
inputs_stream,
fn,
submitted_tasks,
tasks_in_flight_limit,
resume_feeding,
))
self._feeders.add(feeder_task)
feeder_task.add_done_callback(self._feeders.remove)

try:
while True:
task = await submitted_tasks.get()

remaining_time = (
None if end_time is None else end_time - time.monotonic()
)
if remaining_time is not None and remaining_time <= 0:
raise TimeoutError()

async with timeouts.timeout(remaining_time):
result = await task
yield result
                # Let the feeder resume now that a result has been consumed.
                resume_feeding.set()
except QueueShutDown:
# The executor was shut down while map was running.
pass
finally:
feeder_task.cancel()
await _consume_cancelled_future(feeder_task)

finalization_tasks = []
while submitted_tasks.qsize() > 0:
task = submitted_tasks.get_nowait()
task.cancel()
finalization_tasks.append(task)
for task in finalization_tasks:
await _consume_cancelled_future(task)
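
One non-obvious detail of map() above: timeout is an overall deadline across the whole iteration, not a per-item limit, and it surfaces as TimeoutError from the iterator. Continuing the illustrative executor and fetch from the usage sketch earlier (urls is an illustrative list of inputs):

try:
    # The 5-second budget covers all results combined.
    async for result in executor.map(fetch, urls, timeout=5):
        print(result)
except TimeoutError:
    # Raised once the deadline passes; the feeder and any still-pending
    # tasks are cancelled in map()'s finally block.
    ...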

async def shutdown(self, wait=True, *, cancel_futures=False) -> None:
if self._shutdown:
return
self._shutdown = True

if cancel_futures:
finalization_tasks = []
while not self._input_queue.empty():
work_item = self._input_queue.get_nowait()
work_item.future.cancel()
finalization_tasks.append(work_item.future)
for task in finalization_tasks:
await _consume_cancelled_future(task)

self._input_queue.shutdown()

if wait:
await gather(*self._workers)
Contributor (review comment on shutdown()):
Could a worker raise an exception? Should .shutdown() propagate the exception to the caller?

Author:
Unless there is a bug in the implementation, a worker should never raise an exception.


async def _worker(self) -> None:
while True:
try:
work_item = await self._input_queue.get()
item_future = work_item.future

try:
if item_future.cancelled():
continue

task = create_task(work_item.fn(
*work_item.args,
**work_item.kwargs,
))
                    # Wake as soon as the work finishes or the caller
                    # cancels the returned future.
                    await wait([task, item_future], return_when=FIRST_COMPLETED)
if not item_future.cancelled():
item_future.set_result(task.result())
else:
task.cancel()
except BaseException as exception:
if not item_future.cancelled():
item_future.set_exception(exception)
finally:
self._input_queue.task_done()
except QueueShutDown: # The executor has been shut down.
break

async def _feeder[I](
self,
inputs_stream: AsyncIterable[I],
fn: _WorkFunction[P, R],
submitted_tasks: Queue[Future[R]],
tasks_in_flight_limit: int,
resume_feeding: Event,
) -> None:
try:
async for args in inputs_stream:
if self._shutdown:
break
future = await self.submit(fn, *args) # type: ignore
await submitted_tasks.put(future)

                if submitted_tasks.qsize() >= tasks_in_flight_limit:
                    # Backpressure: pause feeding until the consumer
                    # drains a result.
                    await resume_feeding.wait()
                    resume_feeding.clear()
except QueueShutDown:
# The executor was shut down while feeder waited to submit a
# task.
pass
finally:
submitted_tasks.shutdown()

async def __aenter__(self) -> "Executor":
return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
await self.shutdown(wait=True)
return False