Skip to content

Commit ed9d27e

Browse files
committed
added params support
1 parent 015b806 commit ed9d27e

6 files changed

Lines changed: 258 additions & 3 deletions

File tree

docs/large-exports.md

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,61 @@ co_await tx.commit();
9090
The stream returned from a transaction does **not** commit on `close()` —
9191
lifetime is bound to `tx`.
9292
93+
### Bind parameters (`$1, $2, …`)
94+
95+
`stream` and `stream_reflect` accept variadic arguments after `batch_size`,
96+
forwarded to the cursor's underlying `SELECT` via libpq's extended protocol
97+
(`PQsendQueryParams`). Postgres binds them inside the `DECLARE … CURSOR FOR …`
98+
in a single round trip — no manual `PREPARE`, no string interpolation.
99+
100+
```cpp
101+
const int64_t lower = 100000;
102+
const int64_t upper = 100100;
103+
104+
auto sres = co_await pool.stream_reflect<BigRow>(
105+
"SELECT id, name, tag FROM big_table "
106+
"WHERE id BETWEEN $1 AND $2 ORDER BY id",
107+
1000, lower, upper);
108+
```
109+
110+
The first non-SQL argument is `batch_size` (required, no default in this
111+
overload — pass `1000` or whatever you'd otherwise default to). Everything
112+
after it is passed positionally as `$1`, `$2`, …. Argument count is checked
113+
at runtime via `assert` against `count_pg_params(sql)` — same path the
114+
existing `query_awaitable(sql, args...)` uses.
115+
116+
Mixing with the pipeline works the same as without parameters — the
117+
returned stream is still a normal `PgRowStream` / `PgTypedRowStream<T>`:
118+
119+
```cpp
120+
namespace ps = usub::pg::stream;
121+
122+
auto s = co_await pool.stream_reflect<BigRow>(
123+
"SELECT id, name, tag FROM big_table "
124+
"WHERE id > $1 AND tag IS NOT NULL ORDER BY id",
125+
2000, int64_t{50000});
126+
127+
int64_t sum = co_await(
128+
std::move(*s)
129+
| ps::filter([](const BigRow& r) { return r.id % 2 == 0; })
130+
| ps::take(1000)
131+
| ps::reduce(int64_t{0}, [](int64_t acc, BigRow r) { return acc + r.id; })
132+
);
133+
```
134+
135+
The same templated overloads are also on `PgTransaction::stream` and
136+
`PgTransaction::stream_reflect`.
137+
138+
#### What about `copy_to` / `copy_from`?
139+
140+
`COPY tablename TO STDOUT` does not accept parameters in its grammar, and
141+
`COPY (SELECT ... WHERE x = $1) TO STDOUT` cannot be combined cleanly with
142+
libpq's extended-protocol path because libpq does not switch into COPY mode
143+
after `PQsendQueryParams`. If you need a parameterised export, use
144+
`pool.stream(sql, batch_size, args…)` instead — the row-by-row path is the
145+
intended substitute. `COPY FROM STDIN` has no place for parameters at all,
146+
so `copy_from` / `copy_from_buffer` only take SQL.
147+
93148
---
94149

95150
## 2. `copy_to` — streaming COPY OUT
@@ -452,18 +507,26 @@ what `pg_dump` uses for a reason.
452507
task::Awaitable<expected<PgRowStream, PgOpError>>
453508
pool.stream(sql, batch_size = 1000);
454509

510+
template<typename... Args>
511+
task::Awaitable<expected<PgRowStream, PgOpError>>
512+
pool.stream(sql, batch_size, args...); // $1, $2, ... bound via PQsendQueryParams
513+
455514
template<class T>
456515
task::Awaitable<expected<PgTypedRowStream<T>, PgOpError>>
457516
pool.stream_reflect<T>(sql, batch_size = 1000);
458517

459-
// COPY OUT
518+
template<class T, typename... Args>
519+
task::Awaitable<expected<PgTypedRowStream<T>, PgOpError>>
520+
pool.stream_reflect<T>(sql, batch_size, args...);
521+
522+
// COPY OUT (no parameter binding — see section 1)
460523
template<class Sink> // (string_view) -> Awaitable<bool>
461524
task::Awaitable<PgCopyResult> pool.copy_to(sql, sink);
462525

463526
template<class LineSink> // (string_view line) -> Awaitable<bool>
464527
task::Awaitable<PgCopyResult> pool.copy_to_lines(sql, line_sink);
465528

466-
// COPY IN
529+
// COPY IN (no parameter binding — grammar does not allow it)
467530
template<class Source> // () -> Awaitable<string_view> (empty = EOF)
468531
task::Awaitable<PgCopyResult> pool.copy_from(sql, source);
469532

examples/large_exports.cpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,74 @@ task::Awaitable<void> demo_pipeline(PgPool& pool) {
330330
}
331331
}
332332

333+
task::Awaitable<void> demo_stream_with_params(PgPool& pool) {
334+
const int64_t lower = 100000;
335+
const int64_t upper = 100100;
336+
337+
auto sres = co_await pool.stream_reflect<BigRow>(
338+
"SELECT id, name, tag FROM big_table "
339+
"WHERE id BETWEEN $1 AND $2 ORDER BY id",
340+
1000, lower, upper);
341+
342+
if (!sres) {
343+
std::cerr << "[params] stream open failed: "
344+
<< sres.error().error << "\n";
345+
co_return;
346+
}
347+
348+
uint64_t got = 0;
349+
int64_t sum_id = 0;
350+
while (auto row = co_await sres->next()) {
351+
++got;
352+
sum_id += row->id;
353+
}
354+
co_await sres->close();
355+
356+
std::cout << "[params] rows in [" << lower << "," << upper << "]: "
357+
<< got << " sum_id=" << sum_id << "\n";
358+
359+
auto sres2 = co_await pool.stream(
360+
"SELECT id FROM big_table WHERE id % $1 = $2 LIMIT $3",
361+
500, int64_t{7}, int64_t{0}, int64_t{20});
362+
363+
if (!sres2) {
364+
std::cerr << "[params] second stream open failed: "
365+
<< sres2.error().error << "\n";
366+
co_return;
367+
}
368+
369+
uint64_t mult7 = 0;
370+
while (auto row = co_await sres2->next()) ++mult7;
371+
co_await sres2->close();
372+
373+
std::cout << "[params] multiples-of-7 sample: " << mult7 << "\n";
374+
375+
namespace ps = usub::pg::stream;
376+
377+
auto s3 = co_await pool.stream_reflect<BigRow>(
378+
"SELECT id, name, tag FROM big_table "
379+
"WHERE id > $1 AND tag IS NOT NULL ORDER BY id",
380+
2000, int64_t{50000});
381+
382+
if (!s3) {
383+
std::cerr << "[params] pipeline open failed: "
384+
<< s3.error().error << "\n";
385+
co_return;
386+
}
387+
388+
int64_t pipe_sum = co_await(
389+
std::move(*s3)
390+
| ps::filter([](const BigRow& r) { return r.id % 2 == 0; })
391+
| ps::take(1000)
392+
| ps::reduce(int64_t{0}, [](int64_t acc, BigRow r) {
393+
return acc + r.id;
394+
})
395+
);
396+
397+
std::cout << "[params] pipeline sum(even, id>50000, tagged, first 1000): "
398+
<< pipe_sum << "\n";
399+
}
400+
333401
task::Awaitable<void> run_all(PgPool& pool, usub::Uvent& uvent) {
334402
co_await seed(pool);
335403
co_await demo_row_stream(pool);
@@ -340,6 +408,7 @@ task::Awaitable<void> run_all(PgPool& pool, usub::Uvent& uvent) {
340408
co_await demo_copy_from_buffer(pool);
341409
co_await demo_snapshot_workers(pool);
342410
co_await demo_pipeline(pool);
411+
co_await demo_stream_with_params(pool);
343412
pool.close_all();
344413
uvent.stop();
345414
co_return;

include/upq/PgConnection.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,12 +825,33 @@ namespace usub::pg {
825825

826826
std::string make_cursor_name();
827827

828+
static bool is_safe_cursor_ident(const std::string &s);
829+
828830
usub::uvent::task::Awaitable<QueryResult>
829831
cursor_declare(const std::string &cursor_name, const std::string &sql);
830832

831833
usub::uvent::task::Awaitable<QueryResult>
832834
cursor_declare_in_tx(const std::string &cursor_name, const std::string &sql);
833835

836+
template<typename... Args>
837+
usub::uvent::task::Awaitable<QueryResult>
838+
cursor_declare_in_tx_params(const std::string &cursor_name,
839+
const std::string &sql,
840+
Args &&... args) {
841+
if (!is_safe_cursor_ident(cursor_name)) {
842+
QueryResult err{};
843+
err.ok = false;
844+
err.code = PgErrorCode::ProtocolCorrupt;
845+
err.error = "invalid cursor name";
846+
err.rows_valid = false;
847+
co_return err;
848+
}
849+
const std::string stmt =
850+
"DECLARE " + cursor_name + " NO SCROLL CURSOR FOR " + sql;
851+
co_return co_await exec_param_query_nonblocking(
852+
stmt, std::forward<Args>(args)...);
853+
}
854+
834855
usub::uvent::task::Awaitable<PgCursorChunk>
835856
cursor_fetch_chunk(const std::string &cursor_name, uint32_t count);
836857

include/upq/PgPool.h

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,52 @@ namespace usub::pg {
510510
usub::uvent::task::Awaitable<std::expected<PgRowStream, PgOpError> >
511511
stream(std::string sql, uint32_t batch_size = 1000);
512512

513+
template <typename... Args>
514+
requires (sizeof...(Args) > 0)
515+
usub::uvent::task::Awaitable<std::expected<PgRowStream, PgOpError> >
516+
stream(std::string sql, uint32_t batch_size, Args &&... args) {
517+
auto c = co_await acquire_connection();
518+
if (!c) co_return std::unexpected(c.error());
519+
520+
auto conn = *c;
521+
const std::string cursor_name = conn->make_cursor_name();
522+
523+
{
524+
QueryResult r = co_await conn->exec_simple_query_nonblocking("BEGIN;");
525+
if (!r.ok) {
526+
PgOpError err{r.code, r.error, r.err_detail};
527+
if (is_fatal_connection_error(r)) mark_dead(conn);
528+
else co_await release_connection_async(conn);
529+
co_return std::unexpected(std::move(err));
530+
}
531+
}
532+
533+
{
534+
QueryResult r = co_await conn->cursor_declare_in_tx_params(
535+
cursor_name, sql, std::forward<Args>(args)...);
536+
if (!r.ok) {
537+
PgOpError err{r.code, r.error, r.err_detail};
538+
if (is_fatal_connection_error(r)) {
539+
mark_dead(conn);
540+
} else {
541+
co_await conn->exec_simple_query_nonblocking("ROLLBACK;");
542+
co_await release_connection_async(conn);
543+
}
544+
co_return std::unexpected(std::move(err));
545+
}
546+
}
547+
548+
PgRowStream s;
549+
s.pool_ = this;
550+
s.conn_ = std::move(conn);
551+
s.cursor_name_ = cursor_name;
552+
s.batch_size_ = batch_size ? batch_size : 1;
553+
s.owns_tx_ = true;
554+
s.active_ = true;
555+
s.exhausted_ = false;
556+
co_return std::expected<PgRowStream, PgOpError>{std::in_place, std::move(s)};
557+
}
558+
513559
template <class T>
514560
usub::uvent::task::Awaitable<std::expected<PgTypedRowStream<T>, PgOpError> >
515561
stream_reflect(std::string sql, uint32_t batch_size = 1000) {
@@ -519,6 +565,17 @@ namespace usub::pg {
519565
std::in_place, PgTypedRowStream<T>{std::move(*s)}};
520566
}
521567

568+
template <class T, typename... Args>
569+
requires (sizeof...(Args) > 0)
570+
usub::uvent::task::Awaitable<std::expected<PgTypedRowStream<T>, PgOpError> >
571+
stream_reflect(std::string sql, uint32_t batch_size, Args &&... args) {
572+
auto s = co_await stream(std::move(sql), batch_size,
573+
std::forward<Args>(args)...);
574+
if (!s) co_return std::unexpected(s.error());
575+
co_return std::expected<PgTypedRowStream<T>, PgOpError>{
576+
std::in_place, PgTypedRowStream<T>{std::move(*s)}};
577+
}
578+
522579
template <class Sink>
523580
usub::uvent::task::Awaitable<PgCopyResult>
524581
copy_to(std::string sql, Sink sink) {

include/upq/PgTransaction.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,40 @@ namespace usub::pg {
393393
usub::uvent::task::Awaitable<std::expected<PgRowStream, PgOpError> >
394394
stream(std::string sql, uint32_t batch_size = 1000);
395395

396+
template <typename... Args>
397+
requires (sizeof...(Args) > 0)
398+
usub::uvent::task::Awaitable<std::expected<PgRowStream, PgOpError> >
399+
stream(std::string sql, uint32_t batch_size, Args &&... args) {
400+
if (!active_ || !conn_ || !conn_->connected()) {
401+
co_return std::unexpected(PgOpError{
402+
PgErrorCode::InvalidFuture, "transaction not active", {}});
403+
}
404+
405+
const std::string cursor_name = conn_->make_cursor_name();
406+
QueryResult r = co_await conn_->cursor_declare_in_tx_params(
407+
cursor_name, sql, std::forward<Args>(args)...);
408+
if (!r.ok) {
409+
PgOpError err{r.code, r.error, r.err_detail};
410+
if (is_fatal_connection_error(r)) {
411+
pool_->mark_dead(conn_);
412+
conn_.reset();
413+
active_ = false;
414+
rolled_back_ = true;
415+
}
416+
co_return std::unexpected(std::move(err));
417+
}
418+
419+
PgRowStream s;
420+
s.pool_ = nullptr;
421+
s.conn_ = conn_;
422+
s.cursor_name_ = cursor_name;
423+
s.batch_size_ = batch_size ? batch_size : 1;
424+
s.owns_tx_ = false;
425+
s.active_ = true;
426+
s.exhausted_ = false;
427+
co_return std::expected<PgRowStream, PgOpError>{std::in_place, std::move(s)};
428+
}
429+
396430
template <class T>
397431
usub::uvent::task::Awaitable<std::expected<PgTypedRowStream<T>, PgOpError> >
398432
stream_reflect(std::string sql, uint32_t batch_size = 1000) {
@@ -402,6 +436,17 @@ namespace usub::pg {
402436
std::in_place, PgTypedRowStream<T>{std::move(*s)}};
403437
}
404438

439+
template <class T, typename... Args>
440+
requires (sizeof...(Args) > 0)
441+
usub::uvent::task::Awaitable<std::expected<PgTypedRowStream<T>, PgOpError> >
442+
stream_reflect(std::string sql, uint32_t batch_size, Args &&... args) {
443+
auto s = co_await stream(std::move(sql), batch_size,
444+
std::forward<Args>(args)...);
445+
if (!s) co_return std::unexpected(s.error());
446+
co_return std::expected<PgTypedRowStream<T>, PgOpError>{
447+
std::in_place, PgTypedRowStream<T>{std::move(*s)}};
448+
}
449+
405450
template <class Sink>
406451
usub::uvent::task::Awaitable<PgCopyResult>
407452
copy_to(std::string sql, Sink sink) {

src/upq/PgConnection.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ namespace usub::pg {
523523
}
524524
}
525525

526-
static bool is_safe_cursor_ident(const std::string &s) {
526+
bool PgConnectionLibpq::is_safe_cursor_ident(const std::string &s) {
527527
if (s.empty() || s.size() > 63) return false;
528528
auto c0 = static_cast<unsigned char>(s[0]);
529529
if (!((c0 >= 'a' && c0 <= 'z') || (c0 >= 'A' && c0 <= 'Z') || c0 == '_'))

0 commit comments

Comments
 (0)