diff --git a/.release-notes/next-release.md b/.release-notes/next-release.md index dd9959e..1f71ddb 100644 --- a/.release-notes/next-release.md +++ b/.release-notes/next-release.md @@ -114,7 +114,7 @@ end ## Add SSL/TLS negotiation support -You can now encrypt connections to PostgreSQL using SSL/TLS. Pass `SSLRequired(sslctx)` to `Session.create()` to enable SSL negotiation before authentication. The default `SSLDisabled` preserves the existing plaintext behavior. +You can now encrypt connections to PostgreSQL using SSL/TLS. Pass `SSLRequired(sslctx)` via `ServerConnectInfo` to `Session.create()` to enable SSL negotiation before authentication. The default `SSLDisabled` preserves the existing plaintext behavior. ```pony use "ssl/net" @@ -129,14 +129,11 @@ end // Connect with SSL let session = Session( - auth, + ServerConnectInfo(auth, host, port, SSLRequired(sslctx)), notify, - host, - port, username, password, - database, - SSLRequired(sslctx)) + database) ``` If the server accepts SSL, the connection is encrypted before authentication begins. If the server refuses, `pg_session_connection_failed` fires. @@ -246,3 +243,44 @@ rs1 == rs2 // true No code changes are needed — `Session.close()` handles this automatically. +## Add query cancellation support + +You can now cancel a running query by calling `session.cancel()`. This sends a PostgreSQL CancelRequest on a separate connection, requesting the server to abort the in-flight query. Cancellation is best-effort — the server may or may not honor it. If cancelled, the query's `ResultReceiver` receives `pg_query_failed` with an `ErrorResponseMessage` containing SQLSTATE `57014` (query_canceled). + +```pony +be pg_session_authenticated(session: Session) => + session.execute(SimpleQuery("SELECT pg_sleep(60)"), receiver) + session.cancel() + +be pg_query_failed(session: Session, query: Query, + failure: (ErrorResponseMessage | ClientQueryError)) +=> + match failure + | let err: ErrorResponseMessage => + if err.code == "57014" then + // query was successfully cancelled + end + end +``` + +`cancel()` is safe to call at any time — it is a no-op if no query is in flight. When the session uses `SSLRequired`, the cancel connection uses SSL as well. + +## Change Session constructor to accept ServerConnectInfo + +`Session.create()` now takes a `ServerConnectInfo` as its first parameter instead of individual connection arguments. `ServerConnectInfo` groups auth, host, service, and SSL mode into a single immutable value. + +Before: + +```pony +let session = Session( + auth, notify, host, port, username, password, database) +``` + +After: + +```pony +let session = Session( + ServerConnectInfo(auth, host, port), + notify, username, password, database) +``` + diff --git a/CHANGELOG.md b/CHANGELOG.md index bfeff32..25042dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,11 +21,13 @@ All notable changes to this project will be documented in this file. This projec - Add equality comparison for Field ([PR #85](https://github.com/ponylang/postgres/pull/85)) - Add equality comparison for Row ([PR #85](https://github.com/ponylang/postgres/pull/85)) - Add equality comparison for Rows ([PR #85](https://github.com/ponylang/postgres/pull/85)) +- Add query cancellation support ([PR #89](https://github.com/ponylang/postgres/pull/89)) ### Changed - Change ResultReceiver and Result to use Query union type instead of SimpleQuery ([PR #70](https://github.com/ponylang/postgres/pull/70)) - Change ResultReceiver and PrepareReceiver callbacks to take Session as first parameter ([PR #84](https://github.com/ponylang/postgres/pull/84)) +- Change Session constructor to accept ServerConnectInfo ([PR #89](https://github.com/ponylang/postgres/pull/89)) - Fix typo in SesssionNeverOpened ([PR #59](https://github.com/ponylang/postgres/pull/59)) ## [0.2.2] - 2025-07-16 diff --git a/CLAUDE.md b/CLAUDE.md index 2f14b4f..2f64d0f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -31,7 +31,7 @@ Managed via `corral`. ### Session State Machine -`Session` actor is the main entry point. Implements `lori.TCPConnectionActor` and `lori.ClientLifecycleEventReceiver`. State transitions via `_SessionState` interface with four concrete states: +`Session` actor is the main entry point. Constructor takes `ServerConnectInfo` (auth, host, service, ssl_mode) as its first parameter. Implements `lori.TCPConnectionActor` and `lori.ClientLifecycleEventReceiver`. The stored `ServerConnectInfo` is accessible via `server_connect_info()` for use by `_CancelSender`. State transitions via `_SessionState` interface with four concrete states: ``` _SessionUnopened --connect (no SSL)--> _SessionConnected @@ -71,10 +71,12 @@ This design makes illegal state transitions call `_IllegalState()` (panic) by de Only one operation is in-flight at a time. The queue serializes execution. `query_queue`, `query_state`, `backend_pid`, and `backend_secret_key` are non-underscore-prefixed fields on `_SessionLoggedIn` because other types in this package need cross-type access (Pony private fields are type-private). +**Query cancellation:** `session.cancel()` requests cancellation of the currently executing query by opening a separate TCP connection via `_CancelSender` and sending a `CancelRequest`. The `cancel` method on `_SessionState` follows the same "never illegal" contract as `close` — it is a no-op in all states except `_SessionLoggedIn`, where it fires only when a query is in flight (not in `_QueryReady` or `_QueryNotReady`). Cancellation is best-effort; the server may or may not honor it. If cancelled, the query's `ResultReceiver` receives `pg_query_failed` with an `ErrorResponseMessage` (SQLSTATE 57014). + ### Protocol Layer **Frontend (client → server):** -- `_FrontendMessage` primitive: `startup()`, `password()`, `query()`, `parse()`, `bind()`, `describe_portal()`, `describe_statement()`, `execute_msg()`, `close_statement()`, `sync()`, `ssl_request()`, `terminate()` — builds raw byte arrays with big-endian wire format +- `_FrontendMessage` primitive: `startup()`, `password()`, `query()`, `parse()`, `bind()`, `describe_portal()`, `describe_statement()`, `execute_msg()`, `close_statement()`, `sync()`, `ssl_request()`, `cancel_request()`, `terminate()` — builds raw byte arrays with big-endian wire format **Backend (server → client):** - `_ResponseParser` primitive: incremental parser consuming from a `Reader` buffer. Returns one parsed message per call, `None` if incomplete, errors on junk. @@ -95,6 +97,7 @@ Only one operation is in-flight at a time. The queue serializes execution. `quer - `ResultReceiver` interface (tag) — `pg_query_result(Session, Result)`, `pg_query_failed(Session, Query, (ErrorResponseMessage | ClientQueryError))` - `PrepareReceiver` interface (tag) — `pg_statement_prepared(Session, name)`, `pg_prepare_failed(Session, name, (ErrorResponseMessage | ClientQueryError))` - `ClientQueryError` trait — `SessionNeverOpened`, `SessionClosed`, `SessionNotAuthenticated`, `DataError` +- `ServerConnectInfo` — val class grouping connection parameters (auth, host, service, ssl_mode). Passed to `Session.create()` as the first parameter. Also used by `_CancelSender`. - `SSLMode` — union type `(SSLDisabled | SSLRequired)`. `SSLDisabled` is the default (plaintext). `SSLRequired` wraps an `SSLContext val` for TLS negotiation. - `ErrorResponseMessage` — full PostgreSQL error with all standard fields - `AuthenticationFailureReason` = `(InvalidAuthenticationSpecification | InvalidPassword)` @@ -111,6 +114,10 @@ In `_RowsBuilder._field_to_type()`: - Everything else → `String` - NULL → `None` +### Query Cancellation + +`_CancelSender` actor — fire-and-forget actor that sends a `CancelRequest` on a separate TCP connection. PostgreSQL requires cancel requests on a different connection from the one executing the query. No response is expected on the cancel connection — the result (if any) arrives as an `ErrorResponse` on the original session connection. When the session uses `SSLRequired`, the cancel connection performs SSL negotiation before sending the CancelRequest — mirroring the main session's connection setup. If the server refuses SSL or the TLS handshake fails, the cancel is silently abandoned. Created by `_SessionLoggedIn.cancel()` using the session's `ServerConnectInfo`, `backend_pid`, and `backend_secret_key`. Design: [discussion #88](https://github.com/ponylang/postgres/discussions/88). + ### Mort Primitives `_IllegalState` and `_Unreachable` in `_mort.pony`. Print file/line to stderr via FFI and exit. Issue URL: `https://github.com/ponylang/postgres/issues`. @@ -129,6 +136,8 @@ Tests live in the main `postgres/` package (private test classes). - `_TestSSLNegotiationRefused` — mock server responds 'N' to SSLRequest; verifies `pg_session_connection_failed` fires - `_TestSSLNegotiationJunkResponse` — mock server responds with junk byte to SSLRequest; verifies session shuts down - `_TestSSLNegotiationSuccess` — mock server responds 'S', both sides upgrade to TLS, sends AuthOk+ReadyForQuery; verifies full SSL→auth flow +- `_TestCancelQueryInFlight` — mock server accepts two connections; first authenticates with BackendKeyData(pid, key) and receives query; second receives CancelRequest and verifies 16-byte format with correct magic number, pid, and key +- `_TestSSLCancelQueryInFlight` — same as `_TestCancelQueryInFlight` but with SSL on both connections; verifies that `_CancelSender` performs SSL negotiation before sending CancelRequest - `_TestField*Equality*` / `_TestFieldInequality` — example-based reflexive, structural, symmetric equality and inequality tests for Field - `_TestRowEquality` / `_TestRowInequality` — example-based equality and inequality tests for Row - `_TestRowsEquality` / `_TestRowsInequality` — example-based equality and inequality tests for Rows @@ -145,7 +154,8 @@ Tests live in the main `postgres/` package (private test classes). - PreparedStatement/PrepareAndClose, PreparedStatement/PrepareFails, PreparedStatement/PrepareAfterClose - PreparedStatement/CloseNonexistent, PreparedStatement/PrepareDuplicateName - PreparedStatement/MixedWithSimpleAndPrepared -- SSL/Connect, SSL/Authenticate, SSL/Query, SSL/Refused +- Cancel/Query +- SSL/Connect, SSL/Authenticate, SSL/Query, SSL/Refused, SSL/Cancel Test helpers: `_ConnectionTestConfiguration` reads env vars with defaults. Several test message builder classes (`_Incoming*TestMessage`) construct raw protocol bytes for unit tests. @@ -298,8 +308,9 @@ Can arrive between any other messages (must always handle): ## File Layout ``` -postgres/ # Main package (30 files) +postgres/ # Main package (32 files) session.pony # Session actor + state machine traits + query sub-state machine + server_connect_info.pony # ServerConnectInfo val class (auth, host, service, ssl_mode) ssl_mode.pony # SSLDisabled, SSLRequired, SSLMode types simple_query.pony # SimpleQuery class prepared_query.pony # PreparedQuery class @@ -315,6 +326,7 @@ postgres/ # Main package (30 files) field_data_types.pony # FieldDataTypes union row.pony # Row class rows.pony # Rows, RowIterator, _RowsBuilder + _cancel_sender.pony # Fire-and-forget cancel request actor _frontend_message.pony # Client-to-server messages _backend_messages.pony # Server-to-client message types _message_type.pony # Protocol message type codes @@ -337,5 +349,6 @@ examples/ssl-query/ssl-query-example.pony # SSL-encrypted query with SSLRequired examples/prepared-query/prepared-query-example.pony # PreparedQuery with params and NULL examples/named-prepared-query/named-prepared-query-example.pony # Named prepared statements with reuse examples/crud/crud-example.pony # Multi-query CRUD workflow +examples/cancel/cancel-example.pony # Query cancellation with pg_sleep .ci-dockerfiles/pg-ssl/ # Dockerfile for SSL-enabled PostgreSQL CI container ``` diff --git a/examples/README.md b/examples/README.md index 42edeaf..17e1226 100644 --- a/examples/README.md +++ b/examples/README.md @@ -18,6 +18,10 @@ Named prepared statements using `Session.prepare()` and `NamedPreparedQuery`. Pr SSL-encrypted query using `SSLRequired`. Same workflow as `query` but with TLS negotiation enabled. Demonstrates how to create an `SSLContext`, wrap it in `SSLRequired`, and pass it to `Session`. Requires a PostgreSQL server configured to accept SSL connections. +## cancel + +Query cancellation using `Session.cancel()`. Executes a long-running query (`SELECT pg_sleep(10)`), cancels it, and handles the resulting `ErrorResponseMessage` with SQLSTATE `57014` (query_canceled) in the `ResultReceiver`. Shows that cancellation is best-effort and arrives as a query failure, not a separate callback. + ## crud Multi-query workflow mixing `SimpleQuery` and `PreparedQuery`. Creates a table, inserts rows with parameterized INSERTs, selects them back, deletes, and drops the table. Demonstrates all three `Result` types (`ResultSet`, `RowModifying`, `SimpleResult`) and `ErrorResponseMessage` error handling. diff --git a/examples/cancel/cancel-example.pony b/examples/cancel/cancel-example.pony new file mode 100644 index 0000000..98e0d45 --- /dev/null +++ b/examples/cancel/cancel-example.pony @@ -0,0 +1,78 @@ +use "cli" +use "collections" +use lori = "lori" +// in your code this `use` statement would be: +// use "postgres" +use "../../postgres" + +actor Main + new create(env: Env) => + let server_info = ServerInfo(env.vars) + let auth = lori.TCPConnectAuth(env.root) + + let client = Client(auth, server_info, env.out) + +actor Client is (SessionStatusNotify & ResultReceiver) + let _session: Session + let _out: OutStream + + new create(auth: lori.TCPConnectAuth, info: ServerInfo, out: OutStream) => + _out = out + _session = Session( + ServerConnectInfo(auth, info.host, info.port), + this, + info.username, + info.password, + info.database) + + be close() => + _session.close() + + be pg_session_authenticated(session: Session) => + _out.print("Authenticated.") + _out.print("Sending long-running query....") + let q = SimpleQuery("SELECT pg_sleep(10)") + session.execute(q, this) + + _out.print("Cancelling query....") + session.cancel() + + be pg_session_authentication_failed( + s: Session, + reason: AuthenticationFailureReason) + => + _out.print("Failed to authenticate.") + + be pg_query_result(session: Session, result: Result) => + _out.print("Query completed (was not cancelled).") + close() + + be pg_query_failed(session: Session, query: Query, + failure: (ErrorResponseMessage | ClientQueryError)) + => + match failure + | let err: ErrorResponseMessage => + if err.code == "57014" then + _out.print("Query was cancelled (SQLSTATE 57014).") + else + _out.print("Query failed with SQLSTATE " + err.code + ".") + end + | let ce: ClientQueryError => + _out.print("Query failed with client error.") + end + close() + +class val ServerInfo + let host: String + let port: String + let username: String + let password: String + let database: String + + new val create(vars: (Array[String] val | None)) => + let e = EnvVars(vars) + host = try e("POSTGRES_HOST")? else "127.0.0.1" end + port = try e("POSTGRES_PORT")? else "5432" end + username = try e("POSTGRES_USERNAME")? else "postgres" end + password = try e("POSTGRES_PASSWORD")? else "postgres" end + database = try e("POSTGRES_DATABASE")? else "postgres" end diff --git a/examples/crud/crud-example.pony b/examples/crud/crud-example.pony index ad2cfe5..688abeb 100644 --- a/examples/crud/crud-example.pony +++ b/examples/crud/crud-example.pony @@ -25,10 +25,8 @@ actor Client is (SessionStatusNotify & ResultReceiver) new create(auth: lori.TCPConnectAuth, info: ServerInfo, out: OutStream) => _out = out _session = Session( - auth, + ServerConnectInfo(auth, info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) diff --git a/examples/named-prepared-query/named-prepared-query-example.pony b/examples/named-prepared-query/named-prepared-query-example.pony index 2511b99..165e8f9 100644 --- a/examples/named-prepared-query/named-prepared-query-example.pony +++ b/examples/named-prepared-query/named-prepared-query-example.pony @@ -20,10 +20,8 @@ actor Client is (SessionStatusNotify & ResultReceiver & PrepareReceiver) new create(auth: lori.TCPConnectAuth, info: ServerInfo, out: OutStream) => _out = out _session = Session( - auth, + ServerConnectInfo(auth, info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) diff --git a/examples/prepared-query/prepared-query-example.pony b/examples/prepared-query/prepared-query-example.pony index e26f85b..a13eb7d 100644 --- a/examples/prepared-query/prepared-query-example.pony +++ b/examples/prepared-query/prepared-query-example.pony @@ -19,10 +19,8 @@ actor Client is (SessionStatusNotify & ResultReceiver) new create(auth: lori.TCPConnectAuth, info: ServerInfo, out: OutStream) => _out = out _session = Session( - auth, + ServerConnectInfo(auth, info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) diff --git a/examples/query/query-example.pony b/examples/query/query-example.pony index 617d26d..77d62ba 100644 --- a/examples/query/query-example.pony +++ b/examples/query/query-example.pony @@ -19,10 +19,8 @@ actor Client is (SessionStatusNotify & ResultReceiver) new create(auth: lori.TCPConnectAuth, info: ServerInfo, out: OutStream) => _out = out _session = Session( - auth, + ServerConnectInfo(auth, info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) diff --git a/examples/ssl-query/ssl-query-example.pony b/examples/ssl-query/ssl-query-example.pony index a7b6a91..8ef60c6 100644 --- a/examples/ssl-query/ssl-query-example.pony +++ b/examples/ssl-query/ssl-query-example.pony @@ -37,14 +37,11 @@ actor Client is (SessionStatusNotify & ResultReceiver) => _out = out _session = Session( - auth, + ServerConnectInfo(auth, info.host, info.port, SSLRequired(sslctx)), this, - info.host, - info.port, info.username, info.password, - info.database, - SSLRequired(sslctx)) + info.database) be close() => _session.close() diff --git a/postgres/_cancel_sender.pony b/postgres/_cancel_sender.pony new file mode 100644 index 0000000..62ba275 --- /dev/null +++ b/postgres/_cancel_sender.pony @@ -0,0 +1,76 @@ +use lori = "lori" +use "ssl/net" + +actor _CancelSender is (lori.TCPConnectionActor & lori.ClientLifecycleEventReceiver) + """ + Fire-and-forget actor that sends a CancelRequest on a separate TCP + connection. PostgreSQL requires cancel requests on a different connection + from the one executing the query. No response is expected on the cancel + connection — the result (if any) arrives as an ErrorResponse on the + original session connection. + + When `SSLRequired` is active, performs SSL negotiation before sending the + CancelRequest — mirroring what the main Session connection does. If the + server refuses SSL or the TLS handshake fails, the cancel is silently + abandoned (fire-and-forget semantics). + """ + var _tcp_connection: lori.TCPConnection = lori.TCPConnection.none() + let _process_id: I32 + let _secret_key: I32 + let _info: ServerConnectInfo + + new create(info: ServerConnectInfo, process_id: I32, secret_key: I32) => + _process_id = process_id + _secret_key = secret_key + _info = info + _tcp_connection = lori.TCPConnection.client( + info.auth, info.host, info.service, "", this, this) + + fun ref _connection(): lori.TCPConnection => + _tcp_connection + + fun ref _on_connected() => + match _info.ssl_mode + | SSLDisabled => + _send_cancel_and_close() + | let _: SSLRequired => + // CVE-2021-23222 mitigation: expect exactly 1 byte for SSL response. + try _tcp_connection.expect(1)? end + _tcp_connection.send(_FrontendMessage.ssl_request()) + end + + fun ref _on_received(data: Array[U8] iso) => + // Only called during SSL negotiation — server responds 'S' or 'N'. + try + if data(0)? == 'S' then + match _info.ssl_mode + | let req: SSLRequired => + match _tcp_connection.start_tls(req.ctx, _info.host) + | None => None // Handshake started, wait for _on_tls_ready + | let _: lori.StartTLSError => + _tcp_connection.close() + end + end + else + // 'N' or junk — server refused SSL, silently give up + _tcp_connection.close() + end + else + _tcp_connection.close() + end + + fun ref _on_tls_ready() => + // Reset expect from 1 back to 0 (same pattern as _SessionSSLNegotiating) + try _tcp_connection.expect(0)? end + _send_cancel_and_close() + + fun ref _on_tls_failure() => + // Fire-and-forget: TLS handshake failed, silently give up. + // Lori follows this with _on_closed() (default no-op), so no + // additional cleanup needed. + None + + fun ref _send_cancel_and_close() => + _tcp_connection.send( + _FrontendMessage.cancel_request(_process_id, _secret_key)) + _tcp_connection.close() diff --git a/postgres/_frontend_message.pony b/postgres/_frontend_message.pony index cb65dc2..a848970 100644 --- a/postgres/_frontend_message.pony +++ b/postgres/_frontend_message.pony @@ -369,6 +369,36 @@ primitive _FrontendMessage [] end + fun cancel_request(process_id: I32, secret_key: I32): Array[U8] val => + """ + Build a CancelRequest message. Sent on a separate connection to request + cancellation of a running query. No message type byte — same pattern as + startup() and ssl_request(). + + Format: Int32(16) Int32(80877102) Int32(process_id) Int32(secret_key) + The magic number 80877102 = 1234 << 16 | 5678. + """ + try + recover val + let msg: Array[U8] = Array[U8].init(0, 16) + ifdef bigendian then + msg.update_u32(0, U32(16))? + msg.update_u32(4, U32(80877102))? + msg.update_u32(8, process_id.u32())? + msg.update_u32(12, secret_key.u32())? + else + msg.update_u32(0, U32(16).bswap())? + msg.update_u32(4, U32(80877102).bswap())? + msg.update_u32(8, process_id.u32().bswap())? + msg.update_u32(12, secret_key.u32().bswap())? + end + msg + end + else + _Unreachable() + [] + end + fun terminate(): Array[U8] val => """ Build a Terminate message. Sent before closing the TCP connection to diff --git a/postgres/_test.pony b/postgres/_test.pony index 87c060c..5fe6a4b 100644 --- a/postgres/_test.pony +++ b/postgres/_test.pony @@ -106,6 +106,11 @@ actor \nodoc\ Main is TestList _TestFieldSymmetricProperty)) test(Property1UnitTest[Row](_TestRowReflexiveProperty)) test(Property1UnitTest[Rows](_TestRowsReflexiveProperty)) + test(_TestFrontendMessageCancelRequest) + test(_TestCancelQueryInFlight) + test(_TestSSLCancelQueryInFlight) + test(_TestCancelPgSleep) + test(_TestCancelSSLPgSleep) class \nodoc\ iso _TestAuthenticate is UnitTest """ @@ -120,10 +125,8 @@ class \nodoc\ iso _TestAuthenticate is UnitTest let info = _ConnectionTestConfiguration(h.env.vars) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), _AuthenticateTestNotify(h, true), - info.host, - info.port, info.username, info.password, info.database) @@ -144,10 +147,8 @@ class \nodoc\ iso _TestAuthenticateFailure is UnitTest let info = _ConnectionTestConfiguration(h.env.vars) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), _AuthenticateTestNotify(h, false), - info.host, - info.port, info.username, info.password + " " + info.password, info.database) @@ -184,10 +185,8 @@ class \nodoc\ iso _TestConnect is UnitTest let info = _ConnectionTestConfiguration(h.env.vars) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), _ConnectTestNotify(h, true), - info.host, - info.port, info.username, info.password, info.database) @@ -210,10 +209,8 @@ class \nodoc\ iso _TestConnectFailure is UnitTest let host = ifdef linux then "127.0.0.2" else "localhost" end let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), host, info.port.reverse()), _ConnectTestNotify(h, false), - host, - info.port.reverse(), info.username, info.password, info.database) @@ -318,10 +315,8 @@ actor \nodoc\ _JunkSendingTestListener is lori.TCPListenerActor fun ref _on_listening() => // Now that we are listening, start a client session Session( - lori.TCPConnectAuth(_h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port), _HandlingJunkTestNotify(_h), - _host, - _port, "postgres", "postgres", "postgres") @@ -457,10 +452,8 @@ actor \nodoc\ _DoesntAnswerTestListener is lori.TCPListenerActor fun ref _on_listening() => // Now that we are listening, start a client session Session( - lori.TCPConnectAuth(_h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port), _DoesntAnswerClient(_h), - _host, - _port, "postgres", "postgres", "postgres") @@ -611,10 +604,8 @@ actor \nodoc\ _ZeroRowSelectTestListener is lori.TCPListenerActor fun ref _on_listening() => Session( - lori.TCPConnectAuth(_h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port), _ZeroRowSelectTestClient(_h), - _host, - _port, "postgres", "postgres", "postgres") @@ -755,10 +746,8 @@ actor \nodoc\ _PrepareShutdownTestListener is lori.TCPListenerActor fun ref _on_listening() => Session( - lori.TCPConnectAuth(_h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port), _PrepareShutdownTestClient(_h), - _host, - _port, "postgres", "postgres", "postgres") @@ -836,10 +825,8 @@ actor \nodoc\ _TerminateSentTestListener is lori.TCPListenerActor fun ref _on_listening() => Session( - lori.TCPConnectAuth(_h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port), _TerminateSentTestNotify(_h), - _host, - _port, "postgres", "postgres", "postgres") @@ -956,14 +943,11 @@ actor \nodoc\ _SSLRefusedTestListener is lori.TCPListenerActor fun ref _on_listening() => Session( - lori.TCPConnectAuth(_h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port, SSLRequired(_sslctx)), _SSLRefusedTestNotify(_h), - _host, - _port, - "postgres", "postgres", "postgres", - SSLRequired(_sslctx)) + "postgres") fun ref _on_listen_failure() => _h.fail("Unable to listen") @@ -1060,14 +1044,11 @@ actor \nodoc\ _SSLJunkTestListener is lori.TCPListenerActor fun ref _on_listening() => Session( - lori.TCPConnectAuth(_h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port, SSLRequired(_sslctx)), _SSLJunkTestNotify(_h), - _host, - _port, - "postgres", "postgres", "postgres", - SSLRequired(_sslctx)) + "postgres") fun ref _on_listen_failure() => _h.fail("Unable to listen") @@ -1190,14 +1171,11 @@ actor \nodoc\ _SSLSuccessTestListener is lori.TCPListenerActor fun ref _on_listening() => Session( - lori.TCPConnectAuth(_h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port, SSLRequired(_client_sslctx)), _SSLSuccessTestNotify(_h), - _host, - _port, - "postgres", "postgres", "postgres", - SSLRequired(_client_sslctx)) + "postgres") fun ref _on_listen_failure() => _h.fail("Unable to listen") @@ -1257,14 +1235,11 @@ class \nodoc\ iso _TestSSLConnect is UnitTest end let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.ssl_host, info.ssl_port, SSLRequired(sslctx)), _ConnectTestNotify(h, true), - info.ssl_host, - info.ssl_port, info.username, info.password, - info.database, - SSLRequired(sslctx)) + info.database) h.dispose_when_done(session) h.long_test(5_000_000_000) @@ -1287,14 +1262,11 @@ class \nodoc\ iso _TestSSLAuthenticate is UnitTest end let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.ssl_host, info.ssl_port, SSLRequired(sslctx)), _AuthenticateTestNotify(h, true), - info.ssl_host, - info.ssl_port, info.username, info.password, - info.database, - SSLRequired(sslctx)) + info.database) h.dispose_when_done(session) h.long_test(5_000_000_000) @@ -1319,14 +1291,11 @@ class \nodoc\ iso _TestSSLQueryResults is UnitTest let client = _ResultsIncludeOriginatingQueryReceiver(h) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.ssl_host, info.ssl_port, SSLRequired(sslctx)), client, - info.ssl_host, - info.ssl_port, info.username, info.password, - info.database, - SSLRequired(sslctx)) + info.database) h.dispose_when_done(session) h.long_test(5_000_000_000) @@ -1351,14 +1320,385 @@ class \nodoc\ iso _TestSSLRefused is UnitTest end let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port, SSLRequired(sslctx)), _ConnectTestNotify(h, false), - info.host, - info.port, info.username, info.password, - info.database, - SSLRequired(sslctx)) + info.database) h.dispose_when_done(session) h.long_test(5_000_000_000) + +// Cancel query unit test + +class \nodoc\ iso _TestCancelQueryInFlight is UnitTest + """ + Verifies that calling cancel() when a query is in flight opens a separate + TCP connection and sends a valid CancelRequest message containing the + correct process ID and secret key from BackendKeyData. + """ + fun name(): String => + "CancelQueryInFlight" + + fun apply(h: TestHelper) => + let host = "127.0.0.1" + let port = "7675" + + let listener = _CancelTestListener( + lori.TCPListenAuth(h.env.root), + host, + port, + h) + + h.dispose_when_done(listener) + h.long_test(5_000_000_000) + +actor \nodoc\ _CancelTestClient is (SessionStatusNotify & ResultReceiver) + let _h: TestHelper + + new create(h: TestHelper) => + _h = h + + be pg_session_connection_failed(s: Session) => + _h.fail("Unable to establish connection.") + _h.complete(false) + + be pg_session_authenticated(session: Session) => + session.execute(SimpleQuery("SELECT pg_sleep(100)"), this) + session.cancel() + + be pg_session_authentication_failed( + session: Session, + reason: AuthenticationFailureReason) + => + _h.fail("Unable to authenticate.") + _h.complete(false) + + be pg_query_result(session: Session, result: Result) => + None + + be pg_query_failed(session: Session, query: Query, + failure: (ErrorResponseMessage | ClientQueryError)) + => + None + +actor \nodoc\ _CancelTestListener is lori.TCPListenerActor + var _tcp_listener: lori.TCPListener = lori.TCPListener.none() + let _server_auth: lori.TCPServerAuth + let _h: TestHelper + let _host: String + let _port: String + var _connection_count: USize = 0 + + new create(listen_auth: lori.TCPListenAuth, + host: String, + port: String, + h: TestHelper) + => + _host = host + _port = port + _h = h + _server_auth = lori.TCPServerAuth(listen_auth) + _tcp_listener = lori.TCPListener(listen_auth, host, port, this) + + fun ref _listener(): lori.TCPListener => + _tcp_listener + + fun ref _on_accept(fd: U32): _CancelTestServer => + _connection_count = _connection_count + 1 + _CancelTestServer(_server_auth, fd, _h, _connection_count > 1) + + fun ref _on_listening() => + let session = Session( + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port), + _CancelTestClient(_h), + "postgres", + "postgres", + "postgres") + _h.dispose_when_done(session) + + fun ref _on_listen_failure() => + _h.fail("Unable to listen") + _h.complete(false) + +actor \nodoc\ _CancelTestServer + is (lori.TCPConnectionActor & lori.ServerLifecycleEventReceiver) + """ + Mock server that handles two connections: the first is the main session + (authenticates and becomes ready), the second is the cancel sender + (verifies CancelRequest format and content). + """ + var _tcp_connection: lori.TCPConnection = lori.TCPConnection.none() + let _h: TestHelper + let _is_cancel_connection: Bool + var _authed: Bool = false + + new create(auth: lori.TCPServerAuth, fd: U32, h: TestHelper, + is_cancel: Bool) + => + _h = h + _is_cancel_connection = is_cancel + _tcp_connection = lori.TCPConnection.server(auth, fd, this, this) + + fun ref _connection(): lori.TCPConnection => + _tcp_connection + + fun ref _on_received(data: Array[U8] iso) => + if _is_cancel_connection then + // Verify CancelRequest: 16 bytes total + // Int32(16) Int32(80877102) Int32(pid=12345) Int32(key=67890) + if data.size() != 16 then + _h.fail("CancelRequest should be 16 bytes, got " + + data.size().string()) + _h.complete(false) + return + end + + try + // Verify length field: big-endian 16 + if (data(0)? != 0) or (data(1)? != 0) or (data(2)? != 0) + or (data(3)? != 16) then + _h.fail("CancelRequest length field is incorrect") + _h.complete(false) + return + end + + // Verify magic number: big-endian 80877102 = 0x04D2162E + if (data(4)? != 4) or (data(5)? != 210) or (data(6)? != 22) + or (data(7)? != 46) then + _h.fail("CancelRequest magic number is incorrect") + _h.complete(false) + return + end + + // Verify pid: big-endian 12345 = 0x00003039 + if (data(8)? != 0) or (data(9)? != 0) or (data(10)? != 48) + or (data(11)? != 57) then + _h.fail("CancelRequest process_id is incorrect") + _h.complete(false) + return + end + + // Verify key: big-endian 67890 = 0x00010932 + if (data(12)? != 0) or (data(13)? != 1) or (data(14)? != 9) + or (data(15)? != 50) then + _h.fail("CancelRequest secret_key is incorrect") + _h.complete(false) + return + end + + _h.complete(true) + else + _h.fail("Error reading CancelRequest bytes") + _h.complete(false) + end + else + if not _authed then + _authed = true + let auth_ok = _IncomingAuthenticationOkTestMessage.bytes() + let bkd = _IncomingBackendKeyDataTestMessage(12345, 67890).bytes() + let ready = _IncomingReadyForQueryTestMessage('I').bytes() + // Send all auth messages in a single write so the Session processes + // them atomically. If sent separately, TCP may deliver them in + // different reads, causing ReadyForQuery to arrive after the client + // has already called cancel() (which would see _QueryNotReady). + let combined: Array[U8] val = recover val + let arr = Array[U8] + arr.append(auth_ok) + arr.append(bkd) + arr.append(ready) + arr + end + _tcp_connection.send(combined) + end + // After auth, receive query data and hold (don't respond) + end + +// SSL cancel query unit test + +class \nodoc\ iso _TestSSLCancelQueryInFlight is UnitTest + """ + Verifies that calling cancel() on an SSL session opens a separate + SSL-negotiated TCP connection and sends a valid CancelRequest message + containing the correct process ID and secret key. + """ + fun name(): String => + "SSLCancelQueryInFlight" + + fun apply(h: TestHelper) ? => + let host = "127.0.0.1" + let port = "7676" + + let cert_path = FilePath(FileAuth(h.env.root), + "assets/test-cert.pem") + let key_path = FilePath(FileAuth(h.env.root), + "assets/test-key.pem") + + let client_sslctx = recover val + SSLContext + .> set_client_verify(false) + .> set_server_verify(false) + end + + let server_sslctx = recover val + SSLContext + .> set_cert(cert_path, key_path)? + .> set_client_verify(false) + .> set_server_verify(false) + end + + let listener = _SSLCancelTestListener( + lori.TCPListenAuth(h.env.root), + host, + port, + h, + client_sslctx, + server_sslctx) + + h.dispose_when_done(listener) + h.long_test(5_000_000_000) + +actor \nodoc\ _SSLCancelTestListener is lori.TCPListenerActor + var _tcp_listener: lori.TCPListener = lori.TCPListener.none() + let _server_auth: lori.TCPServerAuth + let _h: TestHelper + let _host: String + let _port: String + let _client_sslctx: SSLContext val + let _server_sslctx: SSLContext val + var _connection_count: USize = 0 + + new create(listen_auth: lori.TCPListenAuth, + host: String, + port: String, + h: TestHelper, + client_sslctx: SSLContext val, + server_sslctx: SSLContext val) + => + _host = host + _port = port + _h = h + _client_sslctx = client_sslctx + _server_sslctx = server_sslctx + _server_auth = lori.TCPServerAuth(listen_auth) + _tcp_listener = lori.TCPListener(listen_auth, host, port, this) + + fun ref _listener(): lori.TCPListener => + _tcp_listener + + fun ref _on_accept(fd: U32): _SSLCancelTestServer => + _connection_count = _connection_count + 1 + _SSLCancelTestServer(_server_auth, _server_sslctx, fd, _h, + _connection_count > 1) + + fun ref _on_listening() => + let session = Session( + ServerConnectInfo(lori.TCPConnectAuth(_h.env.root), _host, _port, SSLRequired(_client_sslctx)), + _CancelTestClient(_h), + "postgres", + "postgres", + "postgres") + _h.dispose_when_done(session) + + fun ref _on_listen_failure() => + _h.fail("Unable to listen") + _h.complete(false) + +actor \nodoc\ _SSLCancelTestServer + is (lori.TCPConnectionActor & lori.ServerLifecycleEventReceiver) + """ + Mock SSL server that handles two connections: the first is the main session + (SSL negotiation + authenticate + ready), the second is the cancel sender + (SSL negotiation + verify CancelRequest format and content). + """ + var _tcp_connection: lori.TCPConnection = lori.TCPConnection.none() + let _sslctx: SSLContext val + let _h: TestHelper + let _is_cancel_connection: Bool + var _ssl_started: Bool = false + var _authed: Bool = false + + new create(auth: lori.TCPServerAuth, sslctx: SSLContext val, fd: U32, + h: TestHelper, is_cancel: Bool) + => + _sslctx = sslctx + _h = h + _is_cancel_connection = is_cancel + _tcp_connection = lori.TCPConnection.server(auth, fd, this, this) + + fun ref _connection(): lori.TCPConnection => + _tcp_connection + + fun ref _on_received(data: Array[U8] iso) => + if not _ssl_started then + // SSLRequest — respond 'S' and upgrade to TLS + let response: Array[U8] val = ['S'] + _tcp_connection.send(response) + match _tcp_connection.start_tls(_sslctx) + | None => _ssl_started = true + | let _: lori.StartTLSError => + _tcp_connection.close() + end + elseif _is_cancel_connection then + // Verify CancelRequest: 16 bytes total + // Int32(16) Int32(80877102) Int32(pid=12345) Int32(key=67890) + if data.size() != 16 then + _h.fail("CancelRequest should be 16 bytes, got " + + data.size().string()) + _h.complete(false) + return + end + + try + if (data(0)? != 0) or (data(1)? != 0) or (data(2)? != 0) + or (data(3)? != 16) then + _h.fail("CancelRequest length field is incorrect") + _h.complete(false) + return + end + + if (data(4)? != 4) or (data(5)? != 210) or (data(6)? != 22) + or (data(7)? != 46) then + _h.fail("CancelRequest magic number is incorrect") + _h.complete(false) + return + end + + if (data(8)? != 0) or (data(9)? != 0) or (data(10)? != 48) + or (data(11)? != 57) then + _h.fail("CancelRequest process_id is incorrect") + _h.complete(false) + return + end + + if (data(12)? != 0) or (data(13)? != 1) or (data(14)? != 9) + or (data(15)? != 50) then + _h.fail("CancelRequest secret_key is incorrect") + _h.complete(false) + return + end + + _h.complete(true) + else + _h.fail("Error reading CancelRequest bytes") + _h.complete(false) + end + else + if not _authed then + _authed = true + let auth_ok = _IncomingAuthenticationOkTestMessage.bytes() + let bkd = _IncomingBackendKeyDataTestMessage(12345, 67890).bytes() + let ready = _IncomingReadyForQueryTestMessage('I').bytes() + // Send all auth messages in a single write so the Session processes + // them atomically (same reason as _CancelTestServer). + let combined: Array[U8] val = recover val + let arr = Array[U8] + arr.append(auth_ok) + arr.append(bkd) + arr.append(ready) + arr + end + _tcp_connection.send(combined) + end + // After auth, receive query data and hold (don't respond) + end diff --git a/postgres/_test_frontend_message.pony b/postgres/_test_frontend_message.pony index 6ba11bf..73a1ea6 100644 --- a/postgres/_test_frontend_message.pony +++ b/postgres/_test_frontend_message.pony @@ -215,6 +215,22 @@ class \nodoc\ iso _TestFrontendMessageSSLRequest is UnitTest h.assert_array_eq[U8](expected, _FrontendMessage.ssl_request()) +class \nodoc\ iso _TestFrontendMessageCancelRequest is UnitTest + fun name(): String => + "FrontendMessage/CancelRequest" + + fun apply(h: TestHelper) => + // CancelRequest: Int32(16) Int32(80877102) Int32(pid) Int32(key) + // No message type byte, 16 bytes total. + // 80877102 = 0x04D2162E + // pid = 12345 = 0x00003039 + // key = 67890 = 0x00010932 + let expected: Array[U8] = + [ 0; 0; 0; 16; 4; 210; 22; 46; 0; 0; 48; 57; 0; 1; 9; 50 ] + + h.assert_array_eq[U8](expected, + _FrontendMessage.cancel_request(12345, 67890)) + class \nodoc\ iso _TestFrontendMessageTerminate is UnitTest fun name(): String => "FrontendMessage/Terminate" diff --git a/postgres/_test_query.pony b/postgres/_test_query.pony index 4f1a8bf..eaa37bd 100644 --- a/postgres/_test_query.pony +++ b/postgres/_test_query.pony @@ -1,5 +1,6 @@ use lori = "lori" use "pony_test" +use "ssl/net" class \nodoc\ iso _TestQueryResults is UnitTest fun name(): String => @@ -11,10 +12,8 @@ class \nodoc\ iso _TestQueryResults is UnitTest let client = _ResultsIncludeOriginatingQueryReceiver(h) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), client, - info.host, - info.port, info.username, info.password, info.database) @@ -100,10 +99,8 @@ class \nodoc\ iso _TestQueryAfterAuthenticationFailure is UnitTest let info = _ConnectionTestConfiguration(h.env.vars) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), _QueryAfterAuthenticationFailureNotify(h), - info.host, - info.port, info.username, info.password + " " + info.password, info.database) @@ -157,10 +154,8 @@ class \nodoc\ iso _TestQueryAfterConnectionFailure is UnitTest let host = ifdef linux then "127.0.0.2" else "localhost" end let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), host, info.port.reverse()), _QueryAfterConnectionFailureNotify(h), - host, - info.port.reverse(), info.username, info.password, info.database) @@ -209,10 +204,8 @@ class \nodoc\ iso _TestQueryAfterSessionHasBeenClosed is UnitTest let info = _ConnectionTestConfiguration(h.env.vars) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), _QueryAfterSessionHasBeenClosedNotify(h), - info.host, - info.port, info.username, info.password, info.database) @@ -265,10 +258,8 @@ class \nodoc\ iso _TestQueryOfNonExistentTable is UnitTest let client = _NonExistentTableQueryReceiver(h) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), client, - info.host, - info.port, info.username, info.password, info.database) @@ -389,10 +380,8 @@ actor \nodoc\ _AllSuccessQueryRunningClient is _queries = consume queries _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -452,10 +441,8 @@ class \nodoc\ iso _TestEmptyQuery is UnitTest let client = _EmptyQueryReceiver(h) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), client, - info.host, - info.port, info.username, info.password, info.database) @@ -512,10 +499,8 @@ class \nodoc\ iso _TestZeroRowSelect is UnitTest let client = _ZeroRowSelectReceiver(h) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), client, - info.host, - info.port, info.username, info.password, info.database) @@ -598,10 +583,8 @@ actor \nodoc\ _MultiStatementMixedClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -703,10 +686,8 @@ class \nodoc\ iso _TestPreparedQueryResults is UnitTest let client = _PreparedQueryResultsReceiver(h) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), client, - info.host, - info.port, info.username, info.password, info.database) @@ -796,10 +777,8 @@ class \nodoc\ iso _TestPreparedQueryNullParam is UnitTest let client = _PreparedQueryNullParamReceiver(h) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), client, - info.host, - info.port, info.username, info.password, info.database) @@ -884,10 +863,8 @@ class \nodoc\ iso _TestPreparedQueryNonExistentTable is UnitTest let client = _PreparedQueryNonExistentTableReceiver(h) let session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), client, - info.host, - info.port, info.username, info.password, info.database) @@ -959,10 +936,8 @@ actor \nodoc\ _PreparedQueryInsertAndDeleteClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1083,10 +1058,8 @@ actor \nodoc\ _PreparedQueryMixedClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1206,10 +1179,8 @@ actor \nodoc\ _PrepareStatementClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1266,10 +1237,8 @@ actor \nodoc\ _PrepareAndExecuteClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1352,10 +1321,8 @@ actor \nodoc\ _PrepareAndExecuteMultipleClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1460,10 +1427,8 @@ actor \nodoc\ _PrepareAndCloseClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1533,10 +1498,8 @@ actor \nodoc\ _PrepareFailsClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1599,10 +1562,8 @@ actor \nodoc\ _PrepareAfterCloseClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1697,10 +1658,8 @@ actor \nodoc\ _CloseNonexistentClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1754,10 +1713,8 @@ actor \nodoc\ _PrepareDuplicateNameClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1827,10 +1784,8 @@ actor \nodoc\ _MixedAllThreeClient is _h = h _session = Session( - lori.TCPConnectAuth(h.env.root), + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), this, - info.host, - info.port, info.username, info.password, info.database) @@ -1917,3 +1872,107 @@ actor \nodoc\ _MixedAllThreeClient is be dispose() => _session.close() + +// Cancel integration test + +class \nodoc\ iso _TestCancelPgSleep is UnitTest + """ + Verifies that cancelling a long-running query on a real PostgreSQL server + produces a query failure with SQLSTATE 57014 (query_canceled). + """ + fun name(): String => + "integration/Cancel/Query" + + fun apply(h: TestHelper) => + let info = _ConnectionTestConfiguration(h.env.vars) + + let client = _CancelPgSleepClient(h) + + let session = Session( + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.host, info.port), + client, + info.username, + info.password, + info.database) + + h.dispose_when_done(session) + h.long_test(10_000_000_000) + +actor \nodoc\ _CancelPgSleepClient is + ( SessionStatusNotify + & ResultReceiver ) + let _h: TestHelper + let _query: SimpleQuery + + new create(h: TestHelper) => + _h = h + _query = SimpleQuery("SELECT pg_sleep(30)") + + be pg_session_authenticated(session: Session) => + session.execute(_query, this) + session.cancel() + + be pg_session_authentication_failed( + session: Session, + reason: AuthenticationFailureReason) + => + _h.fail("Unable to authenticate.") + _h.complete(false) + + be pg_query_result(session: Session, result: Result) => + _h.fail("Expected query to be cancelled, but got a result.") + _h.complete(false) + + be pg_query_failed(session: Session, query: Query, + failure: (ErrorResponseMessage | ClientQueryError)) + => + if query isnt _query then + _h.fail("Got failure for unexpected query.") + _h.complete(false) + return + end + + match failure + | let err: ErrorResponseMessage => + if err.code == "57014" then + _h.complete(true) + else + _h.fail("Expected SQLSTATE 57014 but got " + err.code) + _h.complete(false) + end + | let ce: ClientQueryError => + _h.fail("Expected ErrorResponseMessage but got ClientQueryError.") + _h.complete(false) + end + +// SSL cancel integration test + +class \nodoc\ iso _TestCancelSSLPgSleep is UnitTest + """ + Verifies that cancelling a long-running query on a real PostgreSQL server + over an SSL-encrypted connection produces a query failure with SQLSTATE + 57014 (query_canceled). + """ + fun name(): String => + "integration/SSL/Cancel" + + fun apply(h: TestHelper) => + let info = _ConnectionTestConfiguration(h.env.vars) + + let sslctx = recover val + SSLContext + .> set_client_verify(false) + .> set_server_verify(false) + end + + let client = _CancelPgSleepClient(h) + + let session = Session( + ServerConnectInfo(lori.TCPConnectAuth(h.env.root), info.ssl_host, info.ssl_port, SSLRequired(sslctx)), + client, + info.username, + info.password, + info.database) + + h.dispose_when_done(session) + h.long_test(10_000_000_000) diff --git a/postgres/server_connect_info.pony b/postgres/server_connect_info.pony new file mode 100644 index 0000000..19eebf7 --- /dev/null +++ b/postgres/server_connect_info.pony @@ -0,0 +1,19 @@ +use lori = "lori" + +class val ServerConnectInfo + """ + Connection parameters needed to reach the PostgreSQL server. Grouped because + they are always used together — individually they have no meaning. + """ + let auth: lori.TCPConnectAuth + let host: String + let service: String + let ssl_mode: SSLMode + + new val create(auth': lori.TCPConnectAuth, host': String, service': String, + ssl_mode': SSLMode = SSLDisabled) + => + auth = auth' + host = host' + service = service' + ssl_mode = ssl_mode' diff --git a/postgres/session.pony b/postgres/session.pony index 5f139a4..f815da1 100644 --- a/postgres/session.pony +++ b/postgres/session.pony @@ -5,23 +5,23 @@ use "ssl/net" actor Session is (lori.TCPConnectionActor & lori.ClientLifecycleEventReceiver) var state: _SessionState var _tcp_connection: lori.TCPConnection = lori.TCPConnection.none() + let _server_connect_info: ServerConnectInfo new create( - auth': lori.TCPConnectAuth, + server_connect_info': ServerConnectInfo, notify': SessionStatusNotify, - host': String, - service': String, user': String, password': String, - database': String, - ssl': SSLMode = SSLDisabled) + database': String) => + _server_connect_info = server_connect_info' state = _SessionUnopened(notify', user', password', database', - ssl', host') + server_connect_info'.ssl_mode, server_connect_info'.host) - _tcp_connection = lori.TCPConnection.client(auth', - host', - service', + _tcp_connection = lori.TCPConnection.client( + server_connect_info'.auth, + server_connect_info'.host, + server_connect_info'.service, "", this, this) @@ -47,6 +47,18 @@ actor Session is (lori.TCPConnectionActor & lori.ClientLifecycleEventReceiver) """ state.close_statement(this, name) + be cancel() => + """ + Request cancellation of the currently executing query. Opens a separate + TCP connection to send a PostgreSQL CancelRequest. Cancellation is + best-effort — the server may or may not honor it. If cancelled, the + query's ResultReceiver receives `pg_query_failed` with an ErrorResponse + (SQLSTATE 57014). Queued queries are not affected. + + Safe to call in any session state. No-op if no query is in flight. + """ + state.cancel(this) + be close() => """ Close the connection. Sends a Terminate message to the server before @@ -81,6 +93,9 @@ actor Session is (lori.TCPConnectionActor & lori.ClientLifecycleEventReceiver) fun ref _connection(): lori.TCPConnection => _tcp_connection + fun server_connect_info(): ServerConnectInfo => + _server_connect_info + // Possible session states class ref _SessionUnopened is _ConnectableState let _notify: SessionStatusNotify @@ -231,6 +246,9 @@ class ref _SessionSSLNegotiating fun ref close_statement(s: Session ref, name: String) => None + fun ref cancel(s: Session ref) => + None + fun ref close(s: Session ref) => _shutdown(s) @@ -365,6 +383,15 @@ class _SessionLoggedIn is _AuthenticatedState fun ref on_row_description(s: Session ref, msg: _RowDescriptionMessage) => query_state.on_row_description(s, this, msg) + fun ref cancel(s: Session ref) => + match query_state + | let _: _QueryReady => None + | let _: _QueryNotReady => None + else + _CancelSender(s.server_connect_info(), + backend_pid, backend_secret_key) + end + fun ref execute(s: Session ref, query: Query, receiver: ResultReceiver) @@ -943,6 +970,12 @@ interface _SessionState Called if the server requests we autheticate using the Postgres MD5 password scheme. """ + fun ref cancel(s: Session ref) + """ + The client requested query cancellation. Like `close`, this should never + be an illegal state — it should be silently ignored when not applicable. + """ + fun ref close(s: Session ref) """ The client received a message to close. Unlike `shutdown`, this should never @@ -1086,6 +1119,9 @@ trait _ConnectedState is _NotConnectableState fun ref process_responses(s: Session ref) => _ResponseMessageParser(s, readbuf()) + fun ref cancel(s: Session ref) => + None + fun ref close(s: Session ref) => shutdown(s) @@ -1127,6 +1163,9 @@ trait _UnconnectedState is (_NotAuthenticableState & _NotAuthenticated) fun ref process_responses(s: Session ref) => None + fun ref cancel(s: Session ref) => + None + fun ref close(s: Session ref) => None