Skip to content

Commit 1f1f759

Browse files
samtxwbarnhasam89173
authored
Use loop.create_task() for agent tasks (#598)
* Update agent.py to call loop.create_task rather than asyncio.Task Credit goes to @samtx for finding this solution for #175 (comment) * Update agent.py Co-authored-by: Sam Friedman <[email protected]> * Fix linting in agent.py * Update unit test to verify slurp is awaited * Update unit test to check awaitable task creation * Linting and formatting --------- Co-authored-by: William Barnhart <[email protected]> Co-authored-by: Sam Friedman <[email protected]>
1 parent 6588a97 commit 1f1f759

File tree

2 files changed

+35
-30
lines changed

2 files changed

+35
-30
lines changed

faust/agents/agent.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,9 @@ async def _prepare_actor(self, aref: ActorRefT, beacon: NodeT) -> ActorRefT:
661661
else:
662662
# agent yields and is an AsyncIterator so we have to consume it.
663663
coro = self._slurp(aref, aiter(aref))
664-
task = asyncio.Task(self._execute_actor(coro, aref), loop=self.loop)
664+
# Calling asyncio.Task is not proper usage of asyncio,
665+
# we need to create the task directly from the loop
666+
task = self.loop.create_task(self._execute_actor(coro, aref))
665667
task._beacon = beacon # type: ignore
666668
aref.actor_task = task
667669
self._actors.add(aref)

tests/unit/agents/test_agent.py

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -392,22 +392,23 @@ async def test_start_task(self, *, agent):
392392
assert ret is agent._prepare_actor.return_value
393393

394394
@pytest.mark.asyncio
395-
async def test_prepare_actor__AsyncIterable(self, *, agent):
395+
async def test_prepare_actor__AsyncIterable(self, *, agent, monkeypatch):
396+
async def mock_execute_actor(coro, aref):
397+
await coro
398+
399+
mock_beacon = Mock(name="beacon", autospec=Node)
400+
mock_slurp = AsyncMock(name="slurp")
401+
monkeypatch.setattr(agent, "_slurp", mock_slurp)
402+
monkeypatch.setattr(agent, "_execute_actor", mock_execute_actor)
396403
aref = agent(index=0, active_partitions=None)
397-
with patch("asyncio.Task") as Task:
398-
agent._slurp = Mock(name="_slurp")
399-
agent._execute_actor = Mock(name="_execute_actor")
400-
beacon = Mock(name="beacon", autospec=Node)
401-
ret = await agent._prepare_actor(aref, beacon)
402-
agent._slurp.assert_called()
403-
coro = agent._slurp()
404-
agent._execute_actor.assert_called_once_with(coro, aref)
405-
Task.assert_called_once_with(agent._execute_actor(), loop=agent.loop)
406-
task = Task()
407-
assert task._beacon is beacon
408-
assert aref.actor_task is task
409-
assert aref in agent._actors
410-
assert ret is aref
404+
ret = await agent._prepare_actor(aref, mock_beacon)
405+
task = aref.actor_task
406+
await task
407+
mock_slurp.assert_awaited()
408+
assert mock_slurp.await_args.args[0] is aref
409+
assert task._beacon is mock_beacon
410+
assert aref in agent._actors
411+
assert ret is aref
411412

412413
@pytest.mark.asyncio
413414
async def test_prepare_actor__Awaitable(self, *, agent2):
@@ -428,22 +429,24 @@ async def test_prepare_actor__Awaitable(self, *, agent2):
428429
assert ret is aref
429430

430431
@pytest.mark.asyncio
431-
async def test_prepare_actor__Awaitable_with_multiple_topics(self, *, agent2):
432+
async def test_prepare_actor__Awaitable_with_multiple_topics(
433+
self, *, agent2, monkeypatch
434+
):
432435
aref = agent2(index=0, active_partitions=None)
433-
asyncio.ensure_future(aref.it).cancel() # silence warning
434436
agent2.channel.topics = ["foo", "bar"]
435-
with patch("asyncio.Task") as Task:
436-
agent2._execute_actor = Mock(name="_execute_actor")
437-
beacon = Mock(name="beacon", autospec=Node)
438-
ret = await agent2._prepare_actor(aref, beacon)
439-
coro = aref
440-
agent2._execute_actor.assert_called_once_with(coro, aref)
441-
Task.assert_called_once_with(agent2._execute_actor(), loop=agent2.loop)
442-
task = Task()
443-
assert task._beacon is beacon
444-
assert aref.actor_task is task
445-
assert aref in agent2._actors
446-
assert ret is aref
437+
mock_beacon = Mock(name="beacon", autospec=Node)
438+
mock_slurp = AsyncMock(name="slurp")
439+
mock_execute_actor = AsyncMock(name="execute_actor")
440+
monkeypatch.setattr(agent2, "_slurp", mock_slurp)
441+
monkeypatch.setattr(agent2, "_execute_actor", mock_execute_actor)
442+
ret = await agent2._prepare_actor(aref, mock_beacon)
443+
task = aref.actor_task
444+
mock_slurp.assert_not_called()
445+
mock_slurp.assert_not_awaited()
446+
mock_execute_actor.assert_called_with(aref, aref)
447+
assert task._beacon is mock_beacon
448+
assert aref in agent2._actors
449+
assert ret is aref
447450

448451
@pytest.mark.asyncio
449452
async def test_prepare_actor__Awaitable_cannot_have_sinks(self, *, agent2):

0 commit comments

Comments
 (0)