Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .release-notes/add-row-streaming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
## Add row streaming support

Row streaming delivers query results in fixed-size batches instead of buffering all rows before delivery. This enables pull-based paged result consumption with bounded memory, ideal for large result sets.

A new `StreamingResultReceiver` interface provides three callbacks: `pg_stream_batch` delivers each batch of rows, `pg_stream_complete` signals all rows have been delivered, and `pg_stream_failed` reports errors. Three new `Session` methods control the flow:

```pony
// Start streaming with a window size of 100 rows per batch
session.stream(
PreparedQuery("SELECT * FROM big_table",
recover val Array[(String | None)] end),
100, my_receiver)

// In the receiver:
be pg_stream_batch(session: Session, rows: Rows) =>
// Process this batch
session.fetch_more() // Pull the next batch

be pg_stream_complete(session: Session) =>
// All rows delivered
```

Call `session.close_stream()` to end streaming early. Only `PreparedQuery` and `NamedPreparedQuery` are supported — streaming uses the extended query protocol's `Execute(max_rows)` + `PortalSuspended` mechanism.
16 changes: 9 additions & 7 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,18 @@ This design makes illegal state transitions call `_IllegalState()` (panic) by de

### Query Execution Flow

1. Client calls `session.execute(query, ResultReceiver)` where query is `SimpleQuery`, `PreparedQuery`, or `NamedPreparedQuery`; or `session.prepare(name, sql, PrepareReceiver)` to create a named statement; or `session.close_statement(name)` to destroy one; or `session.copy_in(sql, CopyInReceiver)` to start a COPY FROM STDIN operation; or `session.copy_out(sql, CopyOutReceiver)` to start a COPY TO STDOUT operation
2. `_SessionLoggedIn` queues operations as `_QueueItem` — a union of `_QueuedQuery` (execute), `_QueuedPrepare` (prepare), `_QueuedCloseStatement` (close_statement), `_QueuedCopyIn` (copy_in), and `_QueuedCopyOut` (copy_out)
1. Client calls `session.execute(query, ResultReceiver)` where query is `SimpleQuery`, `PreparedQuery`, or `NamedPreparedQuery`; or `session.prepare(name, sql, PrepareReceiver)` to create a named statement; or `session.close_statement(name)` to destroy one; or `session.copy_in(sql, CopyInReceiver)` to start a COPY FROM STDIN operation; or `session.copy_out(sql, CopyOutReceiver)` to start a COPY TO STDOUT operation; or `session.stream(query, window_size, StreamingResultReceiver)` to start a streaming query
2. `_SessionLoggedIn` queues operations as `_QueueItem` — a union of `_QueuedQuery` (execute), `_QueuedPrepare` (prepare), `_QueuedCloseStatement` (close_statement), `_QueuedCopyIn` (copy_in), `_QueuedCopyOut` (copy_out), and `_QueuedStreamingQuery` (stream)
3. The `_QueryState` sub-state machine manages operation lifecycle:
- `_QueryNotReady`: initial state after auth, before the first ReadyForQuery arrives
- `_QueryReady`: server is idle, `try_run_query` dispatches based on queue item type — `SimpleQuery` transitions to `_SimpleQueryInFlight`, `PreparedQuery` and `NamedPreparedQuery` transition to `_ExtendedQueryInFlight`, `_QueuedPrepare` transitions to `_PrepareInFlight`, `_QueuedCloseStatement` transitions to `_CloseStatementInFlight`, `_QueuedCopyIn` transitions to `_CopyInInFlight`, `_QueuedCopyOut` transitions to `_CopyOutInFlight`
- `_QueryReady`: server is idle, `try_run_query` dispatches based on queue item type — `SimpleQuery` transitions to `_SimpleQueryInFlight`, `PreparedQuery` and `NamedPreparedQuery` transition to `_ExtendedQueryInFlight`, `_QueuedPrepare` transitions to `_PrepareInFlight`, `_QueuedCloseStatement` transitions to `_CloseStatementInFlight`, `_QueuedCopyIn` transitions to `_CopyInInFlight`, `_QueuedCopyOut` transitions to `_CopyOutInFlight`, `_QueuedStreamingQuery` transitions to `_StreamingQueryInFlight`
- `_SimpleQueryInFlight`: owns per-query accumulation data (`_data_rows`, `_row_description`), delivers results on `CommandComplete`
- `_ExtendedQueryInFlight`: same data accumulation and result delivery as `_SimpleQueryInFlight` (duplicated because Pony traits can't have iso fields). Entered after sending Parse+Bind+Describe(portal)+Execute+Sync (unnamed) or Bind+Describe(portal)+Execute+Sync (named)
- `_PrepareInFlight`: handles Parse+Describe(statement)+Sync cycle. Notifies `PrepareReceiver` on success/failure via `ReadyForQuery`
- `_CloseStatementInFlight`: handles Close(statement)+Sync cycle. Fire-and-forget (no callback); errors silently absorbed
- `_CopyInInFlight`: handles COPY FROM STDIN data transfer. Sends the COPY query via simple query protocol, receives `CopyInResponse`, then uses pull-based flow: calls `pg_copy_ready` on the `CopyInReceiver` to request data. Client calls `send_copy_data` (sends CopyData + pulls again), `finish_copy` (sends CopyDone), or `abort_copy` (sends CopyFail). Server responds with CommandComplete+ReadyForQuery on success, or ErrorResponse+ReadyForQuery on failure
- `_CopyOutInFlight`: handles COPY TO STDOUT data reception. Sends the COPY query via simple query protocol, receives `CopyOutResponse` (silently consumed), then receives server-pushed `CopyData` messages (each delivered via `pg_copy_data` to the `CopyOutReceiver`), `CopyDone` (silently consumed), and finally `CommandComplete` (stores row count) + `ReadyForQuery` (delivers `pg_copy_complete`). On error, `ErrorResponse` delivers `pg_copy_failed` and the session remains usable
- `_StreamingQueryInFlight`: handles streaming row delivery. Entered after sending Parse+Bind+Describe(portal)+Execute(max_rows)+Flush (unnamed) or Bind+Describe(portal)+Execute(max_rows)+Flush (named). Uses Flush instead of Sync to keep the portal alive between batches. `PortalSuspended` triggers batch delivery via `pg_stream_batch`. Client calls `fetch_more()` (sends Execute+Flush) or `close_stream()` (sends Sync). `CommandComplete` delivers final batch and sends Sync. `ReadyForQuery` delivers `pg_stream_complete` and dequeues. On error, sends Sync (required because no Sync is pending during streaming) and delivers `pg_stream_failed`
4. Response data arrives: `_RowDescriptionMessage` sets column metadata, `_DataRowMessage` accumulates rows
5. `_CommandCompleteMessage` triggers result delivery to receiver
6. `_ReadyForQueryMessage` dequeues completed operation, transitions to `_QueryReady`
Expand All @@ -83,13 +84,13 @@ Only one operation is in-flight at a time. The queue serializes execution. `quer
### Protocol Layer

**Frontend (client → server):**
- `_FrontendMessage` primitive: `startup()`, `password()`, `query()`, `parse()`, `bind()`, `describe_portal()`, `describe_statement()`, `execute_msg()`, `close_statement()`, `sync()`, `ssl_request()`, `cancel_request()`, `terminate()`, `sasl_initial_response()`, `sasl_response()`, `copy_data()`, `copy_done()`, `copy_fail()` — builds raw byte arrays with big-endian wire format
- `_FrontendMessage` primitive: `startup()`, `password()`, `query()`, `parse()`, `bind()`, `describe_portal()`, `describe_statement()`, `execute_msg()`, `close_statement()`, `sync()`, `flush()`, `ssl_request()`, `cancel_request()`, `terminate()`, `sasl_initial_response()`, `sasl_response()`, `copy_data()`, `copy_done()`, `copy_fail()` — builds raw byte arrays with big-endian wire format

**Backend (server → client):**
- `_ResponseParser` primitive: incremental parser consuming from a `Reader` buffer. Returns one parsed message per call, `None` if incomplete, errors on junk.
- `_ResponseMessageParser` primitive: routes parsed messages to the current session state's callbacks. Processes messages synchronously within a query cycle (looping until `ReadyForQuery` or buffer exhaustion), then yields via `s._process_again()` between cycles. This prevents behaviors like `close()` from interleaving between result delivery and query dequeuing. If a callback triggers shutdown during the loop, `on_shutdown` clears the read buffer, causing the next parse to return `None` and exit the loop.

**Supported message types:** AuthenticationOk, AuthenticationMD5Password, AuthenticationSASL, AuthenticationSASLContinue, AuthenticationSASLFinal, BackendKeyData, CommandComplete, CopyInResponse, CopyOutResponse, CopyData, CopyDone, DataRow, EmptyQueryResponse, ErrorResponse, NoticeResponse, NotificationResponse, ParameterStatus, ReadyForQuery, RowDescription, ParseComplete, BindComplete, NoData, CloseComplete, ParameterDescription, PortalSuspended. BackendKeyData is parsed and stored in `_SessionLoggedIn` (`backend_pid`, `backend_secret_key`) for future query cancellation. NotificationResponse is parsed into `_NotificationResponseMessage` and routed to `_SessionLoggedIn.on_notification()`, which delivers `pg_notification` to `SessionStatusNotify`. NoticeResponse is parsed into `NoticeResponseMessage` (using shared `_ResponseFieldBuilder` / `_parse_response_fields` with ErrorResponse) and routed via `on_notice()` to `SessionStatusNotify.pg_notice()`. Notices are delivered in all connected states (including during authentication) since PostgreSQL can send them at any time. ParameterStatus is parsed into `_ParameterStatusMessage` and routed via `on_parameter_status()` to `SessionStatusNotify.pg_parameter_status()`, which delivers a `ParameterStatus` val. Like notices, parameter status messages are delivered in all connected states. Extended query acknowledgment messages (ParseComplete, BindComplete, NoData, etc.) are parsed but silently consumed — they fall through the `_ResponseMessageParser` match without routing since the state machine tracks query lifecycle through data-carrying messages only.
**Supported message types:** AuthenticationOk, AuthenticationMD5Password, AuthenticationSASL, AuthenticationSASLContinue, AuthenticationSASLFinal, BackendKeyData, CommandComplete, CopyInResponse, CopyOutResponse, CopyData, CopyDone, DataRow, EmptyQueryResponse, ErrorResponse, NoticeResponse, NotificationResponse, ParameterStatus, ReadyForQuery, RowDescription, ParseComplete, BindComplete, NoData, CloseComplete, ParameterDescription, PortalSuspended. BackendKeyData is parsed and stored in `_SessionLoggedIn` (`backend_pid`, `backend_secret_key`) for future query cancellation. NotificationResponse is parsed into `_NotificationResponseMessage` and routed to `_SessionLoggedIn.on_notification()`, which delivers `pg_notification` to `SessionStatusNotify`. NoticeResponse is parsed into `NoticeResponseMessage` (using shared `_ResponseFieldBuilder` / `_parse_response_fields` with ErrorResponse) and routed via `on_notice()` to `SessionStatusNotify.pg_notice()`. Notices are delivered in all connected states (including during authentication) since PostgreSQL can send them at any time. ParameterStatus is parsed into `_ParameterStatusMessage` and routed via `on_parameter_status()` to `SessionStatusNotify.pg_parameter_status()`, which delivers a `ParameterStatus` val. Like notices, parameter status messages are delivered in all connected states. PortalSuspended is parsed into `_PortalSuspendedMessage` and routed to `s.state.on_portal_suspended(s)` for streaming batch delivery. Extended query acknowledgment messages (ParseComplete, BindComplete, NoData, etc.) are parsed but silently consumed — they fall through the `_ResponseMessageParser` match without routing since the state machine tracks query lifecycle through data-carrying messages only.

### Public API Types

Expand All @@ -109,6 +110,7 @@ Only one operation is in-flight at a time. The queue serializes execution. `quer
- `PrepareReceiver` interface (tag) — `pg_statement_prepared(Session, name)`, `pg_prepare_failed(Session, name, (ErrorResponseMessage | ClientQueryError))`
- `CopyInReceiver` interface (tag) — `pg_copy_ready(Session)`, `pg_copy_complete(Session, count)`, `pg_copy_failed(Session, (ErrorResponseMessage | ClientQueryError))`. Pull-based: session calls `pg_copy_ready` after `copy_in` and after each `send_copy_data`, letting the client control data flow
- `CopyOutReceiver` interface (tag) — `pg_copy_data(Session, Array[U8] val)`, `pg_copy_complete(Session, count)`, `pg_copy_failed(Session, (ErrorResponseMessage | ClientQueryError))`. Push-based: server drives the flow, delivering data chunks via `pg_copy_data` and signaling completion via `pg_copy_complete`
- `StreamingResultReceiver` interface (tag) — `pg_stream_batch(Session, Rows)`, `pg_stream_complete(Session)`, `pg_stream_failed(Session, (PreparedQuery | NamedPreparedQuery), (ErrorResponseMessage | ClientQueryError))`. Pull-based: session delivers batches via `pg_stream_batch`; client calls `fetch_more()` for the next batch or `close_stream()` to end early
- `ClientQueryError` trait — `SessionNeverOpened`, `SessionClosed`, `SessionNotAuthenticated`, `DataError`
- `DatabaseConnectInfo` — val class grouping database authentication parameters (user, password, database). Passed to `Session.create()` alongside `ServerConnectInfo`.
- `ServerConnectInfo` — val class grouping connection parameters (auth, host, service, ssl_mode). Passed to `Session.create()` as the first parameter. Also used by `_CancelSender`.
Expand Down Expand Up @@ -143,15 +145,15 @@ Tests live in the main `postgres/` package (private test classes), organized acr

**Conventions**: `_test.pony` contains shared helpers (`_ConnectionTestConfiguration` for env vars, `_ConnectTestNotify`/`_AuthenticateTestNotify` reused by other files). `_test_response_parser.pony` contains `_Incoming*TestMessage` builder classes that construct raw protocol bytes for mock servers across all test files. `_test_mock_message_reader.pony` contains `_MockMessageReader` for extracting complete PostgreSQL frontend messages from TCP data in mock servers.

**Ports**: Mock server tests use ports in the 7669–7701 range and 9667–9668. **Port 7680 is reserved by Windows** (Update Delivery Optimization) and will fail to bind on WSL2 — do not use it.
**Ports**: Mock server tests use ports in the 7669–7706 range and 9667–9668. **Port 7680 is reserved by Windows** (Update Delivery Optimization) and will fail to bind on WSL2 — do not use it.

## Supported PostgreSQL Features

**SSL/TLS:** Optional SSL negotiation via `SSLRequired`. CVE-2021-23222 mitigated via `expect(1)` before SSLRequest. Design: [discussion #76](https://github.com/ponylang/postgres/discussions/76).

**Authentication:** MD5 password and SCRAM-SHA-256. No SCRAM-SHA-256-PLUS (channel binding), Kerberos, GSS, or certificate auth. Design: [discussion #83](https://github.com/ponylang/postgres/discussions/83).

**Protocol:** Simple query and extended query (parameterized via unnamed and named prepared statements). Parameters are text-format only; type OIDs are inferred by the server. LISTEN/NOTIFY, NoticeResponse, ParameterStatus, COPY FROM STDIN (pull-based), COPY TO STDOUT (push-based). No function calls. Full feature roadmap: [discussion #72](https://github.com/ponylang/postgres/discussions/72).
**Protocol:** Simple query and extended query (parameterized via unnamed and named prepared statements). Parameters are text-format only; type OIDs are inferred by the server. LISTEN/NOTIFY, NoticeResponse, ParameterStatus, COPY FROM STDIN (pull-based), COPY TO STDOUT (push-based), row streaming (windowed batch delivery via Execute(max_rows)+PortalSuspended). No function calls. Full feature roadmap: [discussion #72](https://github.com/ponylang/postgres/discussions/72).

**CI containers:** Stock `postgres:14.5` for plain (port 5432, SCRAM-SHA-256 default) and `ghcr.io/ponylang/postgres-ci-pg-ssl:latest` for SSL (port 5433, SSL + md5user); built via `build-ci-image.yml` workflow dispatch or locally via `.ci-dockerfiles/pg-ssl/build-and-push.bash`. MD5 integration tests connect to the SSL container (without using SSL) because only that container has the md5user.

Expand Down
4 changes: 4 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ Bulk data loading using `COPY ... FROM STDIN`. Creates a table, loads three rows

Bulk data export using `COPY ... TO STDOUT`. Creates a table, inserts three rows, exports them via `Session.copy_out()`, and prints the received data. Demonstrates the push-based `CopyOutReceiver` interface: the server drives the flow, calling `pg_copy_data` for each chunk, then `pg_copy_complete` when finished.

## streaming

Row streaming using `Session.stream()` with windowed batch delivery. Creates a table with 7 rows, streams them with `window_size=3` (producing batches of 3, 3, and 1), then drops the table. Demonstrates the pull-based `StreamingResultReceiver` interface: `pg_stream_batch` delivers each batch, `fetch_more()` requests the next, and `pg_stream_complete` signals completion.

## notice

Server notice handling using `pg_notice`. Executes `DROP TABLE IF EXISTS` on a nonexistent table, which triggers a PostgreSQL `NoticeResponse`, and prints the notice fields (severity, code, message). Shows how `SessionStatusNotify.pg_notice` delivers non-fatal informational messages from the server.
Expand Down
Loading