Add asyncio.Executor matching concurrent.futures.Executor #129769

Draft: jurca wants to merge 14 commits into main

Conversation

@jurca commented Feb 7, 2025

The Executor provides a means of scheduling and executing asyncio tasks with capped parallelism. The asyncio Executor's API matches the concurrent.futures.Executor's API for consistency.

See also: https://discuss.python.org/t/add-task-pipeline-to-asyncio-with-capped-parallelism-and-lazy-input-reading/79292

I will add documentation once the API is agreed upon to prevent any differences between code and docs.
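
For readers skimming the thread, here is a rough usage sketch of the proposed API. Only submit(), map(), shutdown(), and the async context-manager protocol are taken from the discussion below; the constructor argument name max_workers is an assumption:

import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for real async work
    return url

async def main() -> None:
    # Hypothetical: cap the number of concurrently running tasks at 4.
    async with asyncio.Executor(max_workers=4) as executor:
        # submit() schedules fn(*args) and returns an awaitable result,
        # mirroring concurrent.futures.Executor.submit().
        result = await executor.submit(fetch, "https://example.com")
        print(result)

asyncio.run(main())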

@ghost commented Feb 7, 2025

All commit authors signed the Contributor License Agreement.
CLA signed

bedevere-app bot commented Feb 7, 2025

Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool.

If this change has little impact on Python users, wait for a maintainer to apply the skip news label instead.

@ZeroIntensity (Member):

Please make an issue for this. I suspect this needs to get discussed on DPO first.

@asvetlov (Contributor) left a comment

I've made a brief review concentrating on the await executor.submit(...) method.
map() is another, more complex beast; let's work on it carefully after the basic submit() / shutdown() functionality is done.

Comment on lines 17 to 19
class _WorkFunction[R](Protocol):
def __call__(self, *args, **kwargs) -> R | Awaitable[R]:
...
@asvetlov (Contributor):

Suggested change
class _WorkFunction[R](Protocol):
def __call__(self, *args, **kwargs) -> R | Awaitable[R]:
...
class _WorkFunction[R, **P](Protocol):
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R | Awaitable[R]:
...

If the module has type hints, let's use them consistently everywhere.
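
For illustration, the ParamSpec version lets a type checker match the arguments flowing through the executor against fn's real signature, which the bare *args/**kwargs protocol cannot do. A minimal sketch; takes_int is a hypothetical example, while the protocol is the suggested one:

from collections.abc import Awaitable
from typing import Protocol

class _WorkFunction[R, **P](Protocol):
    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R | Awaitable[R]:
        ...

def takes_int(x: int) -> int:
    return x

fn: _WorkFunction[int, [int]] = takes_int  # accepted by a type checker
# fn("oops") is now flagged as a type error, whereas the untyped
# *args/**kwargs version silently accepts any arguments.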


async def _run_work_item[R](work_item: _WorkItem[R]) -> R:
result = work_item.fn(*work_item.args, **work_item.kwargs)
if isawaitable(result):
@asvetlov (Contributor):

I'm sorry, I disagree with accepting two function colors.
In my mind, an async executor should work with async functions only.
Processing sync functions without blocking calls inside doesn't make sense; they could be called without an executor.
Handling sync functions with blocking calls requires a to_thread() wrapper; that is not the job of the proposed class.
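
To make that concrete, under the async-only rule a blocking function would be adapted at the call site rather than by the executor itself. A sketch; executor stands for an already-created asyncio Executor:

import asyncio

def blocking_io(path: str) -> bytes:
    # Would block the event loop if called directly from a coroutine.
    with open(path, "rb") as f:
        return f.read()

async def process(executor) -> bytes:
    # asyncio.to_thread() is itself a coroutine function, so the
    # executor only ever sees the async "color".
    return await executor.submit(asyncio.to_thread, blocking_io, "data.bin")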

fn: _WorkFunction[R],
*iterables: Iterable | AsyncIterable,
timeout: Optional[float] = None,
chunksize: int = 1,
@asvetlov (Contributor):

This argument is never used; please drop it.
asyncio.Executor is not 100% compatible with concurrent.futures.Executor anyway.

tasks_in_flight_limit = len(self._workers) + self._input_queue.maxsize
resume_feeding = Event()

async def _feeder() -> None:
@asvetlov (Contributor):

I think the function could be extracted into a method.

)
if remaining_time is not None and remaining_time <= 0:
raise TimeoutError()
result = await wait_for(task, remaining_time)
@asvetlov (Contributor):

Please replace wait_for() with async with timeout().
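
For reference, asyncio.timeout() (added in Python 3.11) scopes the deadline as an async context manager. A sketch of the suggested shape, reusing the diff's remaining_time and task names:

from asyncio import timeout

if remaining_time is not None and remaining_time <= 0:
    raise TimeoutError()
async with timeout(remaining_time):  # timeout(None) never expires
    result = await task

One behavioral difference worth noting: wait_for() cancels the awaited task when the deadline hits, while the context-manager form cancels only the awaiting code, leaving it to the caller to decide whether task should be cancelled too.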

continue

try:
task = create_task(_run_work_item(work_item))
@asvetlov (Contributor):

I don't know if the worker should create a task per processed item.
It looks like unnecessary overhead right now.

@jurca (Author):

I understand; however, I think create_task is needed if I want to race fn against possible cancellation of the work item, and cancel fn's execution if that happens.

If I were to remove create_task and just await fn(...), it would prevent immediate cancellation of long-running work items, which could be undesirable for users, e.g. if it occurred while leaving an Executor's context due to a raised exception.

Do you know of a way to achieve this behavior without using create_task?
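
For readers following along, the pattern under discussion is roughly this race (an illustrative sketch, not the PR's exact code; work_item.future is the caller-facing future from the diff):

from asyncio import FIRST_COMPLETED, create_task, wait

task = create_task(_run_work_item(work_item))
# Wait until either fn finishes or the caller cancels the work item.
done, _pending = await wait({task, work_item.future}, return_when=FIRST_COMPLETED)
if work_item.future.cancelled():
    task.cancel()  # propagate the caller's cancellation into fn
# A bare `await fn(...)` offers no such interruption point, which is
# why the task wrapper enables prompt cancellation.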

Comment on lines 214 to 216
for _ in self._workers:
await self._input_queue.put(None)
self._input_queue.shutdown()
@asvetlov (Contributor):

Could these three lines be reduced to bare self._input_queue.shutdown()?
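
For context, since Python 3.13 Queue.shutdown() wakes every blocked get() with QueueShutDown once the queue drains, which makes the per-worker None sentinels redundant. The worker side then looks roughly like this (an illustrative sketch using the diff's names):

from asyncio import QueueShutDown

while True:
    try:
        work_item = await input_queue.get()
    except QueueShutDown:
        break  # replaces the `if work_item is None` terminator check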

self._input_queue.shutdown()

if wait:
await gather(*self._workers)
@asvetlov (Contributor):

Could a worker raise an exception? Should .shutdown() propagate the exception to a caller?

@jurca (Author):

Unless there is a bug in the implementation, a worker should never raise an exception.

return result


async def _worker[R](input_queue: Queue[Optional[_WorkItem[R]]]) -> None:
@asvetlov (Contributor):

Please make the function a private method of Executor.

Comment on lines 208 to 212
finalization_tasks.append(create_task(
_consume_cancelled_future(work_item.future),
))
if finalization_tasks:
await gather(*finalization_tasks)
@asvetlov (Contributor):

If the code waits for all future cancellations, there is no need to create a task per future.
A for loop awaiting the futures one by one would be enough; it wouldn't wait for the last future any longer than the current code does.
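
Concretely, the suggestion replaces the task-per-future fan-out with sequential awaiting (a sketch; cancelled_work_items is a stand-in name for whatever the surrounding loop iterates over):

# The futures settle concurrently regardless of the order in which we
# await them, so a plain loop adds no wall-clock time over gather().
for work_item in cancelled_work_items:
    await _consume_cancelled_future(work_item.future)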

bedevere-app bot commented Feb 9, 2025

A Python core developer has requested some changes be made to your pull request before we can consider merging it. If you could please address their requests along with any other requests in other reviews from core developers that would be appreciated.

Once you have made the requested changes, please leave a comment on this pull request containing the phrase I have made the requested changes; please review again. I will then notify any core developers who have left a review that you're ready for them to take another look at this pull request.

@asvetlov marked this pull request as draft February 9, 2025 08:09
jurca added 13 commits February 11, 2025 10:54

Processing sync functions without blocking calls inside doesn't make sense; they could be called without an executor.

Handling sync functions with blocking calls requires a to_thread() wrapper; it is not the subject of the Executor class.

The goal is not to match concurrent.futures.Executor's API 100%, and the parameter does not really make sense in an asyncio context.

As Queue.shutdown() was already used, sending a terminator None value is essentially superfluous.

Using `gather` creates a task per item, introducing both performance and memory overhead. On the other hand, this removes the automagic concurrency when the calling code provides multiple async iterables. This is deemed to be outside of the Executor's responsibility and can be handled at the caller (see the sketch below).
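
For illustration, a caller that previously relied on map() fanning out over several async iterables can combine them itself before handing work to the executor. A hypothetical caller-side helper; none of these names appear in the PR:

async def azip(*aiterables):
    # Advance all async iterators in lockstep, yielding one tuple per step.
    iterators = [aiter(a) for a in aiterables]
    while True:
        try:
            yield tuple([await anext(it) for it in iterators])
        except StopAsyncIteration:
            return

This keeps the per-step reads sequential; a caller who wants the old concurrent reads back could gather() the anext() calls instead.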
bedevere-app bot commented Feb 11, 2025

Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool.

If this change has little impact on Python users, wait for a maintainer to apply the skip news label instead.

@jurca (Author) commented Feb 11, 2025

Thank you for your feedback; I've tried my best to fix the code!

I have made the requested changes; please review again.

@asvetlov Just to be 100% sure, given where we currently are in this process, would you like me to create a GitHub issue, cross-link it with the DPO conversation, and reference it here and in all commit messages? I'm asking so that I don't create one if you think it's unnecessary in this case.

bedevere-app bot commented Feb 11, 2025

Thanks for making the requested changes!

@asvetlov: please review the changes made to this pull request.

bedevere-app bot requested a review from asvetlov February 11, 2025 14:01
@asvetlov (Contributor):

@jurca I think we don't need an issue; this PR should also be closed soon.

@kumaraditya303 asked for a third-party library to be made first for battle-testing.

It means that the Executor implementation from that library could be merged into CPython after some time, and definitely not in 3.14.

I created https://github.com/asvetlov/aiolibs-executor and invited you as a collaborator.

The library's code borrows some lines from this PR, but other parts are implemented differently.

After getting 100% test coverage, I'll transfer the repo into the aio-libs GitHub organization and release it on PyPI.

I plan to use it for my job's code and collect feedback on its usage. aio-libs already has the aiojobs project (also written by me), but I believe an executor-like API is better.

If you want to join the project, you are welcome!

We can return to the discussion about adding executors to the stdlib later.
I still believe it is a good feature; it is easy enough to support but hard enough to write from scratch, especially for a casual user. I use aiojobs in many projects; the demand for such functionality is high enough.

@jurca (Author) commented Feb 11, 2025

@asvetlov Perfectly understood! I understand @kumaraditya303's position on this; sometimes it's better to be cautious.

I'll be happy to help with aiolibs-executor; I've already scanned over the code and should be able to do some work later this week. In the meantime, feel free to take any part of this code and integrate it into the library. If you'd like me to tackle anything specific, feel free to assign me an issue or send me a message.
