Description
The goal of async programming should be to run things in parallel where possible. However, the AsyncConsumer consumes messages and runs the resulting tasks sequentially. The key is this code in utils.py:
while True:
    # Wait for any of them to complete
    await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Find the completed one(s), yield results, and replace them
    for i, task in enumerate(tasks):
        if task.done():
            result = task.result()
            await dispatch(result)
            tasks[i] = asyncio.ensure_future(consumer_callables[i]())
So the loop is forced to wait for the handler to complete before getting the next message.
Using asyncio.create_task(dispatch(result)) instead of await dispatch(result) here would let the tasks run in parallel. (It's a little more complicated than that, since the tasks need to be tracked in order to report exceptions and avoid warnings.) I have a subclass of AsyncConsumer in my own app which runs tasks in parallel and results in a speedup, so I could submit a PR based on that.
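To make the idea concrete, here is a minimal sketch of the loop with create_task and the bookkeeping mentioned above. It is not the Channels implementation and not the code from my subclass: the names await_many_dispatch_concurrent, on_done and demo are made up for illustration, and printing the exception stands in for whatever error reporting a real patch would use.

```python
import asyncio


async def await_many_dispatch_concurrent(consumer_callables, dispatch):
    """Hypothetical variant of the loop above: dispatch runs as its own task."""
    receivers = [asyncio.ensure_future(c()) for c in consumer_callables]
    handlers = set()  # strong references, so running handlers are not lost

    def on_done(task):
        handlers.discard(task)
        # Retrieving the exception avoids the "never retrieved" warning.
        if not task.cancelled() and task.exception() is not None:
            print(f"handler failed: {task.exception()!r}")

    try:
        while True:
            # Wait for any receiver to produce a message
            await asyncio.wait(receivers, return_when=asyncio.FIRST_COMPLETED)
            for i, task in enumerate(receivers):
                if task.done():
                    result = task.result()
                    # Schedule the handler instead of awaiting it
                    handler = asyncio.create_task(dispatch(result))
                    handlers.add(handler)
                    handler.add_done_callback(on_done)
                    receivers[i] = asyncio.ensure_future(consumer_callables[i]())
    finally:
        # On shutdown, cancel whatever is still pending and wait for it
        pending = receivers + list(handlers)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)


async def demo():
    """Feed five messages to a slow handler and stop the loop after 0.3s."""
    queue = asyncio.Queue()
    for n in range(5):
        queue.put_nowait(n)
    handled = []

    async def dispatch(message):
        await asyncio.sleep(0.1)  # stand-in for a slow handler
        handled.append(message)

    try:
        await asyncio.wait_for(
            await_many_dispatch_concurrent([queue.get], dispatch), timeout=0.3
        )
    except asyncio.TimeoutError:
        pass
    return handled
```

With await dispatch(result), the five 0.1s handlers in demo() would need about 0.5s in total, so not all of them could finish before the 0.3s timeout. With create_task they overlap and all five complete. Note that this changes ordering guarantees: handlers for consecutive messages may finish out of order.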
A better solution would be to use the new asyncio.TaskGroup coming in Python 3.11. There seems to be a backport here:
https://pypi.org/project/taskgroup/
There are other libraries implementing something similar to TaskGroup, or a simpler version could be implemented for Channels consumers to use.
What do you think?
- What you expected to happen vs. what actually happened:
I expect that the async consumer can receive messages and process the resulting actions in parallel: receive message, create task, receive message. Instead, the consumer receives a message, creates a task, waits for the task to complete, then receives another message.
- How you're running Channels (runserver? daphne/runworker? Nginx/Apache in front?):
Runworker