Skip to content

Support multiple event loops with a given Connector #1107

Open
@colonelpanic8

Description

Bug Description

This can occur when sqlalchemy attempts to create a new connection in the context of event loop that is not the event loop that cloud-sql-python-connector is using.

The connector has a member _client:

the code is careful to make sure to instantiate this client in the correct async context (in the running event loop). The reason this has to be done lazily is really just because the init of CloudSQLClient initializees an aiohttp.ClientSession:

ClientSession must be instantiated on the even loop it will be used on because of this:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/client.py#L257

now, whenever a request is made with this client session, a TimeoutHandle is created, parameterized by self._loop:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/client.py#L443

This propagates to a TimerContext here:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/helpers.py#L651

which we enter here:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/client.py#L474

now when we enter this timer context we try to find the current task:

https://github.com/aio-libs/aiohttp/blob/f662958b150a9d8d92fcbd0c9235e6bee1bedd67/aiohttp/helpers.py#L697 on the right loop, which if we have executed on a different loop, will raise an exception.

One way to fix this would be to make sure that we are running on the right event loop in _perform_refresh:

async def _perform_refresh(self) -> ConnectionInfo:

alternatively, we could try to schedule it properly here:

scheduled_task = asyncio.create_task(_refresh_task(self, delay))

You could also try to make the argument that connectors should be 1:1 with event loops that they are being used on, but at the very least, I think we could add some type of check that we are on the correct event loop in _perform_refresh, especially given that things can actually work perfectly well and only fail later if multiple event loops are used.

Furthermore, the connector class currently handles building a separate event loop if you do not provide one on which it should execute, which seems to strongly suggest that there is no expectation that you have to inject the loop.

I'm happy to implement a fix and I have several ideas about how to do so.

Example code (or command)

No response

Stacktrace

Here is an example stacktrace:

File "/nix/store/ixbvdm0yi27i0wbbpa8fpz854rsdyy5r-python3.11-railbird-0.1.0/lib/python3.11/site-packages/railbird/util/sqlalchemy.py", line 97, in execute_async
result = await session.execute(query) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 455, in execute
result = await greenlet_spawn(
^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 190, in greenlet_spawn
result = context.throw(*sys.exc_info())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2308, in execute
return self._execute_internal(
^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2180, in _execute_internal
conn = self._connection_for_bind(bind)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2047, in _connection_for_bind
return trans._connection_for_bind(engine, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "", line 2, in _connection_for_bind
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
ret_value = fn(self, *arg, **kw)
^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1143, in _connection_for_bind
conn = bind.connect()
^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3268, in connect
return self._connection_cls(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 145, in init
self._dbapi_connection = engine.raw_connection()
^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3292, in raw_connection
return self.pool.connect()
^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 452, in connect
return _ConnectionFairy._checkout(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 1269, in _checkout
fairy = _ConnectionRecord.checkout(pool)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 716, in checkout
rec = pool._do_get()
^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 169, in _do_get
with util.safe_reraise():
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 146, in exit
raise exc_value.with_traceback(exc_tb)
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 167, in _do_get
return self._create_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 393, in _create_connection
return _ConnectionRecord(self)
^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 678, in init
self.__connect()
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 902, in __connect
with util.safe_reraise():
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 146, in exit
raise exc_value.with_traceback(exc_tb)
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 898, in __connect
self.dbapi_connection = connection = pool._invoke_creator(self)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 365, in
return lambda rec: creator_fn()
^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py", line 112, in creator
return sync_engine.dialect.dbapi.connect( # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 936, in connect
await_only(creator_fn(*arg, **kw)),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 125, in await_only
return current.driver.switch(awaitable) # type: ignore[no-any-return]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/v07b0mhrd591yd77i92zc6d8lh5dalh1-python3.11-sqlalchemy-2.0.23/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 185, in greenlet_spawn
value = await result
^^^^^^^^^^^^
File "/nix/store/ixbvdm0yi27i0wbbpa8fpz854rsdyy5r-python3.11-railbird-0.1.0/lib/python3.11/site-packages/railbird/util/gcp.py", line 144, in getconn_async
return await connector.connect_async(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/8gjdx398fq4ss9a74ydppacmdzspnybl-python3.11-cloud-sql-python-connector-1.9.0/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py", line 66, in _is_valid
metadata = await task
^^^^^^^^^^
RuntimeError: Timeout context manager should be used inside a task


### Steps to reproduce?

Its a bit hard to reproduce the issue because you need to trigger the need to connect in sqlalchemy.

### Environment

1. OS type and version:
2. Python version:
3. Cloud SQL Python Connector version:

None of these are particularly relevant. I fully understand the cause of the issue.

### Additional Details

_No response_

Metadata

Labels

priority: p3Desirable enhancement or fix. May not be included in next release.type: feature request‘Nice-to-have’ improvement, new feature or different behavior or design.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions