Skip to content

Commit 6e11c58

Browse files
Dev-iLclaude
andcommitted
Add statement-lifecycle regression test; ruff-format an existing test
Backs up the DEALLOCATE-removal in src/connection/impls.rs with a pg_prepared_statements-based check that prepared=false statements don't leak server-side, plus a dual that confirms parameterised prepared=true queries do persist through deadpool's StatementCache. Ride-along: a ruff-format reflow inside test_value_converter.py that the pre-commit hook had been wanting to apply. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent acb95b3 commit 6e11c58

2 files changed

Lines changed: 166 additions & 3 deletions

File tree

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
"""Server-side prepared-statement lifecycle regression.
2+
3+
Backs up the source-side change in `src/connection/impls.rs` that dropped the
4+
explicit `DEALLOCATE PREPARE` after non-cached prepares and started relying on
5+
tokio-postgres' `Drop for StatementInner` to send `Close('S', name) + Sync`
6+
when the last `Arc<StatementInner>` clone is dropped.
7+
8+
If that wiring ever regresses (e.g. an outstanding clone keeps a Statement
9+
alive past the consumer's `Result`), `pg_prepared_statements` will start
10+
holding entries we never explicitly cleared, and the server-side resource
11+
slowly grows. This test catches that by making sure a burst of non-cached
12+
prepares lands at the same backend connection at zero prepared statements
13+
after the calls return.
14+
"""
15+
16+
import pytest
17+
from psqlpy import ConnectionPool
18+
19+
pytestmark = pytest.mark.anyio
20+
21+
22+
async def _backend_pid(conn: object) -> int:
23+
"""Return the postgres backend PID for the given psqlpy Connection."""
24+
result = await conn.execute("SELECT pg_backend_pid()", prepared=False) # type: ignore[attr-defined]
25+
return int(result.result()[0]["pg_backend_pid"])
26+
27+
28+
async def _prepared_count_for_pid(
29+
pool: ConnectionPool,
30+
pid: int,
31+
) -> int:
32+
"""Count entries in `pg_prepared_statements` for the given backend PID.
33+
34+
Uses a separate connection to avoid the question being asked through the
35+
same prepared-statement cache it's measuring. `pg_prepared_statements` is
36+
a per-session view, so we look at the target session from the outside via
37+
`dblink`-free SQL: a regular query that filters on the saved PID by
38+
asking postgres to introspect its own state for that PID.
39+
"""
40+
other = await pool.connection()
41+
# `pg_prepared_statements` is per-session; from another session we have to
42+
# walk via the postgres stat views. There is no cross-session way to
43+
# enumerate another session's prepared statements with plain SQL — but we
44+
# can run the count in-band on the original connection by passing it a
45+
# query that doesn't itself enter the prepared-statement cache. We do
46+
# that in the caller; this helper is unused for the cross-session
47+
# variant, kept here for documentation only.
48+
_ = pid
49+
_ = other
50+
raise NotImplementedError(
51+
"pg_prepared_statements is per-session; query it on the same conn.",
52+
)
53+
54+
55+
async def test_non_cached_prepare_does_not_leak_server_side(
56+
postgres_host: str,
57+
postgres_user: str,
58+
postgres_password: str,
59+
postgres_port: int,
60+
postgres_dbname: str,
61+
) -> None:
62+
"""Non-cached prepares drop their Statement and send Close('S').
63+
64+
Sequence:
65+
1. Open one pooled connection.
66+
2. Run `SELECT 1` with `prepared=False` 50 times in a row on the same
67+
connection.
68+
3. From the same connection, count rows in `pg_prepared_statements`.
69+
If the DEALLOCATE-removal kept its end of the bargain (Statement
70+
Drop → Close), the count is zero. If statements leak, the count
71+
grows roughly with the number of calls.
72+
"""
73+
pool = ConnectionPool(
74+
username=postgres_user,
75+
password=postgres_password,
76+
host=postgres_host,
77+
port=postgres_port,
78+
db_name=postgres_dbname,
79+
max_db_pool_size=2,
80+
)
81+
try:
82+
connection = await pool.connection()
83+
84+
for _ in range(50):
85+
await connection.execute("SELECT 1", prepared=False)
86+
87+
# Same connection — `pg_prepared_statements` is per-session, so the
88+
# query has to ride the same backend. Use `prepared=False` here too
89+
# to avoid the introspection query itself populating the cache we're
90+
# measuring.
91+
leaked = await connection.execute(
92+
"SELECT count(*)::bigint AS n FROM pg_prepared_statements",
93+
prepared=False,
94+
)
95+
rows = leaked.result()
96+
assert len(rows) == 1, rows
97+
assert rows[0]["n"] == 0, (
98+
f"Expected 0 prepared statements after non-cached prepares, found "
99+
f"{rows[0]['n']}. This means tokio-postgres' Drop for "
100+
f"StatementInner did not send Close('S') — the DEALLOCATE-removal "
101+
f"in src/connection/impls.rs regressed."
102+
)
103+
finally:
104+
pool.close()
105+
106+
107+
async def test_cached_prepare_retains_statements_while_held(
108+
postgres_host: str,
109+
postgres_user: str,
110+
postgres_password: str,
111+
postgres_port: int,
112+
postgres_dbname: str,
113+
) -> None:
114+
"""`prepared=True` with parameters keeps the named statement alive.
115+
116+
Dual of the previous test for the path that *does* go through deadpool's
117+
`prepare_cached`: queries that carry parameters route through the
118+
StatementBuilder, which prepares a named statement and the
119+
`deadpool_postgres::StatementCache` holds an `Arc<Statement>` clone. The
120+
Statement is therefore not dropped after each call, and the cached
121+
server-side prepared statement persists for the lifetime of the pooled
122+
connection. Re-executing the same query string should reuse the same
123+
cache entry, so `pg_prepared_statements` for that statement text shows
124+
exactly one row no matter how many times we execute.
125+
126+
The no-parameter path (covered by `execute_no_params`) uses tokio's
127+
unnamed-prepared-statement shortcut and never populates the cache, so
128+
we deliberately use parameters here.
129+
"""
130+
pool = ConnectionPool(
131+
username=postgres_user,
132+
password=postgres_password,
133+
host=postgres_host,
134+
port=postgres_port,
135+
db_name=postgres_dbname,
136+
max_db_pool_size=2,
137+
)
138+
try:
139+
connection = await pool.connection()
140+
141+
# Parameterised → goes through StatementBuilder → prepare_cached.
142+
for _ in range(20):
143+
await connection.execute(
144+
"SELECT $1::int4 AS v",
145+
parameters=[7],
146+
prepared=True,
147+
)
148+
149+
result = await connection.execute(
150+
"SELECT count(*)::bigint AS n FROM pg_prepared_statements "
151+
"WHERE statement LIKE 'SELECT $1::int4 AS v'",
152+
prepared=False,
153+
)
154+
rows = result.result()
155+
assert len(rows) == 1
156+
assert rows[0]["n"] == 1, (
157+
f"Expected exactly 1 cached prepared statement for the "
158+
f"parameterised query, found {rows[0]['n']}. Either the "
159+
f"deadpool StatementCache stopped reusing per-query entries or "
160+
f"the prepared=True path stopped going through prepare_cached."
161+
)
162+
finally:
163+
pool.close()

python/tests/test_value_converter.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -659,9 +659,9 @@ async def test_char_internal_type_byte_spectrum(
659659
value = decoded[i]
660660
assert isinstance(value, str)
661661
assert len(value) == 1
662-
assert (
663-
ord(value) == b
664-
), f"byte 0x{b:02x} round-tripped to ord(value)=0x{ord(value):02x}"
662+
assert ord(value) == b, (
663+
f"byte 0x{b:02x} round-tripped to ord(value)=0x{ord(value):02x}"
664+
)
665665
assert decoded[len(bytes_under_test)] is None
666666

667667

0 commit comments

Comments
 (0)