From 03a535380796f00f2856dfcd40fbbaae6be94774 Mon Sep 17 00:00:00 2001 From: Anwesha Mukherjee Date: Mon, 15 Dec 2025 03:54:14 +0000 Subject: [PATCH] Migrate Python Connector Examples Adding the examples using the Aurora DSQL Python Connector (with the smoke tests), since using the connector is the preferred method and offers a more seamless user experience. This is the promoted default path on the documentation, and the samples should be consistent with this. Moved the boto3 examples to an `alternatives`, for use as needed. This also offers agentic benefits, as most agents are now able to leverage a single search space and cross-example contex can stay intact. It creates stronger retrieval since all the variants sit in the same space, and agents will quickly be able to locate the best example since by default, the documentation normally points to the samples repository. --- README.md | 1 + python/asyncpg/README.md | 151 ++++++++++++++++++ python/asyncpg/pyproject.toml | 4 + python/asyncpg/requirements.txt | 5 + python/asyncpg/src/example.py | 101 ++++++++++++ .../src/example_with_connection_pool.py | 64 ++++++++ ...example_with_connection_pool_concurrent.py | 79 +++++++++ python/asyncpg/test/test_example.py | 16 ++ python/asyncpg/test/test_example_with_pool.py | 16 ++ .../test/test_example_with_pool_concurrent.py | 16 ++ python/psycopg/README.md | 32 +++- python/psycopg/requirements.txt | 5 +- .../psycopg/src/alternatives/example_boto.py | 120 ++++++++++++++ python/psycopg/src/example.py | 37 ++--- python/psycopg/src/example_async.py | 110 +++++++++++++ .../src/example_with_connection_pool.py | 68 ++++++++ .../src/example_with_connection_pool_async.py | 67 ++++++++ ...example_with_connection_pool_concurrent.py | 92 +++++++++++ .../test/alternatives/test_example_boto.py | 16 ++ python/psycopg/test/test_example.py | 5 + python/psycopg/test/test_example_async.py | 16 ++ python/psycopg/test/test_example_with_pool.py | 15 ++ .../test/test_example_with_pool_async.py | 16 ++ .../test/test_example_with_pool_concurrent.py | 15 ++ python/psycopg2/README.md | 26 ++- python/psycopg2/requirements.txt | 1 + .../psycopg2/src/alternatives/example_boto.py | 116 ++++++++++++++ python/psycopg2/src/example.py | 40 ++--- .../src/example_with_connection_pool.py | 68 ++++++++ ...example_with_connection_pool_concurrent.py | 90 +++++++++++ .../test/alternatives/test_example_boto.py | 16 ++ python/psycopg2/test/test_example.py | 6 +- .../psycopg2/test/test_example_with_pool.py | 15 ++ .../test/test_example_with_pool_concurrent.py | 15 ++ 34 files changed, 1415 insertions(+), 45 deletions(-) create mode 100644 python/asyncpg/README.md create mode 100644 python/asyncpg/pyproject.toml create mode 100644 python/asyncpg/requirements.txt create mode 100644 python/asyncpg/src/example.py create mode 100644 python/asyncpg/src/example_with_connection_pool.py create mode 100644 python/asyncpg/src/example_with_connection_pool_concurrent.py create mode 100644 python/asyncpg/test/test_example.py create mode 100644 python/asyncpg/test/test_example_with_pool.py create mode 100644 python/asyncpg/test/test_example_with_pool_concurrent.py create mode 100644 python/psycopg/src/alternatives/example_boto.py create mode 100644 python/psycopg/src/example_async.py create mode 100644 python/psycopg/src/example_with_connection_pool.py create mode 100644 python/psycopg/src/example_with_connection_pool_async.py create mode 100644 python/psycopg/src/example_with_connection_pool_concurrent.py create mode 100644 python/psycopg/test/alternatives/test_example_boto.py create mode 100644 python/psycopg/test/test_example_async.py create mode 100644 python/psycopg/test/test_example_with_pool.py create mode 100644 python/psycopg/test/test_example_with_pool_async.py create mode 100644 python/psycopg/test/test_example_with_pool_concurrent.py create mode 100644 python/psycopg2/src/alternatives/example_boto.py create mode 100644 python/psycopg2/src/example_with_connection_pool.py create mode 100644 python/psycopg2/src/example_with_connection_pool_concurrent.py create mode 100644 python/psycopg2/test/alternatives/test_example_boto.py create mode 100644 python/psycopg2/test/test_example_with_pool.py create mode 100644 python/psycopg2/test/test_example_with_pool_concurrent.py diff --git a/README.md b/README.md index a8b1e774..38ed5100 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ The subdirectories contain code examples for connecting and using Aurora DSQL in | JavaScript | [node-postgres (standalone)](javascript/node-postgres/) | | JavaScript | [Postgres.js](javascript/postgres-js/) | | Python | [Jupyter](python/jupyter) | +| Python | [asyncpg](python/asyncpg/) | | Python | [psycopg](python/psycopg/) | | Python | [psycopg2](python/psycopg2/) | | Python | [SQLAlchemy](python/sqlalchemy) | diff --git a/python/asyncpg/README.md b/python/asyncpg/README.md new file mode 100644 index 00000000..30dd6202 --- /dev/null +++ b/python/asyncpg/README.md @@ -0,0 +1,151 @@ +# Aurora DSQL with asyncpg + +This example demonstrates how to use the Aurora DSQL Python Connector with asyncpg to connect to Amazon Aurora DSQL clusters and perform basic database operations. + +Aurora DSQL is a distributed SQL database service that provides high availability and scalability for +your PostgreSQL-compatible applications. +Asyncpg is a popular PostgreSQL database library for Python that allows +you to interact with PostgreSQL databases using Python code. + +## About the code example + +The example demonstrates a flexible connection approach that works for both admin and non-admin users: + +* When connecting as an **admin user**, the example uses the `public` schema and generates an admin authentication + token. +* When connecting as a **non-admin user**, the example uses a custom `myschema` schema and generates a standard + authentication token. + +The code automatically detects the user type and adjusts its behavior accordingly. + +## ⚠️ Important + +* Running this code might result in charges to your AWS account. +* We recommend that you grant your code least privilege. At most, grant only the + minimum permissions required to perform the task. For more information, see + [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege). +* This code is not tested in every AWS Region. For more information, see + [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services). + + +## TLS connection configuration + +This example uses direct TLS connections where supported, and verifies the server certificate is trusted. Verified SSL +connections should be used where possible to ensure data security during transmission. + +* Driver versions following the release of PostgreSQL 17 support direct TLS connections, bypassing the traditional + PostgreSQL connection preamble +* Direct TLS connections provide improved connection performance and enhanced security +* Not all PostgreSQL drivers support direct TLS connections yet, or only in recent versions following PostgreSQL 17 +* Ensure your installed driver version supports direct TLS negotiation, or use a version that is at least as recent as + the one used in this sample +* If your driver doesn't support direct TLS connections, you may need to use the traditional preamble connection instead + + +### Prerequisites + +* You must have an AWS account, and have your default credentials and AWS Region + configured as described in the + [Globally configuring AWS SDKs and tools](https://docs.aws.amazon.com/credref/latest/refdocs/creds-config-files.html) + guide. +* [Python 3.10.0](https://www.python.org/) or later. +* You must have an Aurora DSQL cluster. For information about creating an Aurora DSQL cluster, see the + [Getting started with Aurora DSQL](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/getting-started.html) + guide. +* If connecting as a non-admin user, ensure the user is linked to an IAM role and is granted access to the `myschema` + schema. See the + [Using database roles with IAM roles](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/using-database-and-iam-roles.html) + guide. + +### Download the Amazon root certificate from the official trust store + +Download the Amazon root certificate from the official trust store: + +``` +wget https://www.amazontrust.com/repository/AmazonRootCA1.pem -O root.pem +``` + +### Set up environment for examples + +1. Create and activate a Python virtual environment: + +```bash +python3 -m venv .venv +source .venv/bin/activate # Linux, macOS +# or +.venv\Scripts\activate # Windows +``` + +2. Install the required packages for running the examples: + +```bash +pip install -r requirements.txt +``` + +### Run the code + +#### What the Examples Do + +The example demonstrates the following operations: + +- Opening a connection to an Aurora DSQL cluster +- Creating a table +- Inserting and querying data + +The example is designed to work with both admin and non-admin users: + +- When run as an admin user, it uses the `public` schema +- When run as a non-admin user, it uses the `myschema` schema + +**Note:** running the example will use actual resources in your AWS account and may incur charges. + +The connection pool examples demonstrate: +- Creating a connection pool for Aurora DSQL +- Using async context managers for connection management +- Performing database operations through the pool +- Running multiple concurrent database operations +- Using asyncio.gather() for parallel execution +- Proper resource management with connection pools + +#### Environment Cluster Details + +Set environment variables for your cluster details: + +```bash +# e.g. "admin" +export CLUSTER_USER="" + +# e.g. "foo0bar1baz2quux3quuux4.dsql.us-east-1.on.aws" +export CLUSTER_ENDPOINT="" + +# e.g. "us-east-1" +export REGION="" +``` + +#### Run the example: + +```bash +# Run example directly +python src/example.py + +# Run example using pytest +pytest ./test/test_example.py + +# Run all using pytest +pytest ./test +``` + +The examples contain comments explaining the code and the operations being performed. + +## Additional resources + +* [Amazon Aurora DSQL Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/what-is-aurora-dsql.html) +* [Amazon Aurora DSQL Python Connector Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/SECTION_program-with-dsql-connector-for-python.html) +* [Asyncpg Documentation](https://magicstack.github.io/asyncpg/current/) +* [AWS SDK for Python (Boto3) Documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) + +--- + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/python/asyncpg/pyproject.toml b/python/asyncpg/pyproject.toml new file mode 100644 index 00000000..748eaf71 --- /dev/null +++ b/python/asyncpg/pyproject.toml @@ -0,0 +1,4 @@ +[tool.pytest.ini_options] +pythonpath = [ + "src" +] diff --git a/python/asyncpg/requirements.txt b/python/asyncpg/requirements.txt new file mode 100644 index 00000000..f92e0aa2 --- /dev/null +++ b/python/asyncpg/requirements.txt @@ -0,0 +1,5 @@ +aurora-dsql-python-connector>=0.2.0 +boto3>=1.35.74 +asyncpg>=0.30.0 +pytest>=8.0 +pytest-asyncio>=1.2.0 diff --git a/python/asyncpg/src/example.py b/python/asyncpg/src/example.py new file mode 100644 index 00000000..85e7bfff --- /dev/null +++ b/python/asyncpg/src/example.py @@ -0,0 +1,101 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import asyncio +import os + +import aurora_dsql_asyncpg as dsql + + +async def create_connection(cluster_user, cluster_endpoint, region): + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "database": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": 5432, + "region": region, + "ssl": "verify-full", + "sslrootcert": ssl_cert_path, + } + + # Make a connection to the cluster + conn = await dsql.connect(**conn_params) + + if cluster_user == "admin": + schema = "public" + else: + schema = "myschema" + + try: + await conn.execute(f"SET search_path = {schema};") + except Exception as e: + await conn.close() + raise e + + return conn + + +async def exercise_connection(conn): + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS owner( + id uuid NOT NULL DEFAULT gen_random_uuid(), + name varchar(30) NOT NULL, + city varchar(80) NOT NULL, + telephone varchar(20) DEFAULT NULL, + PRIMARY KEY (id)) + """ + ) + + # Insert some rows + await conn.execute( + "INSERT INTO owner(name, city, telephone) VALUES($1, $2, $3)", + "John Doe", + "Anytown", + "555-555-1999", + ) + + row = await conn.fetchrow("SELECT * FROM owner WHERE name=$1", "John Doe") + + # Verify the result we got is what we inserted before + assert row[0] is not None + assert row[1] == "John Doe" + assert row[2] == "Anytown" + assert row[3] == "555-555-1999" + + # Clean up the table after the example. If we run the example again + # we do not have to worry about data inserted by previous runs + await conn.execute("DELETE FROM owner WHERE name = $1", "John Doe") + + +async def main(): + conn = None + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + conn = await create_connection(cluster_user, cluster_endpoint, region) + await exercise_connection(conn) + finally: + if conn is not None: + await conn.close() + + print("Connection exercised successfully") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/asyncpg/src/example_with_connection_pool.py b/python/asyncpg/src/example_with_connection_pool.py new file mode 100644 index 00000000..92f0da4c --- /dev/null +++ b/python/asyncpg/src/example_with_connection_pool.py @@ -0,0 +1,64 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import asyncio +import os + +import aurora_dsql_asyncpg as dsql + + +async def connect_with_pool(cluster_user, cluster_endpoint, region): + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + pool_params = { + "database": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": 5432, + "region": region, + "ssl": "verify-full", + "sslrootcert": ssl_cert_path, + "min_size": 2, + "max_size": 5, + } + + pool = await dsql.create_pool(**pool_params) + try: + async with pool.acquire() as conn: + result = await conn.fetchval("SELECT 1") + assert result == 1 + finally: + await pool.close() + + +async def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + await connect_with_pool(cluster_user, cluster_endpoint, region) + + finally: + pass + + print("Pool exercised successfully") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/asyncpg/src/example_with_connection_pool_concurrent.py b/python/asyncpg/src/example_with_connection_pool_concurrent.py new file mode 100644 index 00000000..d39d3859 --- /dev/null +++ b/python/asyncpg/src/example_with_connection_pool_concurrent.py @@ -0,0 +1,79 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import asyncio +import os + +import aurora_dsql_asyncpg as dsql + + +async def worker_task(pool, worker_id): + """Simulate concurrent database operations.""" + + async with pool.acquire() as conn: + result = await conn.fetchval("SELECT $1::int", worker_id) + return result + + +async def connect_with_pool_concurrent_connections( + cluster_user, cluster_endpoint, region +): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + pool_params = { + "database": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": 5432, + "region": region, + "ssl": "verify-full", + "sslrootcert": ssl_cert_path, + "min_size": 5, + "max_size": 10, + } + + pool = None + try: + pool = await dsql.create_pool(**pool_params) + # Run multiple concurrent workers + num_workers = 5 + tasks = [worker_task(pool, i) for i in range(num_workers)] + results = await asyncio.gather(*tasks) + for result in results: + print(result) + finally: + if pool is not None: + await pool.close() + + +async def main(): + + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + await connect_with_pool_concurrent_connections( + cluster_user, cluster_endpoint, region + ) + + finally: + pass + + print("Concurrent pool operations completed successfully") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/asyncpg/test/test_example.py b/python/asyncpg/test/test_example.py new file mode 100644 index 00000000..f4a9955b --- /dev/null +++ b/python/asyncpg/test/test_example.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example import main + + +# Smoke tests that our example works fine +@pytest.mark.asyncio +async def test_example(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/asyncpg/test/test_example_with_pool.py b/python/asyncpg/test/test_example_with_pool.py new file mode 100644 index 00000000..89357976 --- /dev/null +++ b/python/asyncpg/test/test_example_with_pool.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool import main + + +# Smoke tests that our example works fine +@pytest.mark.asyncio +async def test_example_with_pool(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/asyncpg/test/test_example_with_pool_concurrent.py b/python/asyncpg/test/test_example_with_pool_concurrent.py new file mode 100644 index 00000000..912f0ab9 --- /dev/null +++ b/python/asyncpg/test/test_example_with_pool_concurrent.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool_concurrent import main + + +# Smoke tests that our example works fine +@pytest.mark.asyncio +async def test_example_with_pool_concurrent(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/README.md b/python/psycopg/README.md index 53b4d969..8d6df46b 100644 --- a/python/psycopg/README.md +++ b/python/psycopg/README.md @@ -20,6 +20,13 @@ The example demonstrates a flexible connection approach that works for both admi The code automatically detects the user type and adjusts its behavior accordingly. +### Recommended vs. Alternative + +The default examples uses the preferred method leveraging the +[DSQL Python Connector](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/SECTION_program-with-dsql-connector-for-python.html), +but an [alternative example](src/alternatives/example_boto.py) that connects by creating boto3 client (not recommended) +is provided. + ## ⚠️ Important * Running this code might result in charges to your AWS account. @@ -50,7 +57,7 @@ connections should be used where possible to ensure data security during transmi configured as described in the [Globally configuring AWS SDKs and tools](https://docs.aws.amazon.com/credref/latest/refdocs/creds-config-files.html) guide. -* [Python 3.8.0](https://www.python.org/) or later. +* [Python 3.10.0](https://www.python.org/) or later. * You must have an Aurora DSQL cluster. For information about creating an Aurora DSQL cluster, see the [Getting started with Aurora DSQL](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/getting-started.html) guide. @@ -86,6 +93,8 @@ pip install -r requirements.txt ### Run the code +#### What the Examples Do + The example demonstrates the following operations: - Opening a connection to an Aurora DSQL cluster @@ -99,6 +108,15 @@ The example is designed to work with both admin and non-admin users: **Note:** running the example will use actual resources in your AWS account and may incur charges. +The extended examples demonstrate: +- Creating a connection pool for Aurora DSQL +- Using async context managers for connection management +- Performing database operations through the pool +- Running multiple concurrent database operations +- Proper resource management with connection pools + +#### Environment Cluster Details + Set environment variables for your cluster details: ```bash @@ -112,17 +130,25 @@ export CLUSTER_ENDPOINT="" export REGION="" ``` -Run the example: +#### Run the example: ```bash +# Run example directly python src/example.py + +# Run example using pytest +pytest ./test/test_example.py + +# Run all using pytest +pytest ./test ``` -The example contains comments explaining the code and the operations being performed. +The examples contain comments explaining the code and the operations being performed. ## Additional resources * [Amazon Aurora DSQL Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/what-is-aurora-dsql.html) +* [Amazon Aurora DSQL Python Connector Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/SECTION_program-with-dsql-connector-for-python.html) * [Psycopg Documentation](https://www.psycopg.org/psycopg3/docs/) * [AWS SDK for Python (Boto3) Documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) diff --git a/python/psycopg/requirements.txt b/python/psycopg/requirements.txt index 254d91bd..7776da25 100644 --- a/python/psycopg/requirements.txt +++ b/python/psycopg/requirements.txt @@ -1,3 +1,6 @@ +aurora-dsql-python-connector>=0.2.0 boto3>=1.35.74 -psycopg[binary]>=3 +psycopg[binary]>=3.1.0 +psycopg-pool>=3.2.6 pytest>=8 +pytest-asyncio>=1.2.0 diff --git a/python/psycopg/src/alternatives/example_boto.py b/python/psycopg/src/alternatives/example_boto.py new file mode 100644 index 00000000..ab4120e2 --- /dev/null +++ b/python/psycopg/src/alternatives/example_boto.py @@ -0,0 +1,120 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 + +--- + +boto3 client alternative to using the aurora-dsql-python-connector + +this method is NOT recommended, but should be used when custom drivers or infrastructure are +incompatible with the dsql connector +""" + +import boto3 +import psycopg +import os +import sys +from psycopg import pq + + +def create_connection(cluster_user, cluster_endpoint, region): + # Generate a fresh password token for each connection, to ensure the token is not expired + # when the connection is established + client = boto3.client("dsql", region_name=region) + + if cluster_user == "admin": + password_token = client.generate_db_connect_admin_auth_token(cluster_endpoint, region) + else: + password_token = client.generate_db_connect_auth_token(cluster_endpoint, region) + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + "password": password_token + } + + # Use the more efficient connection method if it's supported. + if pq.version() >= 170000: + conn_params["sslnegotiation"] = "direct" + + # Make a connection to the cluster + conn = psycopg.connect(**conn_params) + + if cluster_user == "admin": + schema = "public" + else: + schema = "myschema" + + try: + with conn.cursor() as cur: + cur.execute(f"SET search_path = {schema};") + conn.commit() + except Exception as e: + conn.close() + raise e + + return conn + + +def exercise_connection(conn): + conn.set_autocommit(True) + + cur = conn.cursor() + + cur.execute(""" + CREATE TABLE IF NOT EXISTS owner( + id uuid NOT NULL DEFAULT gen_random_uuid(), + name varchar(30) NOT NULL, + city varchar(80) NOT NULL, + telephone varchar(20) DEFAULT NULL, + PRIMARY KEY (id)) + """) + + # Insert some rows + cur.execute("INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')") + + cur.execute("SELECT * FROM owner WHERE name='John Doe'") + row = cur.fetchone() + + # Verify the result we got is what we inserted before + assert row[0] != None + assert row[1] == "John Doe" + assert row[2] == "Anytown" + assert row[3] == "555-555-1999" + + # Clean up the table after the example. If we run the example again + # we do not have to worry about data inserted by previous runs + cur.execute("DELETE FROM owner where name = 'John Doe'") + + +def main(): + conn = None + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert cluster_endpoint is not None, "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + conn = create_connection(cluster_user, cluster_endpoint, region) + exercise_connection(conn) + finally: + if conn is not None: + conn.close() + + print("Connection exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg/src/example.py b/python/psycopg/src/example.py index 51b65022..49979802 100644 --- a/python/psycopg/src/example.py +++ b/python/psycopg/src/example.py @@ -1,19 +1,14 @@ -import boto3 -import psycopg +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + import os -import sys from psycopg import pq +import aurora_dsql_psycopg as dsql def create_connection(cluster_user, cluster_endpoint, region): - # Generate a fresh password token for each connection, to ensure the token is not expired - # when the connection is established - client = boto3.client("dsql", region_name=region) - - if cluster_user == "admin": - password_token = client.generate_db_connect_admin_auth_token(cluster_endpoint, region) - else: - password_token = client.generate_db_connect_auth_token(cluster_endpoint, region) ssl_cert_path = "./root.pem" if not os.path.isfile(ssl_cert_path): @@ -24,9 +19,9 @@ def create_connection(cluster_user, cluster_endpoint, region): "user": cluster_user, "host": cluster_endpoint, "port": "5432", + "region": region, "sslmode": "verify-full", "sslrootcert": ssl_cert_path, - "password": password_token } # Use the more efficient connection method if it's supported. @@ -34,7 +29,7 @@ def create_connection(cluster_user, cluster_endpoint, region): conn_params["sslnegotiation"] = "direct" # Make a connection to the cluster - conn = psycopg.connect(**conn_params) + conn = dsql.connect(**conn_params) if cluster_user == "admin": schema = "public" @@ -57,23 +52,27 @@ def exercise_connection(conn): cur = conn.cursor() - cur.execute(""" + cur.execute( + """ CREATE TABLE IF NOT EXISTS owner( id uuid NOT NULL DEFAULT gen_random_uuid(), name varchar(30) NOT NULL, city varchar(80) NOT NULL, telephone varchar(20) DEFAULT NULL, PRIMARY KEY (id)) - """) + """ + ) # Insert some rows - cur.execute("INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')") + cur.execute( + "INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')" + ) cur.execute("SELECT * FROM owner WHERE name='John Doe'") row = cur.fetchone() # Verify the result we got is what we inserted before - assert row[0] != None + assert row[0] is not None assert row[1] == "John Doe" assert row[2] == "Anytown" assert row[3] == "555-555-1999" @@ -90,7 +89,9 @@ def main(): assert cluster_user is not None, "CLUSTER_USER environment variable is not set" cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) - assert cluster_endpoint is not None, "CLUSTER_ENDPOINT environment variable is not set" + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" region = os.environ.get("REGION", None) assert region is not None, "REGION environment variable is not set" diff --git a/python/psycopg/src/example_async.py b/python/psycopg/src/example_async.py new file mode 100644 index 00000000..e33e4480 --- /dev/null +++ b/python/psycopg/src/example_async.py @@ -0,0 +1,110 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +from psycopg import pq +import aurora_dsql_psycopg as dsql + + +async def create_connection(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + # Use the more efficient connection method if it's supported. + if pq.version() >= 170000: + conn_params["sslnegotiation"] = "direct" + + # Make a connection to the cluster + conn = await dsql.DSQLAsyncConnection.connect(**conn_params) + + if cluster_user == "admin": + schema = "public" + else: + schema = "myschema" + + try: + async with conn.cursor() as cur: + await cur.execute(f"SET search_path = {schema};") + await conn.commit() + except Exception as e: + await conn.close() + raise e + + return conn + + +async def exercise_connection(conn): + await conn.set_autocommit(True) + + async with conn.cursor() as cur: + await cur.execute( + """ + CREATE TABLE IF NOT EXISTS owner( + id uuid NOT NULL DEFAULT gen_random_uuid(), + name varchar(30) NOT NULL, + city varchar(80) NOT NULL, + telephone varchar(20) DEFAULT NULL, + PRIMARY KEY (id)) + """ + ) + + # Insert some rows + await cur.execute( + "INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')" + ) + + await cur.execute("SELECT * FROM owner WHERE name='John Doe'") + row = await cur.fetchone() + + # Verify the result we got is what we inserted before + assert row[0] is not None + assert row[1] == "John Doe" + assert row[2] == "Anytown" + assert row[3] == "555-555-1999" + + # Clean up the table after the example. If we run the example again + # we do not have to worry about data inserted by previous runs + await cur.execute("DELETE FROM owner where name = 'John Doe'") + + +async def main(): + conn = None + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + conn = await create_connection(cluster_user, cluster_endpoint, region) + await exercise_connection(conn) + finally: + if conn is not None: + await conn.close() + + print("Connection exercised successfully") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/python/psycopg/src/example_with_connection_pool.py b/python/psycopg/src/example_with_connection_pool.py new file mode 100644 index 00000000..55a7f68e --- /dev/null +++ b/python/psycopg/src/example_with_connection_pool.py @@ -0,0 +1,68 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +from psycopg_pool import ConnectionPool as PsycopgPool +import aurora_dsql_psycopg as dsql + + +def connect_with_pool(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + pool = PsycopgPool( + "", # Empty conninfo + connection_class=dsql.DSQLConnection, + kwargs=conn_params, # Pass params as kwargs + min_size=2, + max_size=8, + max_lifetime=3300, + ) + + # Use the pool as a context manager + with pool as p: + # Request a connection from the pool + with p.connection() as conn: + # Execute a query + with conn.cursor() as cur: + cur.execute("SELECT 1") + result = cur.fetchone() + print(f"Query result: {result}") + assert result[0] == 1 + + +def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + connect_with_pool(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Connection pool exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg/src/example_with_connection_pool_async.py b/python/psycopg/src/example_with_connection_pool_async.py new file mode 100644 index 00000000..2c47f9e3 --- /dev/null +++ b/python/psycopg/src/example_with_connection_pool_async.py @@ -0,0 +1,67 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +from psycopg_pool import AsyncConnectionPool as PsycopgPoolAsync +import aurora_dsql_psycopg as dsql + + +async def connect_with_pool(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + async with PsycopgPoolAsync( + "", + connection_class=dsql.DSQLAsyncConnection, + kwargs=conn_params, # Pass params as kwargs + min_size=2, + max_size=10, + max_lifetime=3300, + ) as pool: + + async with pool.connection() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT 1") + result = await cur.fetchone() + print(f"Query result: {result}") + assert result[0] == 1 + + +async def main(): + + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + await connect_with_pool(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Async connection pool exercised successfully") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/python/psycopg/src/example_with_connection_pool_concurrent.py b/python/psycopg/src/example_with_connection_pool_concurrent.py new file mode 100644 index 00000000..fedabefb --- /dev/null +++ b/python/psycopg/src/example_with_connection_pool_concurrent.py @@ -0,0 +1,92 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +import threading +from psycopg_pool import ConnectionPool as PsycopgPool +import aurora_dsql_psycopg as dsql + + +def connect_with_pool_concurrent_connections(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + pool = PsycopgPool( + "", # Empty conninfo + connection_class=dsql.DSQLConnection, + kwargs=conn_params, # Pass params as kwargs + min_size=2, + max_size=8, + max_lifetime=3300, + open=True, + ) + + # Shared list to collect exceptions from worker threads + exceptions = [] + + def worker(thread_id): + try: + with pool.connection() as conn: + with conn.cursor() as cur: + cur.execute("SELECT %s", (thread_id,)) + result = cur.fetchone() + print(f"Thread {thread_id} result: {result}") + assert result[0] == thread_id + + except Exception as e: + print(f"Thread {thread_id} failed: {e}") + exceptions.append((thread_id, e)) # Store exception with thread ID + + NUM_THREADS = 8 + threads = [] + for i in range(NUM_THREADS): + thread = threading.Thread(target=worker, args=(i + 1,)) + threads.append(thread) + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + # Check if any threads had exceptions + if exceptions: + print(f"Errors occurred in {len(exceptions)} threads:") + for thread_id, exc in exceptions: + print(f" Thread {thread_id}: {exc}") + raise RuntimeError(f"One or more worker threads failed: {exceptions}") + +def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + connect_with_pool_concurrent_connections(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Connection pool with concurrent connections exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg/test/alternatives/test_example_boto.py b/python/psycopg/test/alternatives/test_example_boto.py new file mode 100644 index 00000000..4950fe0f --- /dev/null +++ b/python/psycopg/test/alternatives/test_example_boto.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from alternatives.example_boto import main + +import pytest + + +# Smoke tests that our example works fine +def test_example(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/test/test_example.py b/python/psycopg/test/test_example.py index c1ff694d..e9c2ca1a 100644 --- a/python/psycopg/test/test_example.py +++ b/python/psycopg/test/test_example.py @@ -1,3 +1,8 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + from example import main import pytest diff --git a/python/psycopg/test/test_example_async.py b/python/psycopg/test/test_example_async.py new file mode 100644 index 00000000..b330ecf2 --- /dev/null +++ b/python/psycopg/test/test_example_async.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_async import main + + +# Smoke tests that our async example works fine +@pytest.mark.asyncio +async def test_example_async(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/test/test_example_with_pool.py b/python/psycopg/test/test_example_with_pool.py new file mode 100644 index 00000000..f957f9be --- /dev/null +++ b/python/psycopg/test/test_example_with_pool.py @@ -0,0 +1,15 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool import main + + +# Smoke tests that our example works fine +def test_example_with_pool(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/test/test_example_with_pool_async.py b/python/psycopg/test/test_example_with_pool_async.py new file mode 100644 index 00000000..2a4f0960 --- /dev/null +++ b/python/psycopg/test/test_example_with_pool_async.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool_async import main + + +# Smoke tests that our async example works fine +@pytest.mark.asyncio +async def test_example_with_pool_async(): + try: + await main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg/test/test_example_with_pool_concurrent.py b/python/psycopg/test/test_example_with_pool_concurrent.py new file mode 100644 index 00000000..73f0d9b8 --- /dev/null +++ b/python/psycopg/test/test_example_with_pool_concurrent.py @@ -0,0 +1,15 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool_concurrent import main + + +# Smoke tests that our example works fine +def test_example_with_pool_concurrent(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg2/README.md b/python/psycopg2/README.md index 5b55f6b9..ea86c05d 100644 --- a/python/psycopg2/README.md +++ b/python/psycopg2/README.md @@ -81,12 +81,13 @@ source .venv/bin/activate # Linux, macOS 2. Install the required packages for running the examples: ```bash -pip install "boto3>=1.35.74" -pip install --prefer-binary "psycopg2-binary>=2.9" +pip install -r requirements.txt ``` ### Run the code +#### What the Examples Do + The example demonstrates the following operations: - Opening a connection to an Aurora DSQL cluster @@ -100,6 +101,15 @@ The example is designed to work with both admin and non-admin users: **Note:** running the example will use actual resources in your AWS account and may incur charges. +The extended examples demonstrate: +- Creating a connection pool for Aurora DSQL +- Using async context managers for connection management +- Performing database operations through the pool +- Running multiple concurrent database operations +- Proper resource management with connection pools + +#### Environment Cluster Details + Set environment variables for your cluster details: ```bash @@ -113,17 +123,25 @@ export CLUSTER_ENDPOINT="" export REGION="" ``` -Run the example: +#### Run the example: ```bash +# Run example directly python src/example.py + +# Run example using pytest +pytest ./test/test_example.py + +# Run all using pytest +pytest ./test ``` -The example contains comments explaining the code and the operations being performed. +The examples contain comments explaining the code and the operations being performed. ## Additional resources * [Amazon Aurora DSQL Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/what-is-aurora-dsql.html) +* [Amazon Aurora DSQL Python Connector Documentation](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/SECTION_program-with-dsql-connector-for-python.html) * [Psycopg2 Documentation](https://www.psycopg.org/docs/) * [AWS SDK for Python (Boto3) Documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) diff --git a/python/psycopg2/requirements.txt b/python/psycopg2/requirements.txt index a26cb7de..84ceb617 100644 --- a/python/psycopg2/requirements.txt +++ b/python/psycopg2/requirements.txt @@ -1,3 +1,4 @@ +aurora-dsql-python-connector>=0.2.0 boto3>=1.35.74 psycopg2-binary>=2.9 pytest>=8 diff --git a/python/psycopg2/src/alternatives/example_boto.py b/python/psycopg2/src/alternatives/example_boto.py new file mode 100644 index 00000000..da3323e4 --- /dev/null +++ b/python/psycopg2/src/alternatives/example_boto.py @@ -0,0 +1,116 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 + +--- + +boto3 client alternative to using the aurora-dsql-python-connector + +this method is NOT recommended, but should be used when custom drivers or infrastructure are +incompatible with the dsql connector +""" + +import boto3 +import psycopg2 +import psycopg2.extensions +import os +import sys + + +def create_connection(cluster_user, cluster_endpoint, region): + # Generate a fresh password token for each connection, to ensure the token is not expired + # when the connection is established + client = boto3.client("dsql", region_name=region) + + if cluster_user == "admin": + password_token = client.generate_db_connect_admin_auth_token(cluster_endpoint, region) + else: + password_token = client.generate_db_connect_auth_token(cluster_endpoint, region) + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "sslmode": "verify-full", + "sslrootcert": "./root.pem", + "password": password_token + } + + # Use the more efficient connection method if it's supported. + if psycopg2.extensions.libpq_version() >= 170000: + conn_params["sslnegotiation"] = "direct" + + # Make a connection to the cluster + conn = psycopg2.connect(**conn_params) + + if cluster_user == "admin": + schema = "public" + else: + schema = "myschema" + + try: + with conn.cursor() as cur: + cur.execute(f"SET search_path = {schema};") + conn.commit() + except Exception as e: + conn.close() + raise e + + return conn + + +def exercise_connection(conn): + conn.set_session(autocommit=True) + + cur = conn.cursor() + + cur.execute(""" + CREATE TABLE IF NOT EXISTS owner( + id uuid NOT NULL DEFAULT gen_random_uuid(), + name varchar(30) NOT NULL, + city varchar(80) NOT NULL, + telephone varchar(20) DEFAULT NULL, + PRIMARY KEY (id)) + """) + + # Insert some rows + cur.execute("INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')") + + cur.execute("SELECT * FROM owner WHERE name='John Doe'") + row = cur.fetchone() + + # Verify the result we got is what we inserted before + assert row[0] != None + assert row[1] == "John Doe" + assert row[2] == "Anytown" + assert row[3] == "555-555-1999" + + # Clean up the table after the example. If we run the example again + # we do not have to worry about data inserted by previous runs + cur.execute("DELETE FROM owner where name = 'John Doe'") + + +def main(): + conn = None + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert cluster_endpoint is not None, "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + + conn = create_connection(cluster_user, cluster_endpoint, region) + exercise_connection(conn) + finally: + if conn is not None: + conn.close() + + print("Connection exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg2/src/example.py b/python/psycopg2/src/example.py index d2069852..83744d9c 100644 --- a/python/psycopg2/src/example.py +++ b/python/psycopg2/src/example.py @@ -1,28 +1,24 @@ -import boto3 +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os import psycopg2 import psycopg2.extensions -import os -import sys +import aurora_dsql_psycopg2 as dsql def create_connection(cluster_user, cluster_endpoint, region): - # Generate a fresh password token for each connection, to ensure the token is not expired - # when the connection is established - client = boto3.client("dsql", region_name=region) - - if cluster_user == "admin": - password_token = client.generate_db_connect_admin_auth_token(cluster_endpoint, region) - else: - password_token = client.generate_db_connect_auth_token(cluster_endpoint, region) conn_params = { "dbname": "postgres", "user": cluster_user, "host": cluster_endpoint, "port": "5432", + "region": region, "sslmode": "verify-full", "sslrootcert": "./root.pem", - "password": password_token } # Use the more efficient connection method if it's supported. @@ -30,7 +26,7 @@ def create_connection(cluster_user, cluster_endpoint, region): conn_params["sslnegotiation"] = "direct" # Make a connection to the cluster - conn = psycopg2.connect(**conn_params) + conn = dsql.connect(**conn_params) if cluster_user == "admin": schema = "public" @@ -53,23 +49,29 @@ def exercise_connection(conn): cur = conn.cursor() - cur.execute(""" + cur.execute("DROP TABLE IF EXISTS owner") + + cur.execute( + """ CREATE TABLE IF NOT EXISTS owner( id uuid NOT NULL DEFAULT gen_random_uuid(), name varchar(30) NOT NULL, city varchar(80) NOT NULL, telephone varchar(20) DEFAULT NULL, PRIMARY KEY (id)) - """) + """ + ) # Insert some rows - cur.execute("INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')") + cur.execute( + "INSERT INTO owner(name, city, telephone) VALUES('John Doe', 'Anytown', '555-555-1999')" + ) cur.execute("SELECT * FROM owner WHERE name='John Doe'") row = cur.fetchone() # Verify the result we got is what we inserted before - assert row[0] != None + assert row[0] is not None assert row[1] == "John Doe" assert row[2] == "Anytown" assert row[3] == "555-555-1999" @@ -86,7 +88,9 @@ def main(): assert cluster_user is not None, "CLUSTER_USER environment variable is not set" cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) - assert cluster_endpoint is not None, "CLUSTER_ENDPOINT environment variable is not set" + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" region = os.environ.get("REGION", None) assert region is not None, "REGION environment variable is not set" diff --git a/python/psycopg2/src/example_with_connection_pool.py b/python/psycopg2/src/example_with_connection_pool.py new file mode 100644 index 00000000..f4c96bbe --- /dev/null +++ b/python/psycopg2/src/example_with_connection_pool.py @@ -0,0 +1,68 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +import aurora_dsql_psycopg2 as dsql + + +def connect_with_pool(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + pool = dsql.AuroraDSQLThreadedConnectionPool( + minconn=2, + maxconn=8, + **conn_params, + ) + + # Use the pool as a context manager + with pool as p: + # Request a connection from the pool + conn = p.getconn() + try: + # Execute a query + with conn.cursor() as cur: + cur.execute("SELECT 1") + result = cur.fetchone() + print(f"Query result: {result}") + assert result[0] == 1 + finally: + # Return connection to pool + p.putconn(conn) + + +def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + connect_with_pool(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Connection pool exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg2/src/example_with_connection_pool_concurrent.py b/python/psycopg2/src/example_with_connection_pool_concurrent.py new file mode 100644 index 00000000..c1cdbf4b --- /dev/null +++ b/python/psycopg2/src/example_with_connection_pool_concurrent.py @@ -0,0 +1,90 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import os +import threading +import aurora_dsql_psycopg2 as dsql + + +def connect_with_pool_concurrent_connections(cluster_user, cluster_endpoint, region): + + ssl_cert_path = "./root.pem" + if not os.path.isfile(ssl_cert_path): + raise FileNotFoundError(f"SSL certificate file not found: {ssl_cert_path}") + + conn_params = { + "dbname": "postgres", + "user": cluster_user, + "host": cluster_endpoint, + "port": "5432", + "region": region, + "sslmode": "verify-full", + "sslrootcert": ssl_cert_path, + } + + pool = dsql.AuroraDSQLThreadedConnectionPool( + minconn=2, + maxconn=8, + **conn_params, + ) + + # Shared list to collect exceptions from worker threads + exceptions = [] + + def worker(thread_id): + try: + conn = pool.getconn() + try: + with conn.cursor() as cur: + cur.execute("SELECT %s", (thread_id,)) + result = cur.fetchone() + print(f"Thread {thread_id} result: {result}") + assert result[0] == thread_id + finally: + pool.putconn(conn) + except Exception as e: + print(f"Thread {thread_id} failed: {e}") + exceptions.append((thread_id, e)) # Store exception with thread ID + + NUM_THREADS = 8 + threads = [] + for i in range(NUM_THREADS): + thread = threading.Thread(target=worker, args=(i + 1,)) + threads.append(thread) + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + # Check if any threads had exceptions + if exceptions: + print(f"Errors occurred in {len(exceptions)} threads:") + for thread_id, exc in exceptions: + print(f" Thread {thread_id}: {exc}") + raise RuntimeError(f"One or more worker threads failed: {exceptions}") + + +def main(): + try: + cluster_user = os.environ.get("CLUSTER_USER", None) + assert cluster_user is not None, "CLUSTER_USER environment variable is not set" + + cluster_endpoint = os.environ.get("CLUSTER_ENDPOINT", None) + assert ( + cluster_endpoint is not None + ), "CLUSTER_ENDPOINT environment variable is not set" + + region = os.environ.get("REGION", None) + assert region is not None, "REGION environment variable is not set" + connect_with_pool_concurrent_connections(cluster_user, cluster_endpoint, region) + finally: + pass + + print("Connection pool with concurrent connections exercised successfully") + + +if __name__ == "__main__": + main() diff --git a/python/psycopg2/test/alternatives/test_example_boto.py b/python/psycopg2/test/alternatives/test_example_boto.py new file mode 100644 index 00000000..4950fe0f --- /dev/null +++ b/python/psycopg2/test/alternatives/test_example_boto.py @@ -0,0 +1,16 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from alternatives.example_boto import main + +import pytest + + +# Smoke tests that our example works fine +def test_example(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg2/test/test_example.py b/python/psycopg2/test/test_example.py index c1ff694d..1fd5356b 100644 --- a/python/psycopg2/test/test_example.py +++ b/python/psycopg2/test/test_example.py @@ -1,6 +1,10 @@ -from example import main +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" import pytest +from example import main # Smoke tests that our example works fine diff --git a/python/psycopg2/test/test_example_with_pool.py b/python/psycopg2/test/test_example_with_pool.py new file mode 100644 index 00000000..f957f9be --- /dev/null +++ b/python/psycopg2/test/test_example_with_pool.py @@ -0,0 +1,15 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool import main + + +# Smoke tests that our example works fine +def test_example_with_pool(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}") diff --git a/python/psycopg2/test/test_example_with_pool_concurrent.py b/python/psycopg2/test/test_example_with_pool_concurrent.py new file mode 100644 index 00000000..73f0d9b8 --- /dev/null +++ b/python/psycopg2/test/test_example_with_pool_concurrent.py @@ -0,0 +1,15 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +import pytest +from example_with_connection_pool_concurrent import main + + +# Smoke tests that our example works fine +def test_example_with_pool_concurrent(): + try: + main() + except Exception as e: + pytest.fail(f"Unexpected exception: {e}")