|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
15 | 15 | import os
|
| 16 | +from typing import Tuple |
16 | 17 |
|
| 18 | +# [START alloydb_sqlalchemy_connect_async_connector] |
17 | 19 | import asyncpg
|
18 | 20 | import pytest
|
19 | 21 | import sqlalchemy
|
20 |
| -from sqlalchemy.ext.asyncio import create_async_engine |
| 22 | +import sqlalchemy.ext.asyncio |
21 | 23 |
|
22 | 24 | from google.cloud.alloydb.connector import AsyncConnector
|
23 | 25 |
|
24 | 26 |
|
25 |
| -@pytest.mark.asyncio |
26 |
| -async def test_connection_with_asyncpg() -> None: |
27 |
| - async with AsyncConnector() as connector: |
28 |
| - |
29 |
| - async def getconn() -> asyncpg.Connection: |
30 |
| - conn: asyncpg.Connection = await connector.connect( |
31 |
| - os.environ["ALLOYDB_INSTANCE_URI"], |
32 |
| - "asyncpg", |
33 |
| - user=os.environ["ALLOYDB_USER"], |
34 |
| - password=os.environ["ALLOYDB_PASS"], |
35 |
| - db=os.environ["ALLOYDB_DB"], |
36 |
| - ) |
37 |
| - return conn |
| 27 | +async def create_sqlalchemy_engine( |
| 28 | + inst_uri: str, |
| 29 | + user: str, |
| 30 | + password: str, |
| 31 | + db: str, |
| 32 | +) -> Tuple[sqlalchemy.ext.asyncio.engine.AsyncEngine, AsyncConnector]: |
| 33 | + """Creates a connection pool for an AlloyDB instance and returns the pool |
| 34 | + and the connector. Callers are responsible for closing the pool and the |
| 35 | + connector. |
| 36 | +
|
| 37 | + A sample invocation looks like: |
| 38 | +
|
| 39 | + engine, connector = await create_sqlalchemy_engine( |
| 40 | + inst_uri, |
| 41 | + user, |
| 42 | + password, |
| 43 | + db, |
| 44 | + ) |
| 45 | + async with engine.connect() as conn: |
| 46 | + time = await conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() |
| 47 | + curr_time = time[0] |
| 48 | + # do something with query result |
| 49 | + await connector.close() |
| 50 | +
|
| 51 | + Args: |
| 52 | + instance_uri (str): |
| 53 | + The instance URI specifies the instance relative to the project, |
| 54 | + region, and cluster. For example: |
| 55 | + "projects/my-project/locations/us-central1/clusters/my-cluster/instances/my-instance" |
| 56 | + user (str): |
| 57 | + The database user name, e.g., postgres |
| 58 | + password (str): |
| 59 | + The database user's password, e.g., secret-password |
| 60 | + db_name (str): |
| 61 | + The name of the database, e.g., mydb |
| 62 | + """ |
| 63 | + connector = AsyncConnector() |
| 64 | + |
| 65 | + async def getconn() -> asyncpg.Connection: |
| 66 | + conn: asyncpg.Connection = await connector.connect( |
| 67 | + inst_uri, |
| 68 | + "asyncpg", |
| 69 | + user=user, |
| 70 | + password=password, |
| 71 | + db=db, |
| 72 | + ) |
| 73 | + return conn |
38 | 74 |
|
39 | 75 | # create SQLAlchemy connection pool
|
40 |
| - pool = create_async_engine( |
| 76 | + engine = sqlalchemy.ext.asyncio.create_async_engine( |
41 | 77 | "postgresql+asyncpg://",
|
42 | 78 | async_creator=getconn,
|
43 | 79 | execution_options={"isolation_level": "AUTOCOMMIT"},
|
44 | 80 | )
|
| 81 | + return engine, connector |
| 82 | + |
| 83 | + |
| 84 | +# [END alloydb_sqlalchemy_connect_async_connector] |
| 85 | + |
| 86 | + |
| 87 | +@pytest.mark.asyncio |
| 88 | +async def test_connection_with_asyncpg() -> None: |
| 89 | + """Basic test to get time from database.""" |
| 90 | + inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] |
| 91 | + user = os.environ["ALLOYDB_USER"] |
| 92 | + password = os.environ["ALLOYDB_PASS"] |
| 93 | + db = os.environ["ALLOYDB_DB"] |
| 94 | + |
| 95 | + pool, connector = await create_sqlalchemy_engine(inst_uri, user, password, db) |
| 96 | + |
45 | 97 | async with pool.connect() as conn:
|
46 | 98 | res = (await conn.execute(sqlalchemy.text("SELECT 1"))).fetchone()
|
47 | 99 | assert res[0] == 1
|
| 100 | + |
| 101 | + await connector.close() |
0 commit comments