Skip to content

Commit 46a74ef

Browse files
committed
chore: use cancel_safe from psycopg's v3.3.2
1 parent 3ae4538 commit 46a74ef

17 files changed

Lines changed: 160 additions & 27 deletions

.github/workflows/integration_tests.yml

Lines changed: 85 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,105 @@
11
name: Integration Tests
22

33
on:
4-
workflow_dispatch:
54
push:
65
branches:
76
- main
7+
- chore/run-integration-tests-on-codebuild
8+
paths-ignore:
9+
- '**/*.md'
10+
- '**/*.jpg'
11+
- '**/README.txt'
12+
- '**/LICENSE.txt'
13+
- 'docs/**'
14+
- 'ISSUE_TEMPLATE/**'
15+
- '**/remove-old-artifacts.yml'
16+
pull_request_target:
17+
branches:
18+
- main
19+
paths-ignore:
20+
- '**/*.md'
21+
- '**/*.jpg'
22+
- '**/README.txt'
23+
- '**/LICENSE.txt'
24+
- 'docs/**'
25+
- 'ISSUE_TEMPLATE/**'
26+
- '**/remove-old-artifacts.yml'
27+
workflow_dispatch:
28+
inputs:
29+
ref:
30+
description: 'Git ref (branch, tag, or SHA) to test'
31+
required: false
32+
type: string
33+
repository:
34+
description: 'Repository to test in the form owner/repo (use for fork branches)'
35+
required: false
36+
type: string
837

938
permissions:
1039
id-token: write # This is required for requesting the JWT
1140
contents: read # This is required for actions/checkout
1241

1342
jobs:
43+
approve:
44+
# Auto-approve for non-fork scenarios (push, manual dispatch, or PR from the same repo)
45+
if: >
46+
github.event_name == 'push' ||
47+
github.event_name == 'workflow_dispatch' ||
48+
github.event.pull_request.head.repo.full_name == github.repository
49+
runs-on: ubuntu-latest
50+
steps:
51+
- run: echo "Approved - not a fork PR"
52+
53+
approve-fork:
54+
# Require manual maintainer approval (via the integration-tests environment) for fork PRs
55+
if: >
56+
github.event_name == 'pull_request_target' &&
57+
github.event.pull_request.head.repo.full_name != github.repository
58+
runs-on: ubuntu-latest
59+
environment: integration-tests
60+
steps:
61+
- run: echo "Fork PR approved by maintainer"
62+
1463
lts-integration-tests:
1564
name: Run LTS Integration Tests
65+
needs: [ approve, approve-fork ]
66+
if: |
67+
always() &&
68+
(needs.approve.result == 'success' || needs.approve-fork.result == 'success') &&
69+
!(needs.approve.result == 'failure' || needs.approve-fork.result == 'failure')
1670
runs-on: ubuntu-latest
1771
strategy:
1872
fail-fast: false
1973
matrix:
20-
python-version: [ "3.11", "3.12", "3.13" ]
21-
environment: [ "mysql", "pg" ]
74+
python-version: [ "3.11" ]
75+
environment: [ "pg" ]
2276

2377
steps:
2478
- name: 'Clone repository'
2579
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
80+
with:
81+
ref: ${{ github.event.pull_request.head.sha || github.sha }}
2682

2783
- name: 'Set up JDK 8'
2884
uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5
2985
with:
3086
distribution: 'corretto'
3187
java-version: 8
3288

89+
- name: 'Set up Python ${{ matrix.python-version }}'
90+
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
91+
with:
92+
python-version: ${{ matrix.python-version }}
93+
3394
- name: Install poetry
3495
shell: bash
3596
run: |
36-
pipx install poetry==1.8.2
37-
poetry config virtualenvs.prefer-active-python true
97+
python -m pip install --upgrade pip
98+
python -m pip install pipx
99+
python -m pipx install poetry==1.8.2
100+
PIPX_BIN_DIR=$(python -m pipx environment --value PIPX_BIN_DIR)
101+
echo "$PIPX_BIN_DIR" >> "$GITHUB_PATH"
102+
"$PIPX_BIN_DIR/poetry" config virtualenvs.prefer-active-python true
38103
39104
- name: Install dependencies
40105
run: poetry install
@@ -71,24 +136,35 @@ jobs:
71136
strategy:
72137
fail-fast: false
73138
matrix:
74-
python-version: [ "3.11", "3.12", "3.13" ]
75-
environment: [ "mysql", "pg" ]
139+
python-version: [ "3.11" ]
140+
environment: [ "pg" ]
76141

77142
steps:
78143
- name: 'Clone repository'
79144
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
145+
with:
146+
ref: ${{ github.event.pull_request.head.sha || github.sha }}
80147

81148
- name: 'Set up JDK 8'
82149
uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5
83150
with:
84151
distribution: 'corretto'
85152
java-version: 8
86153

154+
- name: 'Set up Python ${{ matrix.python-version }}'
155+
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
156+
with:
157+
python-version: ${{ matrix.python-version }}
158+
87159
- name: Install poetry
88160
shell: bash
89161
run: |
90-
pipx install poetry==1.8.2
91-
poetry config virtualenvs.prefer-active-python true
162+
python -m pip install --upgrade pip
163+
python -m pip install pipx
164+
python -m pipx install poetry==1.8.2
165+
PIPX_BIN_DIR=$(python -m pipx environment --value PIPX_BIN_DIR)
166+
echo "$PIPX_BIN_DIR" >> "$GITHUB_PATH"
167+
"$PIPX_BIN_DIR/poetry" config virtualenvs.prefer-active-python true
92168
93169
- name: Install dependencies
94170
run: poetry install

aws_advanced_python_wrapper/aurora_connection_tracker_plugin.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
Optional, Set)
2222

2323
from aws_advanced_python_wrapper.utils.notifications import HostEvent
24+
from aws_advanced_python_wrapper.utils import services_container
2425
from aws_advanced_python_wrapper.utils.utils import Utils
2526

2627
if TYPE_CHECKING:
@@ -52,6 +53,7 @@ class OpenedConnectionTracker:
5253
_shutdown_event: ClassVar[threading.Event] = threading.Event()
5354
_safe_to_check_closed_classes: ClassVar[Set[str]] = {"psycopg"}
5455
_default_sleep_time: ClassVar[int] = 30
56+
_invalidate_executor_name: ClassVar[str] = "OpenedConnectionTrackerInvalidate"
5557

5658
@classmethod
5759
def _start_prune_thread(cls):
@@ -166,9 +168,12 @@ def invalidate_all_connections(self, host_info: Optional[HostInfo] = None, host:
166168
with self._lock:
167169
connection_set: Optional[WeakSet] = self._opened_connections.get(instance_endpoint)
168170
connections_list = list(connection_set) if connection_set is not None else None
171+
if connection_set is not None:
172+
connection_set.clear()
173+
self._opened_connections.pop(instance_endpoint, None)
169174

170-
if connections_list is not None:
171-
self._log_connection_set(instance_endpoint, connection_set)
175+
if connections_list:
176+
self._log_connection_set(instance_endpoint, connections_list)
172177
self._invalidate_connections(connections_list)
173178

174179
def remove_connection_tracking(self, host_info: HostInfo, connection: Connection | None):
@@ -211,9 +216,8 @@ def _task(connections_list: list):
211216
pass
212217

213218
def _invalidate_connections(self, connections_list: list):
214-
invalidate_connection_thread: Thread = Thread(daemon=True, target=self._task,
215-
args=[connections_list]) # type: ignore
216-
invalidate_connection_thread.start()
219+
executor = services_container.get_thread_pool(self._invalidate_executor_name, drain_first=True)
220+
executor.submit(self._task, connections_list)
217221

218222
def log_opened_connections(self):
219223
with self._lock:

aws_advanced_python_wrapper/host_list_provider.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,12 @@ def __init__(self, dialect: db_dialect.TopologyAwareDatabaseDialect, props: Prop
464464

465465
self.instance_template: HostInfo = instance_template
466466
self._max_timeout_sec = WrapperProperties.AUXILIARY_QUERY_TIMEOUT_SEC.get_int(props)
467-
self._thread_pool = services_container.get_thread_pool(self._executor_name)
467+
# Need to drain topology query pools before monitor services shut down.
468+
self._thread_pool = services_container.get_thread_pool(self._executor_name, drain_first=True)
469+
470+
@classmethod
471+
def release_resources(cls, wait: bool = True) -> bool:
472+
return services_container.release_thread_pool(cls._executor_name, wait=wait)
468473

469474
def _validate_host_pattern(self, host: str):
470475
if not self._rds_utils.is_dns_pattern_valid(host):
@@ -495,8 +500,9 @@ def query_for_topology(
495500
:return: a tuple of :py:class:`HostInfo` objects representing the database topology. If the query results did not include a writer instance,
496501
an empty tuple will be returned.
497502
"""
503+
thread_pool = services_container.get_thread_pool(self._executor_name, drain_first=True)
498504
query_for_topology_func_with_timeout = preserve_transaction_status_with_timeout(
499-
self._thread_pool, self._max_timeout_sec, driver_dialect, conn)(self._query_for_topology)
505+
thread_pool, self._max_timeout_sec, driver_dialect, conn)(self._query_for_topology)
500506
x = query_for_topology_func_with_timeout(conn)
501507
return x
502508

aws_advanced_python_wrapper/pg_driver_dialect.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ def is_closed(self, conn: Connection) -> bool:
7373

7474
def abort_connection(self, conn: Connection):
7575
if isinstance(conn, psycopg.Connection):
76+
try:
77+
conn.cancel_safe()
78+
except Exception:
79+
# Best effort cancel
80+
pass
7681
conn.close()
7782
return
7883
raise UnsupportedOperationError(

aws_advanced_python_wrapper/utils/services_container.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import threading
1818
from concurrent.futures import ThreadPoolExecutor
1919
from datetime import timedelta
20-
from typing import Dict, Optional
20+
from typing import Dict, Optional, Set
2121

2222
from aws_advanced_python_wrapper.allowed_and_blocked_hosts import \
2323
AllowedAndBlockedHosts
@@ -37,6 +37,10 @@ def __init__(self) -> None:
3737
self._storage_service: Optional[StorageService] = None
3838
self._monitor_service: Optional[MonitorService] = None
3939
self._thread_pools: Dict[str, ThreadPoolExecutor] = {}
40+
# Some service pools must be drained BEFORE the monitor service is released and connections are closed.
41+
# This prevents worker threads like the topology util threads from continuing to using connections
42+
# after they are closed, causing segfaults.
43+
self._drain_first_pools: Set[str] = set()
4044
self._lock = threading.Lock()
4145

4246
def _ensure_initialized(self) -> None:
@@ -63,19 +67,24 @@ def monitor_service(self) -> MonitorService:
6367
self._ensure_initialized()
6468
return self._monitor_service # type: ignore
6569

66-
def get_thread_pool(self, name: str, max_workers: Optional[int] = None) -> ThreadPoolExecutor:
70+
def get_thread_pool(self, name: str, max_workers: Optional[int] = None, drain_first: bool = False) -> ThreadPoolExecutor:
6771
pool = self._thread_pools.get(name)
6872
if pool is not None:
73+
if drain_first:
74+
self._drain_first_pools.add(name)
6975
return pool
7076
with self._lock:
7177
if name not in self._thread_pools:
7278
self._thread_pools[name] = ThreadPoolExecutor(
7379
max_workers=max_workers, thread_name_prefix=name)
80+
if drain_first:
81+
self._drain_first_pools.add(name)
7482
return self._thread_pools[name]
7583

7684
def release_thread_pool(self, name: str, wait: bool = True) -> bool:
7785
with self._lock:
7886
pool = self._thread_pools.pop(name, None)
87+
self._drain_first_pools.discard(name)
7988
if pool is not None:
8089
try:
8190
pool.shutdown(wait=wait)
@@ -85,6 +94,18 @@ def release_thread_pool(self, name: str, wait: bool = True) -> bool:
8594
return False
8695

8796
def release_resources(self) -> None:
97+
# Some thread pools need to be drained first before shutting down the monitor services.
98+
# This prevents segfaults when monitor services shut down and close all the active monitoring connections.
99+
with self._lock:
100+
drain_names = list(self._drain_first_pools)
101+
for name in drain_names:
102+
pool = self._thread_pools.get(name)
103+
if pool is not None:
104+
try:
105+
pool.shutdown(wait=True)
106+
except Exception as e:
107+
logger.debug("CoreServices.ErrorShuttingDownPool", name, e)
108+
88109
if self._monitor_service is not None:
89110
try:
90111
self._monitor_service.release_resources()
@@ -114,6 +135,7 @@ def release_resources(self) -> None:
114135
except Exception as e:
115136
logger.debug("CoreServices.ErrorShuttingDownPool", name, e)
116137
self._thread_pools.clear()
138+
self._drain_first_pools.clear()
117139

118140

119141
_instance = _ServicesContainer()
@@ -132,8 +154,8 @@ def get_monitor_service() -> MonitorService:
132154
return _instance.monitor_service
133155

134156

135-
def get_thread_pool(name: str, max_workers: Optional[int] = None) -> ThreadPoolExecutor:
136-
return _instance.get_thread_pool(name, max_workers)
157+
def get_thread_pool(name: str, max_workers: Optional[int] = None, drain_first: bool = False) -> ThreadPoolExecutor:
158+
return _instance.get_thread_pool(name, max_workers, drain_first)
137159

138160

139161
def release_thread_pool(name: str, wait: bool = True) -> bool:

poetry.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ flake8-type-checking = ">=2.9,<4.0"
4141
isort = "^5.13.2"
4242
pep8-naming = ">=0.14.1,<0.16.0"
4343
SQLAlchemy = "^2.0.30"
44-
psycopg = "^3.3.1"
45-
psycopg-binary = "^3.3.1"
44+
psycopg = "^3.3.2"
45+
psycopg-binary = "^3.3.2"
4646
mysql-connector-python = "^9.5.0"
4747
django = "^5.0"
4848
django-stubs = "^5.2.8"
@@ -60,8 +60,8 @@ pytest-html-merger = ">=0.0.10,<0.1.1"
6060
toxiproxy-python = "^0.1.1"
6161
parameterized = "^0.9.0"
6262
tabulate = ">=0.9,<0.11"
63-
psycopg = "^3.3.1"
64-
psycopg-binary = "^3.3.1"
63+
psycopg = "^3.3.2"
64+
psycopg-binary = "^3.3.2"
6565
mysql-connector-python = "^9.5.0"
6666
opentelemetry-exporter-otlp = "^1.22.0"
6767
opentelemetry-exporter-otlp-proto-grpc = "^1.22.0"

tests/integration/container/test_aurora_failover.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def props(self):
7575
"connect_timeout": 10,
7676
"monitoring-connect_timeout": 5,
7777
"monitoring-socket_timeout": 5,
78+
"auxiliary_query_timeout_sec": 3,
7879
"autocommit": True,
7980
"cluster_id": "cluster1"
8081
})

tests/integration/container/test_autoscaling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def rds_utils(self):
5151

5252
@pytest.fixture
5353
def props(self):
54-
p: Properties = Properties({"plugins": "read_write_splitting", "connect_timeout": 10, "autocommit": True})
54+
p: Properties = Properties({"plugins": "read_write_splitting", "connect_timeout": 10, "auxiliary_query_timeout_sec": 3, "autocommit": True})
5555

5656
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in TestEnvironment.get_current().get_features() \
5757
or TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in TestEnvironment.get_current().get_features():

tests/integration/container/test_aws_secrets_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ def test_failover_with_secrets_manager(
221221
"connect_timeout": 10,
222222
"monitoring-connect_timeout": 5,
223223
"monitoring-socket_timeout": 5,
224+
"auxiliary_query_timeout_sec": 3,
224225
"topology_refresh_ms": 10,
225226
})
226227

0 commit comments

Comments
 (0)