diff --git a/README.md b/README.md index a8b1e774..38ed5100 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ The subdirectories contain code examples for connecting and using Aurora DSQL in | JavaScript | [node-postgres (standalone)](javascript/node-postgres/) | | JavaScript | [Postgres.js](javascript/postgres-js/) | | Python | [Jupyter](python/jupyter) | +| Python | [asyncpg](python/asyncpg/) | | Python | [psycopg](python/psycopg/) | | Python | [psycopg2](python/psycopg2/) | | Python | [SQLAlchemy](python/sqlalchemy) | diff --git a/python/asyncpg/README.md b/python/asyncpg/README.md new file mode 100644 index 00000000..30dd6202 --- /dev/null +++ b/python/asyncpg/README.md @@ -0,0 +1,151 @@ +# Aurora DSQL with asyncpg + +This example demonstrates how to use the Aurora DSQL Python Connector with asyncpg to connect to Amazon Aurora DSQL clusters and perform basic database operations. + +Aurora DSQL is a distributed SQL database service that provides high availability and scalability for +your PostgreSQL-compatible applications. +Asyncpg is a popular PostgreSQL database library for Python that allows +you to interact with PostgreSQL databases using Python code. + +## About the code example + +The example demonstrates a flexible connection approach that works for both admin and non-admin users: + +* When connecting as an **admin user**, the example uses the `public` schema and generates an admin authentication + token. +* When connecting as a **non-admin user**, the example uses a custom `myschema` schema and generates a standard + authentication token. + +The code automatically detects the user type and adjusts its behavior accordingly. + +## ⚠️ Important + +* Running this code might result in charges to your AWS account. +* We recommend that you grant your code least privilege. At most, grant only the + minimum permissions required to perform the task. For more information, see + [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege). +* This code is not tested in every AWS Region. For more information, see + [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services). + + +## TLS connection configuration + +This example uses direct TLS connections where supported, and verifies the server certificate is trusted. Verified SSL +connections should be used where possible to ensure data security during transmission. + +* Driver versions following the release of PostgreSQL 17 support direct TLS connections, bypassing the traditional + PostgreSQL connection preamble +* Direct TLS connections provide improved connection performance and enhanced security +* Not all PostgreSQL drivers support direct TLS connections yet, or only in recent versions following PostgreSQL 17 +* Ensure your installed driver version supports direct TLS negotiation, or use a version that is at least as recent as + the one used in this sample +* If your driver doesn't support direct TLS connections, you may need to use the traditional preamble connection instead + + +### Prerequisites + +* You must have an AWS account, and have your default credentials and AWS Region + configured as described in the + [Globally configuring AWS SDKs and tools](https://docs.aws.amazon.com/credref/latest/refdocs/creds-config-files.html) + guide. +* [Python 3.10.0](https://www.python.org/) or later. +* You must have an Aurora DSQL cluster. For information about creating an Aurora DSQL cluster, see the + [Getting started with Aurora DSQL](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/getting-started.html) + guide. +* If connecting as a non-admin user, ensure the user is linked to an IAM role and is granted access to the `myschema` + schema. See the + [Using database roles with IAM roles](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/using-database-and-iam-roles.html) + guide. + +### Download the Amazon root certificate from the official trust store + +Download the Amazon root certificate from the official trust store: + +``` +wget https://www.amazontrust.com/repository/AmazonRootCA1.pem -O root.pem +``` + +### Set up environment for examples + +1. Create and activate a Python virtual environment: + +```bash +python3 -m venv .venv +source .venv/bin/activate # Linux, macOS +# or +.venv\Scripts\activate # Windows +``` + +2. Install the required packages for running the examples: + +```bash +pip install -r requirements.txt +``` + +### Run the code + +#### What the Examples Do + +The example demonstrates the following operations: + +- Opening a connection to an Aurora DSQL cluster +- Creating a table +- Inserting and querying data + +The example is designed to work with both admin and non-admin users: + +- When run as an admin user, it uses the `public` schema +- When run as a non-admin user, it uses the `myschema` schema + +**Note:** running the example will use actual resources in your AWS account and may incur charges. + +The connection pool examples demonstrate: +- Creating a connection pool for Aurora DSQL +- Using async context managers for connection management +- Performing database operations through the pool +- Running multiple concurrent database operations +- Using asyncio.gather() for parallel execution +- Proper resource management with connection pools + +#### Environment Cluster Details + +Set environment variables for your cluster details: + +```bash +# e.g. "admin" +export CLUSTER_USER="" + +# e.g. "foo0bar1baz2quux3quuux4.dsql.us-east-1.on.aws" +export CLUSTER_ENDPOINT="" + +# e.g. "us-east-1" +export REGION="" +``` + +#### Run the example: + +```bash +# Run example directly +python src/example.py + +# Run example using pytest +pytest ./test/test_example.py + +# Run all using pytest +pytest ./test +``` + +The examples contain comments explaining the code and the operations being performed. + +## Additional resources + +* [Amazon Aurora DSQL Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/what-is-aurora-dsql.html) +* [Amazon Aurora DSQL Python Connector Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/SECTION_program-with-dsql-connector-for-python.html) +* [Asyncpg Documentation](https://magicstack.github.io/asyncpg/current/) +* [AWS SDK for Python (Boto3) Documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) + +--- + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/python/asyncpg/pyproject.toml b/python/asyncpg/pyproject.toml new file mode 100644 index 00000000..748eaf71 --- /dev/null +++ b/python/asyncpg/pyproject.toml @@ -0,0 +1,4 @@ +[tool.pytest.ini_options] +pythonpath = [ + "src" +] diff --git a/python/asyncpg/requirements.txt b/python/asyncpg/requirements.txt new file mode 100644 index 00000000..f92e0aa2 --- /dev/null +++ b/python/asyncpg/requirements.txt @@ -0,0 +1,5 @@ +aurora-dsql-python-connector>=0.2.0 +boto3>=1.35.74 +asyncpg>=0.30.0 +pytest>=8.0 +pytest-asyncio>=1.2.0 diff --git a/python/asyncpg/src/example.py b/python/asyncpg/src/example.py new file mode 100644 index 00000000..85e7bfff --- /dev/null +++ b/python/asyncpg/src/example.py @@ -0,0 +1,101 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import asyncio +import os + +import aurora_dsql_asyncpg as dsql + + +async def create_connection(cluster_user, cluster_endpoint, region): + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "database": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": 5432, + "region": region, + "ssl": "verify-full", + "sslrootcert": ssl_cert_path, + } + + # Make a connection to the cluster + conn = await dsql.connect(**conn_params) + + if cluster_user == "admin": + schema = "public" + else: + schema = "myschema" + + try: + await conn.execute(f"SET search_path = {schema};") + except Exception as e: + await conn.close() + raise e + + return conn + + +async def exercise_connection(conn): + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS owner( + id uuid NOT NULL DEFAULT gen_random_uuid(), + name varchar(30) NOT NULL, + city varchar(80) NOT NULL, + telephone varchar(20) DEFAULT NULL, + PRIMARY KEY (id)) + """ + ) + + # Insert some rows + await conn.execute( + "INSERT INTO owner(name, city, telephone) VALUES($1, $2, $3)", + "John Doe", + "Anytown", + "555-555-1999", + ) + + row = await conn.fetchrow("SELECT * FROM owner WHERE name=$1", "John Doe") + + # Verify the result we got is what we inserted before + assert row[0] is not None + assert row[1] == "John Doe" + assert row[2] == "Anytown" + assert row[3] == "555-555-1999" + + # Clean up the table after the example. If we run the example again + # we do not have to worry about data inserted by previous runs + await conn.execute("DELETE FROM owner WHERE name = $1", "John Doe") + + +async def main(): + conn = None + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + conn = await create_connection(cluster_user, cluster_endpoint, region) + await exercise_connection(conn) + finally: + if conn is not None: + await conn.close() + + print("Connection exercised successfully") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/asyncpg/src/example_with_connection_pool.py b/python/asyncpg/src/example_with_connection_pool.py new file mode 100644 index 00000000..92f0da4c --- /dev/null +++ b/python/asyncpg/src/example_with_connection_pool.py @@ -0,0 +1,64 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import asyncio +import os + +import aurora_dsql_asyncpg as dsql + + +async def connect_with_pool(cluster_user, cluster_endpoint, region): + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + pool_params = { + "database": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": 5432, + "region": region, + "ssl": "verify-full", + "sslrootcert": ssl_cert_path, + "min_size": 2, + "max_size": 5, + } + + pool = await dsql.create_pool(**pool_params) + try: + async with pool.acquire() as conn: + result = await conn.fetchval("SELECT 1") + assert result == 1 + finally: + await pool.close() + + +async def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + await connect_with_pool(cluster_user, cluster_endpoint, region) + + finally: + pass + + print("Pool exercised successfully") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/asyncpg/src/example_with_connection_pool_concurrent.py b/python/asyncpg/src/example_with_connection_pool_concurrent.py new file mode 100644 index 00000000..d39d3859 --- /dev/null +++ b/python/asyncpg/src/example_with_connection_pool_concurrent.py @@ -0,0 +1,79 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import asyncio +import os + +import aurora_dsql_asyncpg as dsql + + +async def worker_task(pool, worker_id): + """Simulate concurrent database operations.""" + + async with pool.acquire() as conn: + result = await conn.fetchval("SELECT $1::int", worker_id) + return result + + +async def connect_with_pool_concurrent_connections( + cluster_user, cluster_endpoint, region +): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + pool_params = { + "database": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": 5432, + "region": region, + "ssl": "verify-full", + "sslrootcert": ssl_cert_path, + "min_size": 5, + "max_size": 10, + } + + pool = None + try: + pool = await dsql.create_pool(**pool_params) + # Run multiple concurrent workers + num_workers = 5 + tasks = [worker_task(pool, i) for i in range(num_workers)] + results = await asyncio.gather(*tasks) + for result in results: + print(result) + finally: + if pool is not None: + await pool.close() + + +async def main(): + + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + await connect_with_pool_concurrent_connections( + cluster_user, cluster_endpoint, region + ) + + finally: + pass + + print("Concurrent pool operations completed successfully") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/asyncpg/test/test_example.py b/python/asyncpg/test/test_example.py new file mode 100644 index 00000000..f4a9955b --- /dev/null +++ b/python/asyncpg/test/test_example.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example import main + + +# Smoke tests that our example works fine +@pytest.mark.asyncio +async def test_example(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/asyncpg/test/test_example_with_pool.py b/python/asyncpg/test/test_example_with_pool.py new file mode 100644 index 00000000..89357976 --- /dev/null +++ b/python/asyncpg/test/test_example_with_pool.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool import main + + +# Smoke tests that our example works fine +@pytest.mark.asyncio +async def test_example_with_pool(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/asyncpg/test/test_example_with_pool_concurrent.py b/python/asyncpg/test/test_example_with_pool_concurrent.py new file mode 100644 index 00000000..912f0ab9 --- /dev/null +++ b/python/asyncpg/test/test_example_with_pool_concurrent.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool_concurrent import main + + +# Smoke tests that our example works fine +@pytest.mark.asyncio +async def test_example_with_pool_concurrent(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/README.md b/python/psycopg/README.md index 53b4d969..8d6df46b 100644 --- a/python/psycopg/README.md +++ b/python/psycopg/README.md @@ -20,6 +20,13 @@ The example demonstrates a flexible connection approach that works for both admi The code automatically detects the user type and adjusts its behavior accordingly. +### Recommended vs. Alternative + +The default examples uses the preferred method leveraging the +[DSQL Python Connector](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/SECTION_program-with-dsql-connector-for-python.html), +but an [alternative example](src/alternatives/example_boto.py) that connects by creating boto3 client (not recommended) +is provided. + ## ⚠️ Important * Running this code might result in charges to your AWS account. @@ -50,7 +57,7 @@ connections should be used where possible to ensure data security during transmi configured as described in the [Globally configuring AWS SDKs and tools](https://docs.aws.amazon.com/credref/latest/refdocs/creds-config-files.html) guide. -* [Python 3.8.0](https://www.python.org/) or later. +* [Python 3.10.0](https://www.python.org/) or later. * You must have an Aurora DSQL cluster. For information about creating an Aurora DSQL cluster, see the [Getting started with Aurora DSQL](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/getting-started.html) guide. @@ -86,6 +93,8 @@ pip install -r requirements.txt ### Run the code +#### What the Examples Do + The example demonstrates the following operations: - Opening a connection to an Aurora DSQL cluster @@ -99,6 +108,15 @@ The example is designed to work with both admin and non-admin users: **Note:** running the example will use actual resources in your AWS account and may incur charges. +The extended examples demonstrate: +- Creating a connection pool for Aurora DSQL +- Using async context managers for connection management +- Performing database operations through the pool +- Running multiple concurrent database operations +- Proper resource management with connection pools + +#### Environment Cluster Details + Set environment variables for your cluster details: ```bash @@ -112,17 +130,25 @@ export CLUSTER_ENDPOINT="" export REGION="" ``` -Run the example: +#### Run the example: ```bash +# Run example directly python src/example.py + +# Run example using pytest +pytest ./test/test_example.py + +# Run all using pytest +pytest ./test ``` -The example contains comments explaining the code and the operations being performed. +The examples contain comments explaining the code and the operations being performed. ## Additional resources * [Amazon Aurora DSQL Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/what-is-aurora-dsql.html) +* [Amazon Aurora DSQL Python Connector Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/SECTION_program-with-dsql-connector-for-python.html) * [Psycopg Documentation](https://www.psycopg.org/psycopg3/docs/) * [AWS SDK for Python (Boto3) Documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) diff --git a/python/psycopg/requirements.txt b/python/psycopg/requirements.txt index 254d91bd..7776da25 100644 --- a/python/psycopg/requirements.txt +++ b/python/psycopg/requirements.txt @@ -1,3 +1,6 @@ +aurora-dsql-python-connector>=0.2.0 boto3>=1.35.74 -psycopg[binary]>=3 +psycopg[binary]>=3.1.0 +psycopg-pool>=3.2.6 pytest>=8 +pytest-asyncio>=1.2.0 diff --git a/python/psycopg/src/alternatives/example_boto.py b/python/psycopg/src/alternatives/example_boto.py new file mode 100644 index 00000000..ab4120e2 --- /dev/null +++ b/python/psycopg/src/alternatives/example_boto.py @@ -0,0 +1,120 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 + +--- + +boto3 client alternative to using the aurora-dsql-python-connector + +this method is NOT recommended, but should be used when custom drivers or infrastructure are +incompatible with the dsql connector +""" + +import boto3 +import psycopg +import os +import sys +from psycopg import pq + + +def create_connection(cluster_user, cluster_endpoint, region): + # Generate a fresh password token for each connection, to ensure the token is not expired + # when the connection is established + client = boto3.client("dsql", region_name=region) + + if cluster_user == "admin": + password_token = client.generate_db_connect_admin_auth_token(cluster_endpoint, region) + else: + password_token = client.generate_db_connect_auth_token(cluster_endpoint, region) + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + "password": password_token + } + + # Use the more efficient connection method if it's supported. + if pq.version() >= 170000: + conn_params["sslnegotiation"] = "direct" + + # Make a connection to the cluster + conn = psycopg.connect(**conn_params) + + if cluster_user == "admin": + schema = "public" + else: + schema = "myschema" + + try: + with conn.cursor() as cur: + cur.execute(f"SET search_path = {schema};") + conn.commit() + except Exception as e: + conn.close() + raise e + + return conn + + +def exercise_connection(conn): + conn.set_autocommit(True) + + cur = conn.cursor() + + cur.execute(""" + CREATE TABLE IF NOT EXISTS owner( + id uuid NOT NULL DEFAULT gen_random_uuid(), + name varchar(30) NOT NULL, + city varchar(80) NOT NULL, + telephone varchar(20) DEFAULT NULL, + PRIMARY KEY (id)) + """) + + # Insert some rows + cur.execute("INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')") + + cur.execute("SELECT * FROM owner WHERE name='John Doe'") + row = cur.fetchone() + + # Verify the result we got is what we inserted before + assert row[0] != None + assert row[1] == "John Doe" + assert row[2] == "Anytown" + assert row[3] == "555-555-1999" + + # Clean up the table after the example. If we run the example again + # we do not have to worry about data inserted by previous runs + cur.execute("DELETE FROM owner where name = 'John Doe'") + + +def main(): + conn = None + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert cluster_endpoint is not None, "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + conn = create_connection(cluster_user, cluster_endpoint, region) + exercise_connection(conn) + finally: + if conn is not None: + conn.close() + + print("Connection exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg/src/example.py b/python/psycopg/src/example.py index 51b65022..49979802 100644 --- a/python/psycopg/src/example.py +++ b/python/psycopg/src/example.py @@ -1,19 +1,14 @@ -import boto3 -import psycopg +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + import os -import sys from psycopg import pq +import aurora_dsql_psycopg as dsql def create_connection(cluster_user, cluster_endpoint, region): - # Generate a fresh password token for each connection, to ensure the token is not expired - # when the connection is established - client = boto3.client("dsql", region_name=region) - - if cluster_user == "admin": - password_token = client.generate_db_connect_admin_auth_token(cluster_endpoint, region) - else: - password_token = client.generate_db_connect_auth_token(cluster_endpoint, region) ssl_cert_path = "./root.pem" if not os.path.isfile(ssl_cert_path): @@ -24,9 +19,9 @@ def create_connection(cluster_user, cluster_endpoint, region): "user": cluster_user, "host": cluster_endpoint, "port": "5432", + "region": region, "sslmode": "verify-full", "sslrootcert": ssl_cert_path, - "password": password_token } # Use the more efficient connection method if it's supported. @@ -34,7 +29,7 @@ def create_connection(cluster_user, cluster_endpoint, region): conn_params["sslnegotiation"] = "direct" # Make a connection to the cluster - conn = psycopg.connect(**conn_params) + conn = dsql.connect(**conn_params) if cluster_user == "admin": schema = "public" @@ -57,23 +52,27 @@ def exercise_connection(conn): cur = conn.cursor() - cur.execute(""" + cur.execute( + """ CREATE TABLE IF NOT EXISTS owner( id uuid NOT NULL DEFAULT gen_random_uuid(), name varchar(30) NOT NULL, city varchar(80) NOT NULL, telephone varchar(20) DEFAULT NULL, PRIMARY KEY (id)) - """) + """ + ) # Insert some rows - cur.execute("INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')") + cur.execute( + "INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')" + ) cur.execute("SELECT * FROM owner WHERE name='John Doe'") row = cur.fetchone() # Verify the result we got is what we inserted before - assert row[0] != None + assert row[0] is not None assert row[1] == "John Doe" assert row[2] == "Anytown" assert row[3] == "555-555-1999" @@ -90,7 +89,9 @@ def main(): assert cluster_user is not None, "CLUSTER_USER environment variable is not set" cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) - assert cluster_endpoint is not None, "CLUSTER_ENDPOINT environment variable is not set" + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" region = os.environ.get("REGION", None) assert region is not None, "REGION environment variable is not set" diff --git a/python/psycopg/src/example_async.py b/python/psycopg/src/example_async.py new file mode 100644 index 00000000..e33e4480 --- /dev/null +++ b/python/psycopg/src/example_async.py @@ -0,0 +1,110 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +from psycopg import pq +import aurora_dsql_psycopg as dsql + + +async def create_connection(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + # Use the more efficient connection method if it's supported. + if pq.version() >= 170000: + conn_params["sslnegotiation"] = "direct" + + # Make a connection to the cluster + conn = await dsql.DSQLAsyncConnection.connect(**conn_params) + + if cluster_user == "admin": + schema = "public" + else: + schema = "myschema" + + try: + async with conn.cursor() as cur: + await cur.execute(f"SET search_path = {schema};") + await conn.commit() + except Exception as e: + await conn.close() + raise e + + return conn + + +async def exercise_connection(conn): + await conn.set_autocommit(True) + + async with conn.cursor() as cur: + await cur.execute( + """ + CREATE TABLE IF NOT EXISTS owner( + id uuid NOT NULL DEFAULT gen_random_uuid(), + name varchar(30) NOT NULL, + city varchar(80) NOT NULL, + telephone varchar(20) DEFAULT NULL, + PRIMARY KEY (id)) + """ + ) + + # Insert some rows + await cur.execute( + "INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')" + ) + + await cur.execute("SELECT * FROM owner WHERE name='John Doe'") + row = await cur.fetchone() + + # Verify the result we got is what we inserted before + assert row[0] is not None + assert row[1] == "John Doe" + assert row[2] == "Anytown" + assert row[3] == "555-555-1999" + + # Clean up the table after the example. If we run the example again + # we do not have to worry about data inserted by previous runs + await cur.execute("DELETE FROM owner where name = 'John Doe'") + + +async def main(): + conn = None + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + conn = await create_connection(cluster_user, cluster_endpoint, region) + await exercise_connection(conn) + finally: + if conn is not None: + await conn.close() + + print("Connection exercised successfully") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/python/psycopg/src/example_with_connection_pool.py b/python/psycopg/src/example_with_connection_pool.py new file mode 100644 index 00000000..55a7f68e --- /dev/null +++ b/python/psycopg/src/example_with_connection_pool.py @@ -0,0 +1,68 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +from psycopg_pool import ConnectionPool as PsycopgPool +import aurora_dsql_psycopg as dsql + + +def connect_with_pool(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + pool = PsycopgPool( + "", # Empty conninfo + connection_class=dsql.DSQLConnection, + kwargs=conn_params, # Pass params as kwargs + min_size=2, + max_size=8, + max_lifetime=3300, + ) + + # Use the pool as a context manager + with pool as p: + # Request a connection from the pool + with p.connection() as conn: + # Execute a query + with conn.cursor() as cur: + cur.execute("SELECT 1") + result = cur.fetchone() + print(f"Query result: {result}") + assert result[0] == 1 + + +def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + connect_with_pool(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Connection pool exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg/src/example_with_connection_pool_async.py b/python/psycopg/src/example_with_connection_pool_async.py new file mode 100644 index 00000000..2c47f9e3 --- /dev/null +++ b/python/psycopg/src/example_with_connection_pool_async.py @@ -0,0 +1,67 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +from psycopg_pool import AsyncConnectionPool as PsycopgPoolAsync +import aurora_dsql_psycopg as dsql + + +async def connect_with_pool(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + async with PsycopgPoolAsync( + "", + connection_class=dsql.DSQLAsyncConnection, + kwargs=conn_params, # Pass params as kwargs + min_size=2, + max_size=10, + max_lifetime=3300, + ) as pool: + + async with pool.connection() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT 1") + result = await cur.fetchone() + print(f"Query result: {result}") + assert result[0] == 1 + + +async def main(): + + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + await connect_with_pool(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Async connection pool exercised successfully") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/python/psycopg/src/example_with_connection_pool_concurrent.py b/python/psycopg/src/example_with_connection_pool_concurrent.py new file mode 100644 index 00000000..fedabefb --- /dev/null +++ b/python/psycopg/src/example_with_connection_pool_concurrent.py @@ -0,0 +1,92 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +import threading +from psycopg_pool import ConnectionPool as PsycopgPool +import aurora_dsql_psycopg as dsql + + +def connect_with_pool_concurrent_connections(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + pool = PsycopgPool( + "", # Empty conninfo + connection_class=dsql.DSQLConnection, + kwargs=conn_params, # Pass params as kwargs + min_size=2, + max_size=8, + max_lifetime=3300, + open=True, + ) + + # Shared list to collect exceptions from worker threads + exceptions = [] + + def worker(thread_id): + try: + with pool.connection() as conn: + with conn.cursor() as cur: + cur.execute("SELECT %s", (thread_id,)) + result = cur.fetchone() + print(f"Thread {thread_id} result: {result}") + assert result[0] == thread_id + + except Exception as e: + print(f"Thread {thread_id} failed: {e}") + exceptions.append((thread_id, e)) # Store exception with thread ID + + NUM_THREADS = 8 + threads = [] + for i in range(NUM_THREADS): + thread = threading.Thread(target=worker, args=(i + 1,)) + threads.append(thread) + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + # Check if any threads had exceptions + if exceptions: + print(f"Errors occurred in {len(exceptions)} threads:") + for thread_id, exc in exceptions: + print(f" Thread {thread_id}: {exc}") + raise RuntimeError(f"One or more worker threads failed: {exceptions}") + +def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + connect_with_pool_concurrent_connections(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Connection pool with concurrent connections exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg/test/alternatives/test_example_boto.py b/python/psycopg/test/alternatives/test_example_boto.py new file mode 100644 index 00000000..4950fe0f --- /dev/null +++ b/python/psycopg/test/alternatives/test_example_boto.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from alternatives.example_boto import main + +import pytest + + +# Smoke tests that our example works fine +def test_example(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/test/test_example.py b/python/psycopg/test/test_example.py index c1ff694d..e9c2ca1a 100644 --- a/python/psycopg/test/test_example.py +++ b/python/psycopg/test/test_example.py @@ -1,3 +1,8 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + from example import main import pytest diff --git a/python/psycopg/test/test_example_async.py b/python/psycopg/test/test_example_async.py new file mode 100644 index 00000000..b330ecf2 --- /dev/null +++ b/python/psycopg/test/test_example_async.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_async import main + + +# Smoke tests that our async example works fine +@pytest.mark.asyncio +async def test_example_async(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/test/test_example_with_pool.py b/python/psycopg/test/test_example_with_pool.py new file mode 100644 index 00000000..f957f9be --- /dev/null +++ b/python/psycopg/test/test_example_with_pool.py @@ -0,0 +1,15 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool import main + + +# Smoke tests that our example works fine +def test_example_with_pool(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/test/test_example_with_pool_async.py b/python/psycopg/test/test_example_with_pool_async.py new file mode 100644 index 00000000..2a4f0960 --- /dev/null +++ b/python/psycopg/test/test_example_with_pool_async.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool_async import main + + +# Smoke tests that our async example works fine +@pytest.mark.asyncio +async def test_example_with_pool_async(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/test/test_example_with_pool_concurrent.py b/python/psycopg/test/test_example_with_pool_concurrent.py new file mode 100644 index 00000000..73f0d9b8 --- /dev/null +++ b/python/psycopg/test/test_example_with_pool_concurrent.py @@ -0,0 +1,15 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool_concurrent import main + + +# Smoke tests that our example works fine +def test_example_with_pool_concurrent(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg2/README.md b/python/psycopg2/README.md index 5b55f6b9..ea86c05d 100644 --- a/python/psycopg2/README.md +++ b/python/psycopg2/README.md @@ -81,12 +81,13 @@ source .venv/bin/activate # Linux, macOS 2. Install the required packages for running the examples: ```bash -pip install "boto3>=1.35.74" -pip install --prefer-binary "psycopg2-binary>=2.9" +pip install -r requirements.txt ``` ### Run the code +#### What the Examples Do + The example demonstrates the following operations: - Opening a connection to an Aurora DSQL cluster @@ -100,6 +101,15 @@ The example is designed to work with both admin and non-admin users: **Note:** running the example will use actual resources in your AWS account and may incur charges. +The extended examples demonstrate: +- Creating a connection pool for Aurora DSQL +- Using async context managers for connection management +- Performing database operations through the pool +- Running multiple concurrent database operations +- Proper resource management with connection pools + +#### Environment Cluster Details + Set environment variables for your cluster details: ```bash @@ -113,17 +123,25 @@ export CLUSTER_ENDPOINT="" export REGION="" ``` -Run the example: +#### Run the example: ```bash +# Run example directly python src/example.py + +# Run example using pytest +pytest ./test/test_example.py + +# Run all using pytest +pytest ./test ``` -The example contains comments explaining the code and the operations being performed. +The examples contain comments explaining the code and the operations being performed. ## Additional resources * [Amazon Aurora DSQL Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/what-is-aurora-dsql.html) +* [Amazon Aurora DSQL Python Connector Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/SECTION_program-with-dsql-connector-for-python.html) * [Psycopg2 Documentation](https://www.psycopg.org/docs/) * [AWS SDK for Python (Boto3) Documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) diff --git a/python/psycopg2/requirements.txt b/python/psycopg2/requirements.txt index a26cb7de..84ceb617 100644 --- a/python/psycopg2/requirements.txt +++ b/python/psycopg2/requirements.txt @@ -1,3 +1,4 @@ +aurora-dsql-python-connector>=0.2.0 boto3>=1.35.74 psycopg2-binary>=2.9 pytest>=8 diff --git a/python/psycopg2/src/alternatives/example_boto.py b/python/psycopg2/src/alternatives/example_boto.py new file mode 100644 index 00000000..da3323e4 --- /dev/null +++ b/python/psycopg2/src/alternatives/example_boto.py @@ -0,0 +1,116 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 + +--- + +boto3 client alternative to using the aurora-dsql-python-connector + +this method is NOT recommended, but should be used when custom drivers or infrastructure are +incompatible with the dsql connector +""" + +import boto3 +import psycopg2 +import psycopg2.extensions +import os +import sys + + +def create_connection(cluster_user, cluster_endpoint, region): + # Generate a fresh password token for each connection, to ensure the token is not expired + # when the connection is established + client = boto3.client("dsql", region_name=region) + + if cluster_user == "admin": + password_token = client.generate_db_connect_admin_auth_token(cluster_endpoint, region) + else: + password_token = client.generate_db_connect_auth_token(cluster_endpoint, region) + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "sslmode": "verify-full", + "sslrootcert": "./root.pem", + "password": password_token + } + + # Use the more efficient connection method if it's supported. + if psycopg2.extensions.libpq_version() >= 170000: + conn_params["sslnegotiation"] = "direct" + + # Make a connection to the cluster + conn = psycopg2.connect(**conn_params) + + if cluster_user == "admin": + schema = "public" + else: + schema = "myschema" + + try: + with conn.cursor() as cur: + cur.execute(f"SET search_path = {schema};") + conn.commit() + except Exception as e: + conn.close() + raise e + + return conn + + +def exercise_connection(conn): + conn.set_session(autocommit=True) + + cur = conn.cursor() + + cur.execute(""" + CREATE TABLE IF NOT EXISTS owner( + id uuid NOT NULL DEFAULT gen_random_uuid(), + name varchar(30) NOT NULL, + city varchar(80) NOT NULL, + telephone varchar(20) DEFAULT NULL, + PRIMARY KEY (id)) + """) + + # Insert some rows + cur.execute("INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')") + + cur.execute("SELECT * FROM owner WHERE name='John Doe'") + row = cur.fetchone() + + # Verify the result we got is what we inserted before + assert row[0] != None + assert row[1] == "John Doe" + assert row[2] == "Anytown" + assert row[3] == "555-555-1999" + + # Clean up the table after the example. If we run the example again + # we do not have to worry about data inserted by previous runs + cur.execute("DELETE FROM owner where name = 'John Doe'") + + +def main(): + conn = None + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert cluster_endpoint is not None, "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + conn = create_connection(cluster_user, cluster_endpoint, region) + exercise_connection(conn) + finally: + if conn is not None: + conn.close() + + print("Connection exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg2/src/example.py b/python/psycopg2/src/example.py index d2069852..83744d9c 100644 --- a/python/psycopg2/src/example.py +++ b/python/psycopg2/src/example.py @@ -1,28 +1,24 @@ -import boto3 +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os import psycopg2 import psycopg2.extensions -import os -import sys +import aurora_dsql_psycopg2 as dsql def create_connection(cluster_user, cluster_endpoint, region): - # Generate a fresh password token for each connection, to ensure the token is not expired - # when the connection is established - client = boto3.client("dsql", region_name=region) - - if cluster_user == "admin": - password_token = client.generate_db_connect_admin_auth_token(cluster_endpoint, region) - else: - password_token = client.generate_db_connect_auth_token(cluster_endpoint, region) conn_params = { "dbname": "postgres", "user": cluster_user, "host": cluster_endpoint, "port": "5432", + "region": region, "sslmode": "verify-full", "sslrootcert": "./root.pem", - "password": password_token } # Use the more efficient connection method if it's supported. @@ -30,7 +26,7 @@ def create_connection(cluster_user, cluster_endpoint, region): conn_params["sslnegotiation"] = "direct" # Make a connection to the cluster - conn = psycopg2.connect(**conn_params) + conn = dsql.connect(**conn_params) if cluster_user == "admin": schema = "public" @@ -53,23 +49,29 @@ def exercise_connection(conn): cur = conn.cursor() - cur.execute(""" + cur.execute("DROP TABLE IF EXISTS owner") + + cur.execute( + """ CREATE TABLE IF NOT EXISTS owner( id uuid NOT NULL DEFAULT gen_random_uuid(), name varchar(30) NOT NULL, city varchar(80) NOT NULL, telephone varchar(20) DEFAULT NULL, PRIMARY KEY (id)) - """) + """ + ) # Insert some rows - cur.execute("INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')") + cur.execute( + "INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')" + ) cur.execute("SELECT * FROM owner WHERE name='John Doe'") row = cur.fetchone() # Verify the result we got is what we inserted before - assert row[0] != None + assert row[0] is not None assert row[1] == "John Doe" assert row[2] == "Anytown" assert row[3] == "555-555-1999" @@ -86,7 +88,9 @@ def main(): assert cluster_user is not None, "CLUSTER_USER environment variable is not set" cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) - assert cluster_endpoint is not None, "CLUSTER_ENDPOINT environment variable is not set" + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" region = os.environ.get("REGION", None) assert region is not None, "REGION environment variable is not set" diff --git a/python/psycopg2/src/example_with_connection_pool.py b/python/psycopg2/src/example_with_connection_pool.py new file mode 100644 index 00000000..f4c96bbe --- /dev/null +++ b/python/psycopg2/src/example_with_connection_pool.py @@ -0,0 +1,68 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +import aurora_dsql_psycopg2 as dsql + + +def connect_with_pool(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + pool = dsql.AuroraDSQLThreadedConnectionPool( + minconn=2, + maxconn=8, + **conn_params, + ) + + # Use the pool as a context manager + with pool as p: + # Request a connection from the pool + conn = p.getconn() + try: + # Execute a query + with conn.cursor() as cur: + cur.execute("SELECT 1") + result = cur.fetchone() + print(f"Query result: {result}") + assert result[0] == 1 + finally: + # Return connection to pool + p.putconn(conn) + + +def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + connect_with_pool(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Connection pool exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg2/src/example_with_connection_pool_concurrent.py b/python/psycopg2/src/example_with_connection_pool_concurrent.py new file mode 100644 index 00000000..c1cdbf4b --- /dev/null +++ b/python/psycopg2/src/example_with_connection_pool_concurrent.py @@ -0,0 +1,90 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +import threading +import aurora_dsql_psycopg2 as dsql + + +def connect_with_pool_concurrent_connections(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + pool = dsql.AuroraDSQLThreadedConnectionPool( + minconn=2, + maxconn=8, + **conn_params, + ) + + # Shared list to collect exceptions from worker threads + exceptions = [] + + def worker(thread_id): + try: + conn = pool.getconn() + try: + with conn.cursor() as cur: + cur.execute("SELECT %s", (thread_id,)) + result = cur.fetchone() + print(f"Thread {thread_id} result: {result}") + assert result[0] == thread_id + finally: + pool.putconn(conn) + except Exception as e: + print(f"Thread {thread_id} failed: {e}") + exceptions.append((thread_id, e)) # Store exception with thread ID + + NUM_THREADS = 8 + threads = [] + for i in range(NUM_THREADS): + thread = threading.Thread(target=worker, args=(i + 1,)) + threads.append(thread) + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + # Check if any threads had exceptions + if exceptions: + print(f"Errors occurred in {len(exceptions)} threads:") + for thread_id, exc in exceptions: + print(f" Thread {thread_id}: {exc}") + raise RuntimeError(f"One or more worker threads failed: {exceptions}") + + +def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + connect_with_pool_concurrent_connections(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Connection pool with concurrent connections exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg2/test/alternatives/test_example_boto.py b/python/psycopg2/test/alternatives/test_example_boto.py new file mode 100644 index 00000000..4950fe0f --- /dev/null +++ b/python/psycopg2/test/alternatives/test_example_boto.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from alternatives.example_boto import main + +import pytest + + +# Smoke tests that our example works fine +def test_example(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg2/test/test_example.py b/python/psycopg2/test/test_example.py index c1ff694d..1fd5356b 100644 --- a/python/psycopg2/test/test_example.py +++ b/python/psycopg2/test/test_example.py @@ -1,6 +1,10 @@ -from example import main +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" import pytest +from example import main # Smoke tests that our example works fine diff --git a/python/psycopg2/test/test_example_with_pool.py b/python/psycopg2/test/test_example_with_pool.py new file mode 100644 index 00000000..f957f9be --- /dev/null +++ b/python/psycopg2/test/test_example_with_pool.py @@ -0,0 +1,15 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool import main + + +# Smoke tests that our example works fine +def test_example_with_pool(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg2/test/test_example_with_pool_concurrent.py b/python/psycopg2/test/test_example_with_pool_concurrent.py new file mode 100644 index 00000000..73f0d9b8 --- /dev/null +++ b/python/psycopg2/test/test_example_with_pool_concurrent.py @@ -0,0 +1,15 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool_concurrent import main + + +# Smoke tests that our example works fine +def test_example_with_pool_concurrent(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}")