Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ This design makes illegal state transitions call `_IllegalState()` (panic) by de
5. `_CommandCompleteMessage` triggers result delivery to receiver
6. `_ReadyForQueryMessage` dequeues completed operation, transitions to `_QueryReady` (idle) or `_QueryNotReady` (non-idle)

Only one operation is in-flight at a time. The queue serializes execution. `query_queue` and `query_state` are non-underscore-prefixed fields on `_SessionLoggedIn` because the `_QueryState` implementations need cross-type access (Pony private fields are type-private).
Only one operation is in-flight at a time. The queue serializes execution. `query_queue`, `query_state`, `backend_pid`, and `backend_secret_key` are non-underscore-prefixed fields on `_SessionLoggedIn` because other types in this package need cross-type access (Pony private fields are type-private).

### Protocol Layer

Expand All @@ -80,7 +80,7 @@ Only one operation is in-flight at a time. The queue serializes execution. `quer
- `_ResponseParser` primitive: incremental parser consuming from a `Reader` buffer. Returns one parsed message per call, `None` if incomplete, errors on junk.
- `_ResponseMessageParser` primitive: routes parsed messages to the current session state's callbacks. Processes messages synchronously within a query cycle (looping until `ReadyForQuery` or buffer exhaustion), then yields via `s._process_again()` between cycles. This prevents behaviors like `close()` from interleaving between result delivery and query dequeuing. If a callback triggers shutdown during the loop, `on_shutdown` clears the read buffer, causing the next parse to return `None` and exit the loop.

**Supported message types:** AuthenticationOk, AuthenticationMD5Password, CommandComplete, DataRow, EmptyQueryResponse, ErrorResponse, ReadyForQuery, RowDescription, ParseComplete, BindComplete, NoData, CloseComplete, ParameterDescription, PortalSuspended. Extended query acknowledgment messages (ParseComplete, BindComplete, NoData, etc.) are parsed but silently consumed — they fall through the `_ResponseMessageParser` match without routing since the state machine tracks query lifecycle through data-carrying messages only.
**Supported message types:** AuthenticationOk, AuthenticationMD5Password, BackendKeyData, CommandComplete, DataRow, EmptyQueryResponse, ErrorResponse, ReadyForQuery, RowDescription, ParseComplete, BindComplete, NoData, CloseComplete, ParameterDescription, PortalSuspended. BackendKeyData is parsed and stored in `_SessionLoggedIn` (`backend_pid`, `backend_secret_key`) for future query cancellation. Extended query acknowledgment messages (ParseComplete, BindComplete, NoData, etc.) are parsed but silently consumed — they fall through the `_ResponseMessageParser` match without routing since the state machine tracks query lifecycle through data-carrying messages only.

### Public API Types

Expand Down
13 changes: 13 additions & 0 deletions postgres/_backend_messages.pony
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ primitive _AuthenticationOkMessage
authenticated.
"""

class val _BackendKeyDataMessage
"""
Message from the backend containing the process ID and secret key for this
session. Sent once during startup, after AuthenticationOk and before
ReadyForQuery. Required for query cancellation via CancelRequest.
"""
let process_id: I32
let secret_key: I32

new val create(process_id': I32, secret_key': I32) =>
process_id = process_id'
secret_key = secret_key'

class val _CommandCompleteMessage
"""
Messagr from the backend that indicates that a command has finished running.
Expand Down
3 changes: 3 additions & 0 deletions postgres/_message_type.pony
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@ primitive _MessageType
fun ready_for_query(): U8 =>
'Z'

fun backend_key_data(): U8 =>
'K'

fun row_description(): U8 =>
'T'
2 changes: 2 additions & 0 deletions postgres/_response_message_parser.pony
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ primitive _ResponseMessageParser
return
| let msg: _RowDescriptionMessage =>
s.state.on_row_description(s, msg)
| let msg: _BackendKeyDataMessage =>
s.state.on_backend_key_data(s, msg)
| let msg: _EmptyQueryResponseMessage =>
s.state.on_empty_query_response(s)
| None =>
Expand Down
6 changes: 6 additions & 0 deletions postgres/_response_parser.pony
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type _AuthenticationMessages is

type _ResponseParserResult is
( _AuthenticationMessages
| _BackendKeyDataMessage
| _CommandCompleteMessage
| _DataRowMessage
| _EmptyQueryResponseMessage
Expand Down Expand Up @@ -146,6 +147,11 @@ primitive _ResponseParser
// and parse the parameter description payload
let payload = buffer.block(payload_size)?
return _parameter_description(consume payload)?
| _MessageType.backend_key_data() =>
buffer.skip(5)?
let process_id = buffer.i32_be()?
let secret_key = buffer.i32_be()?
return _BackendKeyDataMessage(process_id, secret_key)
else
buffer.skip(message_size)?
return _UnsupportedMessage
Expand Down
2 changes: 2 additions & 0 deletions postgres/_test.pony
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ actor \nodoc\ Main is TestList
test(_TestResponseParserCloseCompleteMessage)
test(_TestResponseParserParameterDescriptionMessage)
test(_TestResponseParserPortalSuspendedMessage)
test(_TestResponseParserBackendKeyDataMessage)
test(_TestResponseParserDigitMessageTypeNotJunk)
test(_TestResponseParserMultipleMessagesParseCompleteFirst)
test(_TestResponseParserMultipleMessagesBackendKeyDataFirst)
test(_TestFrontendMessageParse)
test(_TestFrontendMessageParseWithTypes)
test(_TestFrontendMessageBind)
Expand Down
69 changes: 69 additions & 0 deletions postgres/_test_response_parser.pony
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,75 @@ class \nodoc\ iso
h.fail("Wrong message for ReadyForQuery.")
end

class \nodoc\ iso _TestResponseParserBackendKeyDataMessage is UnitTest
"""
Verify that BackendKeyData messages are parsed correctly, extracting the
process ID and secret key.
"""
fun name(): String =>
"ResponseParser/BackendKeyDataMessage"

fun apply(h: TestHelper) ? =>
let pid: I32 = 12345
let secret: I32 = 67890
let bytes = _IncomingBackendKeyDataTestMessage(pid, secret).bytes()
let r: Reader = Reader.>append(bytes)

match _ResponseParser(r)?
| let m: _BackendKeyDataMessage =>
h.assert_eq[I32](pid, m.process_id)
h.assert_eq[I32](secret, m.secret_key)
else
h.fail("Wrong message returned.")
end

class \nodoc\ iso
_TestResponseParserMultipleMessagesBackendKeyDataFirst
is UnitTest
"""
Verify correct buffer advancement after a BackendKeyData message followed
by a ReadyForQuery message.
"""
fun name(): String =>
"ResponseParser/MultipleMessages/BackendKeyDataFirst"

fun apply(h: TestHelper) ? =>
let r: Reader = Reader
r.append(_IncomingBackendKeyDataTestMessage(42, 99).bytes())
r.append(_IncomingReadyForQueryTestMessage('I').bytes())

match _ResponseParser(r)?
| let m: _BackendKeyDataMessage =>
h.assert_eq[I32](42, m.process_id)
h.assert_eq[I32](99, m.secret_key)
else
h.fail("Wrong message returned for first message.")
end

match _ResponseParser(r)?
| let m: _ReadyForQueryMessage =>
if not m.idle() then
h.fail("Expected idle status.")
end
else
h.fail("Wrong message returned for second message.")
end

class \nodoc\ val _IncomingBackendKeyDataTestMessage
let _bytes: Array[U8] val

new val create(process_id: I32, secret_key: I32) =>
let wb: Writer = Writer
wb.u8(_MessageType.backend_key_data())
wb.u32_be(12)
wb.i32_be(process_id)
wb.i32_be(secret_key)

_bytes = WriterToByteArray(wb)

fun bytes(): Array[U8] val =>
_bytes

class \nodoc\ val _IncomingJunkTestMessage
"""
Creates a junk message where "junk" is currently defined as having a message
Expand Down
21 changes: 18 additions & 3 deletions postgres/session.pony
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,13 @@ class _SessionLoggedIn is _AuthenticatedState
managed by a sub-state machine (`_QueryState`) that tracks whether a query
is in flight, what protocol is active, and owns per-query accumulation data.
"""
// query_queue and query_state are not underscore-prefixed because the
// _QueryState implementations need to access them, and Pony private fields
// are type-private.
// query_queue, query_state, backend_pid, and backend_secret_key are not
// underscore-prefixed because other types in this package need access, and
// Pony private fields are type-private.
let query_queue: Array[_QueueItem] = query_queue.create()
var query_state: _QueryState
var backend_pid: I32 = 0
var backend_secret_key: I32 = 0
let _notify: SessionStatusNotify
let _readbuf: Reader

Expand All @@ -341,6 +343,10 @@ class _SessionLoggedIn is _AuthenticatedState
_readbuf = readbuf'
query_state = _QueryNotReady

fun ref on_backend_key_data(s: Session ref, msg: _BackendKeyDataMessage) =>
backend_pid = msg.process_id
backend_secret_key = msg.secret_key

fun ref on_ready_for_query(s: Session ref, msg: _ReadyForQueryMessage) =>
query_state.on_ready_for_query(s, this, msg)

Expand Down Expand Up @@ -1004,6 +1010,12 @@ interface _SessionState
Called when a data row is received from the server.
"""

fun ref on_backend_key_data(s: Session ref, msg: _BackendKeyDataMessage)
"""
Called when the server sends BackendKeyData during startup. Contains the
process ID and secret key needed for query cancellation.
"""

fun ref on_row_description(s: Session ref, msg: _RowDescriptionMessage)
"""
Called when a row description is receivedfrom the server.
Expand Down Expand Up @@ -1182,6 +1194,9 @@ trait _NotAuthenticated
A session that has yet to be authenticated. Before being authenticated, then
all "query related" commands should not be received.
"""
fun ref on_backend_key_data(s: Session ref, msg: _BackendKeyDataMessage) =>
_IllegalState()

fun ref on_command_complete(s: Session ref, msg: _CommandCompleteMessage) =>
_IllegalState()

Expand Down