Skip to content

Commit 56b1aae

Browse files
author
g2px1
committed
pgpool: safe connection recycling, async release path, COPY/cursor streaming docs
1 parent 12184a0 commit 56b1aae

13 files changed

Lines changed: 1791 additions & 508 deletions

File tree

docs/pool.md

Lines changed: 263 additions & 186 deletions
Large diffs are not rendered by default.

docs/results.md

Lines changed: 243 additions & 145 deletions
Large diffs are not rendered by default.

docs/transaction.md

Lines changed: 143 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,166 +1,213 @@
11
# PgTransaction
22

3-
`PgTransaction` provides coroutine-safe transaction management on top of `PgPool`.
4-
It wraps a single connection and supports `BEGIN`, `COMMIT`, `ROLLBACK`, and `ABORT` with full error transparency.
3+
`PgTransaction` provides coroutine-friendly transactional access to PostgreSQL.
54

6-
---
7-
8-
## Overview
5+
It:
96

10-
Each transaction owns one dedicated connection from the pool.
11-
All operations (`BEGIN`, `COMMIT`, `ROLLBACK`, `ABORT`) are asynchronous and return structured `QueryResult` or `bool`.
7+
- Acquires a dedicated connection from `PgPool`
8+
- Sends `BEGIN` with optional isolation / read-only / deferrable settings
9+
- Executes multiple queries on the same connection
10+
- Commits or rolls back
11+
- Returns (or retires) the connection back to the pool
1212

13-
After the transaction ends (committed, rolled back, or aborted), the connection is automatically released back to the
14-
pool.
13+
All operations are async and return `Awaitable<...>`.
1514

1615
---
1716

18-
## Example
17+
## Basic usage
1918

2019
```cpp
21-
#include "upq/PgTransaction.h"
22-
23-
using namespace usub;
24-
25-
uvent::task::Awaitable<void> transfer_example()
20+
task::Awaitable<void> do_transfer()
2621
{
27-
pg::PgTransaction txn;
22+
usub::pg::PgTransaction txn;
2823

29-
if (!co_await txn.begin())
24+
bool ok_begin = co_await txn.begin();
25+
if (!ok_begin)
3026
{
31-
std::cout << "Transaction begin failed\n";
32-
co_await txn.finish();
27+
std::cout << "[ERROR] BEGIN failed\n";
3328
co_return;
3429
}
3530

36-
auto res = co_await txn.query(
37-
"UPDATE users SET balance = balance - $1 WHERE id = $2 RETURNING balance;",
38-
100.0, 42
31+
auto r1 = co_await txn.query(
32+
"UPDATE accounts SET balance = balance - $1 WHERE id = $2 RETURNING balance;",
33+
100, 1
34+
);
35+
36+
auto r2 = co_await txn.query(
37+
"UPDATE accounts SET balance = balance + $1 WHERE id = $2 RETURNING balance;",
38+
100, 2
3939
);
4040

41-
if (!res.ok)
41+
if (!r1.ok || !r2.ok)
4242
{
43-
std::cout << "Query failed: " << res.error << "\n";
44-
co_await txn.abort(); // soft abort instead of rollback
45-
co_await txn.finish();
43+
std::cout << "[ERROR] transfer failed, rolling back\n";
44+
co_await txn.rollback();
4645
co_return;
4746
}
4847

49-
if (!co_await txn.commit())
48+
bool ok_commit = co_await txn.commit();
49+
if (!ok_commit)
5050
{
51-
std::cout << "Commit failed — aborting\n";
52-
co_await txn.abort();
51+
std::cout << "[ERROR] COMMIT failed\n";
52+
co_return;
5353
}
5454

55-
co_await txn.finish();
55+
std::cout << "[OK] transfer complete\n";
5656
co_return;
5757
}
5858
```
5959

6060
---
6161

62-
## API summary
62+
## Transaction config
6363

64-
| Method | Description |
65-
|-----------------------|---------------------------------------------------|
66-
| `begin()` | Starts a new transaction (`BEGIN`). |
67-
| `query(sql, args...)` | Executes a query within the active transaction. |
68-
| `commit()` | Commits the transaction (`COMMIT`). |
69-
| `rollback()` | Performs an explicit rollback (`ROLLBACK`). |
70-
| `abort()` | Performs a soft abort (`ABORT` if connected). |
71-
| `finish()` | Ensures cleanup; calls `abort()` if still active. |
64+
You can control isolation level, read-only mode, and deferrable mode:
7265

73-
---
66+
```cpp
67+
usub::pg::PgTransactionConfig cfg{
68+
.isolation = usub::pg::TxIsolationLevel::Serializable,
69+
.read_only = false,
70+
.deferrable = false
71+
};
72+
73+
usub::pg::PgTransaction txn(&usub::pg::PgPool::instance(), cfg);
74+
bool ok_begin = co_await txn.begin();
75+
```
7476
75-
## `abort()`
77+
Generated `BEGIN` looks like:
7678
77-
`abort()` safely terminates a transaction using a lightweight approach.
79+
* isolation level:
7880
79-
* If the connection is alive, sends the PostgreSQL command `ABORT`
80-
(an alias of `ROLLBACK`).
81-
* If the connection is already broken, marks the transaction as rolled back
82-
and releases the connection locally.
81+
* `READ COMMITTED`
82+
* `REPEATABLE READ`
83+
* `SERIALIZABLE`
84+
* mode: `READ WRITE` or `READ ONLY`
85+
* optionally `DEFERRABLE`
8386
84-
Used when:
87+
---
8588
86-
* Coroutine cancelled mid-transaction
87-
* Connection lost (`ConnectionClosed`)
88-
* You want to discard transaction without waiting for a full rollback
89+
## Querying inside a transaction
8990
9091
```cpp
91-
if (res.code == PgErrorCode::ConnectionClosed)
92+
auto qr = co_await txn.query(
93+
"UPDATE users SET name = $1 WHERE id = $2 RETURNING name;",
94+
"John", 1
95+
);
96+
97+
if (!qr.ok)
9298
{
93-
std::cout << "Connection lost — aborting transaction\n";
94-
co_await txn.abort();
95-
co_await txn.finish();
99+
std::cout << "[ERROR] update failed: " << qr.error << "\n";
100+
co_await txn.rollback();
96101
co_return;
97102
}
98103
```
99104

105+
* `txn.query(...)` calls `PgPool::query_on()` on the same pinned connection.
106+
* You always read the result via standard `QueryResult`.
107+
108+
If the underlying connection is dropped mid-transaction (e.g. network failure), `txn.query()` returns with:
109+
110+
* `ok = false`
111+
* `code = PgErrorCode::ConnectionClosed`
112+
and the transaction is automatically marked inactive.
113+
100114
---
101115

102-
## Error propagation
116+
## Commit / rollback
103117

104-
Transactions propagate structured errors with `PgErrorCode`:
118+
```cpp
119+
bool ok_commit = co_await txn.commit();
120+
if (!ok_commit)
121+
std::cout << "[ERROR] commit failed\n";
122+
```
105123

106-
| Condition | `PgErrorCode` | Notes |
107-
|----------------------|--------------------|--------------------------------|
108-
| Inactive transaction | `InvalidFuture` | No `BEGIN` or already finished |
109-
| Connection dropped | `ConnectionClosed` | Socket or PGconn lost mid-txn |
110-
| Server-side error | `ServerError` | SQLSTATE and details available |
111-
| Commit failure | `ServerError` | COMMIT rejected or timed out |
112-
| Abort failure | `SocketReadFailed` | Connection failed during abort |
124+
```cpp
125+
co_await txn.rollback();
126+
```
113127

114-
Example:
128+
There is also:
115129

116130
```cpp
117-
auto res = co_await txn.query("UPDATE accounts SET ...;");
118-
if (!res.ok) {
119-
std::cout
120-
<< "[TXN ERROR] code=" << (uint32_t)res.code
121-
<< " msg=" << res.error
122-
<< " sqlstate=" << res.server_sqlstate << std::endl;
123-
}
131+
co_await txn.abort(); // uses ABORT instead of ROLLBACK
132+
co_await txn.finish(); // "best effort cleanup": rollback if still active
124133
```
125134

126135
---
127136

128-
## `finish()`
137+
## Connection handoff back to the pool
129138

130-
`finish()` finalizes the transaction and always returns the connection to the pool.
131-
If the transaction is still active, it automatically calls `abort()`.
139+
Internally, `PgTransaction` uses a dedicated connection (acquired via `PgPool::acquire_connection()`).
132140

133-
This means you can safely call `finish()` in every exit path:
141+
When the transaction ends (`commit`, `rollback`, `finish`, `abort`):
142+
143+
* It calls `PgPool::release_connection_async(conn)`.
144+
145+
`release_connection_async` is important:
146+
147+
* It *awaits* internal cleanup on that connection.
148+
* It drains any remaining `PGresult`s from libpq.
149+
* Only after the connection is known to be idle/clean, it is re-queued into the pool for reuse.
150+
151+
If the connection is already in a bad state (disconnected, protocol error, etc.), the pool will not recycle it — it will
152+
retire it and decrement `live_count_`. The object will eventually be destroyed (closing the socket, calling `PQfinish`).
153+
154+
This design guarantees:
155+
156+
* A transaction will never return a “dirty” connection to the pool.
157+
* No other coroutine will later see `"another command is already in progress"` because of leftover results from your
158+
transaction, COMMIT, or ROLLBACK.
159+
160+
---
161+
162+
## Subtransactions (SAVEPOINT)
163+
164+
`PgTransaction` supports savepoints for partial rollback:
134165

135166
```cpp
136-
pg::PgTransaction txn;
137-
if (!co_await txn.begin()) co_return;
167+
auto sub = txn.make_subtx();
138168

139-
// ...
169+
bool ok_sub_begin = co_await sub.begin();
170+
if (!ok_sub_begin)
171+
{
172+
std::cout << "[ERROR] SAVEPOINT begin failed\n";
173+
// still inside main txn, but subtx didn't start
174+
}
175+
176+
auto r_inner = co_await sub.query(
177+
"UPDATE ledger SET amount = amount + $1 WHERE id = $2 RETURNING amount;",
178+
500, 42
179+
);
140180

141-
co_await txn.finish(); // always safe
181+
if (!r_inner.ok)
182+
{
183+
std::cout << "[WARN] inner update failed, rolling back subtx\n";
184+
co_await sub.rollback();
185+
}
186+
else
187+
{
188+
bool ok_sub_commit = co_await sub.commit();
189+
if (!ok_sub_commit)
190+
std::cout << "[WARN] subtx commit failed\n";
191+
}
142192
```
143193

144-
---
194+
Semantics:
145195

146-
## Lifecycle
196+
* `begin()` issues `SAVEPOINT <name>`
197+
* `commit()` issues `RELEASE SAVEPOINT <name>`
198+
* `rollback()` issues `ROLLBACK TO SAVEPOINT <name>`
147199

148-
| Stage | Method | Description |
149-
|----------|--------------|------------------------------------------------|
150-
| Start | `begin()` | Sends `BEGIN;` and marks active |
151-
| Execute | `query()` | Runs statements on same connection |
152-
| Commit | `commit()` | Sends `COMMIT;` |
153-
| Abort | `abort()` | Sends `ABORT;` or marks rolled back |
154-
| Rollback | `rollback()` | Sends `ROLLBACK;` explicitly |
155-
| Finish | `finish()` | Releases connection (uses `abort()` if active) |
200+
All these run on the same underlying `PGconn`.
156201

157202
---
158203

159-
## Design intent
160-
161-
`abort()` fills the gap between graceful rollback and hard disconnect recovery.
162-
It lets coroutines cancel transactions fast and deterministically without waiting for PostgreSQL response in broken or
163-
cancelled states.
204+
## Summary
164205

165-
`finish()` now consistently defers to `abort()` for safe cleanup,
166-
making the transaction API idempotent and fault-tolerant.
206+
| Feature | Description |
207+
|-----------------------------|----------------------------------------------------------------|
208+
| Dedicated connection | Each `PgTransaction` pins one physical connection |
209+
| Async begin/commit/rollback | All blocking points are coroutine suspension |
210+
| Safe return to pool | Connection is drained via `release_connection_async` |
211+
| Automatic invalidation | Lost connection marks the transaction as rolled back |
212+
| Subtransactions (SAVEPOINT) | Fine-grained rollback without aborting the parent transaction |
213+
| Structured results | All queries return `QueryResult` with `ok`, `code`, `error`, … |

0 commit comments

Comments
 (0)