Skip to content

Commit be52c51

Browse files
authored
Parse and store BackendKeyData message (#87)
PostgreSQL sends BackendKeyData (`K`) once during startup, after AuthenticationOk and before ReadyForQuery, containing the backend process ID and secret key. These were previously silently dropped as unsupported messages. Now parsed and stored in `_SessionLoggedIn` for future use by query cancellation (CancelRequest). Design: #72
1 parent 89afb38 commit be52c51

File tree

8 files changed

+115
-5
lines changed

8 files changed

+115
-5
lines changed

CLAUDE.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ This design makes illegal state transitions call `_IllegalState()` (panic) by de
6969
5. `_CommandCompleteMessage` triggers result delivery to receiver
7070
6. `_ReadyForQueryMessage` dequeues completed operation, transitions to `_QueryReady` (idle) or `_QueryNotReady` (non-idle)
7171

72-
Only one operation is in-flight at a time. The queue serializes execution. `query_queue` and `query_state` are non-underscore-prefixed fields on `_SessionLoggedIn` because the `_QueryState` implementations need cross-type access (Pony private fields are type-private).
72+
Only one operation is in-flight at a time. The queue serializes execution. `query_queue`, `query_state`, `backend_pid`, and `backend_secret_key` are non-underscore-prefixed fields on `_SessionLoggedIn` because other types in this package need cross-type access (Pony private fields are type-private).
7373

7474
### Protocol Layer
7575

@@ -80,7 +80,7 @@ Only one operation is in-flight at a time. The queue serializes execution. `quer
8080
- `_ResponseParser` primitive: incremental parser consuming from a `Reader` buffer. Returns one parsed message per call, `None` if incomplete, errors on junk.
8181
- `_ResponseMessageParser` primitive: routes parsed messages to the current session state's callbacks. Processes messages synchronously within a query cycle (looping until `ReadyForQuery` or buffer exhaustion), then yields via `s._process_again()` between cycles. This prevents behaviors like `close()` from interleaving between result delivery and query dequeuing. If a callback triggers shutdown during the loop, `on_shutdown` clears the read buffer, causing the next parse to return `None` and exit the loop.
8282

83-
**Supported message types:** AuthenticationOk, AuthenticationMD5Password, CommandComplete, DataRow, EmptyQueryResponse, ErrorResponse, ReadyForQuery, RowDescription, ParseComplete, BindComplete, NoData, CloseComplete, ParameterDescription, PortalSuspended. Extended query acknowledgment messages (ParseComplete, BindComplete, NoData, etc.) are parsed but silently consumed — they fall through the `_ResponseMessageParser` match without routing since the state machine tracks query lifecycle through data-carrying messages only.
83+
**Supported message types:** AuthenticationOk, AuthenticationMD5Password, BackendKeyData, CommandComplete, DataRow, EmptyQueryResponse, ErrorResponse, ReadyForQuery, RowDescription, ParseComplete, BindComplete, NoData, CloseComplete, ParameterDescription, PortalSuspended. BackendKeyData is parsed and stored in `_SessionLoggedIn` (`backend_pid`, `backend_secret_key`) for future query cancellation. Extended query acknowledgment messages (ParseComplete, BindComplete, NoData, etc.) are parsed but silently consumed — they fall through the `_ResponseMessageParser` match without routing since the state machine tracks query lifecycle through data-carrying messages only.
8484

8585
### Public API Types
8686

postgres/_backend_messages.pony

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,19 @@ primitive _AuthenticationOkMessage
1414
authenticated.
1515
"""
1616

17+
class val _BackendKeyDataMessage
18+
"""
19+
Message from the backend containing the process ID and secret key for this
20+
session. Sent once during startup, after AuthenticationOk and before
21+
ReadyForQuery. Required for query cancellation via CancelRequest.
22+
"""
23+
let process_id: I32
24+
let secret_key: I32
25+
26+
new val create(process_id': I32, secret_key': I32) =>
27+
process_id = process_id'
28+
secret_key = secret_key'
29+
1730
class val _CommandCompleteMessage
1831
"""
1932
Messagr from the backend that indicates that a command has finished running.

postgres/_message_type.pony

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,8 @@ primitive _MessageType
2525
fun ready_for_query(): U8 =>
2626
'Z'
2727

28+
fun backend_key_data(): U8 =>
29+
'K'
30+
2831
fun row_description(): U8 =>
2932
'T'

postgres/_response_message_parser.pony

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ primitive _ResponseMessageParser
4242
return
4343
| let msg: _RowDescriptionMessage =>
4444
s.state.on_row_description(s, msg)
45+
| let msg: _BackendKeyDataMessage =>
46+
s.state.on_backend_key_data(s, msg)
4547
| let msg: _EmptyQueryResponseMessage =>
4648
s.state.on_empty_query_response(s)
4749
| None =>

postgres/_response_parser.pony

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ type _AuthenticationMessages is
77

88
type _ResponseParserResult is
99
( _AuthenticationMessages
10+
| _BackendKeyDataMessage
1011
| _CommandCompleteMessage
1112
| _DataRowMessage
1213
| _EmptyQueryResponseMessage
@@ -146,6 +147,11 @@ primitive _ResponseParser
146147
// and parse the parameter description payload
147148
let payload = buffer.block(payload_size)?
148149
return _parameter_description(consume payload)?
150+
| _MessageType.backend_key_data() =>
151+
buffer.skip(5)?
152+
let process_id = buffer.i32_be()?
153+
let secret_key = buffer.i32_be()?
154+
return _BackendKeyDataMessage(process_id, secret_key)
149155
else
150156
buffer.skip(message_size)?
151157
return _UnsupportedMessage

postgres/_test.pony

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ actor \nodoc\ Main is TestList
5050
test(_TestResponseParserCloseCompleteMessage)
5151
test(_TestResponseParserParameterDescriptionMessage)
5252
test(_TestResponseParserPortalSuspendedMessage)
53+
test(_TestResponseParserBackendKeyDataMessage)
5354
test(_TestResponseParserDigitMessageTypeNotJunk)
5455
test(_TestResponseParserMultipleMessagesParseCompleteFirst)
56+
test(_TestResponseParserMultipleMessagesBackendKeyDataFirst)
5557
test(_TestFrontendMessageParse)
5658
test(_TestFrontendMessageParseWithTypes)
5759
test(_TestFrontendMessageBind)

postgres/_test_response_parser.pony

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,75 @@ class \nodoc\ iso
634634
h.fail("Wrong message for ReadyForQuery.")
635635
end
636636

637+
class \nodoc\ iso _TestResponseParserBackendKeyDataMessage is UnitTest
638+
"""
639+
Verify that BackendKeyData messages are parsed correctly, extracting the
640+
process ID and secret key.
641+
"""
642+
fun name(): String =>
643+
"ResponseParser/BackendKeyDataMessage"
644+
645+
fun apply(h: TestHelper) ? =>
646+
let pid: I32 = 12345
647+
let secret: I32 = 67890
648+
let bytes = _IncomingBackendKeyDataTestMessage(pid, secret).bytes()
649+
let r: Reader = Reader.>append(bytes)
650+
651+
match _ResponseParser(r)?
652+
| let m: _BackendKeyDataMessage =>
653+
h.assert_eq[I32](pid, m.process_id)
654+
h.assert_eq[I32](secret, m.secret_key)
655+
else
656+
h.fail("Wrong message returned.")
657+
end
658+
659+
class \nodoc\ iso
660+
_TestResponseParserMultipleMessagesBackendKeyDataFirst
661+
is UnitTest
662+
"""
663+
Verify correct buffer advancement after a BackendKeyData message followed
664+
by a ReadyForQuery message.
665+
"""
666+
fun name(): String =>
667+
"ResponseParser/MultipleMessages/BackendKeyDataFirst"
668+
669+
fun apply(h: TestHelper) ? =>
670+
let r: Reader = Reader
671+
r.append(_IncomingBackendKeyDataTestMessage(42, 99).bytes())
672+
r.append(_IncomingReadyForQueryTestMessage('I').bytes())
673+
674+
match _ResponseParser(r)?
675+
| let m: _BackendKeyDataMessage =>
676+
h.assert_eq[I32](42, m.process_id)
677+
h.assert_eq[I32](99, m.secret_key)
678+
else
679+
h.fail("Wrong message returned for first message.")
680+
end
681+
682+
match _ResponseParser(r)?
683+
| let m: _ReadyForQueryMessage =>
684+
if not m.idle() then
685+
h.fail("Expected idle status.")
686+
end
687+
else
688+
h.fail("Wrong message returned for second message.")
689+
end
690+
691+
class \nodoc\ val _IncomingBackendKeyDataTestMessage
692+
let _bytes: Array[U8] val
693+
694+
new val create(process_id: I32, secret_key: I32) =>
695+
let wb: Writer = Writer
696+
wb.u8(_MessageType.backend_key_data())
697+
wb.u32_be(12)
698+
wb.i32_be(process_id)
699+
wb.i32_be(secret_key)
700+
701+
_bytes = WriterToByteArray(wb)
702+
703+
fun bytes(): Array[U8] val =>
704+
_bytes
705+
637706
class \nodoc\ val _IncomingJunkTestMessage
638707
"""
639708
Creates a junk message where "junk" is currently defined as having a message

postgres/session.pony

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -328,11 +328,13 @@ class _SessionLoggedIn is _AuthenticatedState
328328
managed by a sub-state machine (`_QueryState`) that tracks whether a query
329329
is in flight, what protocol is active, and owns per-query accumulation data.
330330
"""
331-
// query_queue and query_state are not underscore-prefixed because the
332-
// _QueryState implementations need to access them, and Pony private fields
333-
// are type-private.
331+
// query_queue, query_state, backend_pid, and backend_secret_key are not
332+
// underscore-prefixed because other types in this package need access, and
333+
// Pony private fields are type-private.
334334
let query_queue: Array[_QueueItem] = query_queue.create()
335335
var query_state: _QueryState
336+
var backend_pid: I32 = 0
337+
var backend_secret_key: I32 = 0
336338
let _notify: SessionStatusNotify
337339
let _readbuf: Reader
338340

@@ -341,6 +343,10 @@ class _SessionLoggedIn is _AuthenticatedState
341343
_readbuf = readbuf'
342344
query_state = _QueryNotReady
343345

346+
fun ref on_backend_key_data(s: Session ref, msg: _BackendKeyDataMessage) =>
347+
backend_pid = msg.process_id
348+
backend_secret_key = msg.secret_key
349+
344350
fun ref on_ready_for_query(s: Session ref, msg: _ReadyForQueryMessage) =>
345351
query_state.on_ready_for_query(s, this, msg)
346352

@@ -1004,6 +1010,12 @@ interface _SessionState
10041010
Called when a data row is received from the server.
10051011
"""
10061012

1013+
fun ref on_backend_key_data(s: Session ref, msg: _BackendKeyDataMessage)
1014+
"""
1015+
Called when the server sends BackendKeyData during startup. Contains the
1016+
process ID and secret key needed for query cancellation.
1017+
"""
1018+
10071019
fun ref on_row_description(s: Session ref, msg: _RowDescriptionMessage)
10081020
"""
10091021
Called when a row description is receivedfrom the server.
@@ -1182,6 +1194,9 @@ trait _NotAuthenticated
11821194
A session that has yet to be authenticated. Before being authenticated, then
11831195
all "query related" commands should not be received.
11841196
"""
1197+
fun ref on_backend_key_data(s: Session ref, msg: _BackendKeyDataMessage) =>
1198+
_IllegalState()
1199+
11851200
fun ref on_command_complete(s: Session ref, msg: _CommandCompleteMessage) =>
11861201
_IllegalState()
11871202

0 commit comments

Comments
 (0)