Closed as not planned
Description
I'm extremely new to asyncio, so my apologies if this is obvious. If I've made an error, I'd love an explanation.
I was receiving the error Task was destroyed but it is pending!
, even though all my tests were passing. My tests were basically (1) create web service (2) test API functionality using aiohttp
, for example:
@pytest.mark.asyncio
async def test_server_running(event_loop, unused_tcp_port):
port = unused_tcp_port
make_service(port) # Creates a Tornado Application
async with aiohttp.ClientSession() as session:
async with session.get(f"http://localhost:{port}") as response:
result = await response.text()
assert result is not None
# await tear_down(event_loop) # <-- see below
I was able to resolve this with a tear down function (inspired by the method AsyncTestCase.tearDown()
.
async def tear_down(event_loop):
# Collect all tasks and cancel those that are not 'done'.
tasks = asyncio.all_tasks(event_loop)
tasks = [t for t in tasks if not t.done()]
for task in tasks:
task.cancel()
# Wait for all tasks to complete, ignoring any CancelledErrors
try:
await asyncio.wait(tasks)
except asyncio.exceptions.CancelledError:
pass
Is this an appropriate solution to the Task was destroyed but it is pending!
error? Did I miss something obvious? Is there a better way?
Thanks.