Skip to content

Commit 0a0ec6c

Browse files
committed
Add COPY TO STDOUT support
Implement the server-driven COPY OUT protocol, completing COPY support alongside the existing COPY IN. The server pushes data via CopyData messages after a COPY ... TO STDOUT query, which the new CopyOutReceiver interface delivers through pg_copy_data/pg_copy_complete/pg_copy_failed callbacks. The implementation follows the same state machine pattern as COPY IN: _QueuedCopyOut queued item, _CopyOutInFlight query state, three new backend message types (CopyOutResponse, CopyData, CopyDone) parsed and routed through the existing _ResponseParser/_ResponseMessageParser pipeline, and full shutdown drain support to prevent double-notification.
1 parent 8c08932 commit 0a0ec6c

15 files changed

+1518
-12
lines changed

.release-notes/add-copy-out.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
## Add COPY TO STDOUT support
2+
3+
You can now bulk-export data from PostgreSQL using `COPY ... TO STDOUT`. Call `session.copy_out()` with a COPY SQL statement and a `CopyOutReceiver` to start the operation. The server drives the flow — data arrives via `pg_copy_data` callbacks, and `pg_copy_complete` fires when all data has been delivered.
4+
5+
```pony
6+
actor Exporter is (SessionStatusNotify & ResultReceiver & CopyOutReceiver)
7+
var _buffer: Array[U8] iso = recover iso Array[U8] end
8+
9+
be pg_session_authenticated(session: Session) =>
10+
session.copy_out("COPY my_table TO STDOUT", this)
11+
12+
be pg_copy_data(session: Session, data: Array[U8] val) =>
13+
_buffer.append(data)
14+
15+
be pg_copy_complete(session: Session, count: USize) =>
16+
let received = String.from_iso_array(
17+
_buffer = recover iso Array[U8] end)
18+
_env.out.print("Exported " + count.string() + " rows")
19+
_env.out.print(received)
20+
21+
be pg_copy_failed(session: Session,
22+
failure: (ErrorResponseMessage | ClientQueryError))
23+
=>
24+
// handle error
25+
```
26+
27+
Data format depends on the COPY command — the default is tab-delimited text with newline row terminators. Data chunks do not necessarily align with row boundaries; the receiver should buffer chunks if row-level processing is needed.

CLAUDE.md

Lines changed: 23 additions & 10 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ Some functionality that isn't yet supported is:
5151
* Supplying connection configuration to the server
5252
* Pipelining queries
5353
* Function calls
54-
* COPY operations
5554

5655
Note the appearance of an item on the above list isn't a guarantee that it will be supported in the future.
5756

examples/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ Asynchronous notifications using PostgreSQL's LISTEN/NOTIFY mechanism. Subscribe
3838

3939
Bulk data loading using `COPY ... FROM STDIN`. Creates a table, loads three rows of tab-delimited text data via `Session.copy_in()`, verifies the data with a SELECT, then drops the table. Demonstrates the pull-based `CopyInReceiver` interface: `pg_copy_ready` fires after each `send_copy_data`, and `finish_copy` completes the operation.
4040

41+
## copy-out
42+
43+
Bulk data export using `COPY ... TO STDOUT`. Creates a table, inserts three rows, exports them via `Session.copy_out()`, and prints the received data. Demonstrates the push-based `CopyOutReceiver` interface: the server drives the flow, calling `pg_copy_data` for each chunk, then `pg_copy_complete` when finished.
44+
4145
## notice
4246

4347
Server notice handling using `pg_notice`. Executes `DROP TABLE IF EXISTS` on a nonexistent table, which triggers a PostgreSQL `NoticeResponse`, and prints the notice fields (severity, code, message). Shows how `SessionStatusNotify.pg_notice` delivers non-fatal informational messages from the server.
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
use "cli"
2+
use "collections"
3+
use lori = "lori"
4+
// in your code this `use` statement would be:
5+
// use "postgres"
6+
use "../../postgres"
7+
8+
actor Main
9+
new create(env: Env) =>
10+
let server_info = ServerInfo(env.vars)
11+
let auth = lori.TCPConnectAuth(env.root)
12+
13+
let client = Client(auth, server_info, env.out)
14+
15+
// This example demonstrates COPY OUT for bulk data export. It creates a table,
16+
// inserts three rows, uses COPY TO STDOUT to export them, prints the received
17+
// data, verifies the row count, then drops the table.
18+
//
19+
// The COPY OUT protocol is server-driven: after the session sends the COPY
20+
// query, the server pushes data via pg_copy_data callbacks. When all data is
21+
// sent, pg_copy_complete fires with the row count.
22+
actor Client is (SessionStatusNotify & ResultReceiver & CopyOutReceiver)
23+
let _session: Session
24+
let _out: OutStream
25+
var _phase: USize = 0
26+
var _copy_data: Array[U8] iso = recover iso Array[U8] end
27+
28+
new create(auth: lori.TCPConnectAuth, info: ServerInfo, out: OutStream) =>
29+
_out = out
30+
_session = Session(
31+
ServerConnectInfo(auth, info.host, info.port),
32+
DatabaseConnectInfo(info.username, info.password, info.database),
33+
this)
34+
35+
be close() =>
36+
_session.close()
37+
38+
be pg_session_authenticated(session: Session) =>
39+
_out.print("Authenticated.")
40+
_phase = 0
41+
session.execute(
42+
SimpleQuery("DROP TABLE IF EXISTS copy_out_example"), this)
43+
44+
be pg_session_authentication_failed(
45+
s: Session,
46+
reason: AuthenticationFailureReason)
47+
=>
48+
_out.print("Failed to authenticate.")
49+
50+
be pg_copy_data(session: Session, data: Array[U8] val) =>
51+
_copy_data.append(data)
52+
53+
be pg_copy_complete(session: Session, count: USize) =>
54+
let received: String val = String.from_iso_array(
55+
_copy_data = recover iso Array[U8] end)
56+
_out.print("COPY complete: " + count.string() + " rows exported.")
57+
_out.print("Received data:")
58+
_out.print(received)
59+
// Drop the table
60+
_out.print("Dropping table...")
61+
_session.execute(
62+
SimpleQuery("DROP TABLE copy_out_example"), this)
63+
64+
be pg_copy_failed(session: Session,
65+
failure: (ErrorResponseMessage | ClientQueryError))
66+
=>
67+
match failure
68+
| let e: ErrorResponseMessage =>
69+
_out.print("COPY failed: [" + e.severity + "] " + e.code + ": "
70+
+ e.message)
71+
| let e: ClientQueryError =>
72+
_out.print("COPY failed: client error")
73+
end
74+
close()
75+
76+
be pg_query_result(session: Session, result: Result) =>
77+
_phase = _phase + 1
78+
79+
match _phase
80+
| 1 =>
81+
// Table dropped (or didn't exist). Create it.
82+
_out.print("Creating table...")
83+
_session.execute(
84+
SimpleQuery(
85+
"""
86+
CREATE TABLE copy_out_example (
87+
name VARCHAR(50) NOT NULL,
88+
value INT NOT NULL
89+
)
90+
"""),
91+
this)
92+
| 2 =>
93+
// Table created. Insert rows.
94+
_out.print("Inserting rows...")
95+
_session.execute(
96+
SimpleQuery(
97+
"INSERT INTO copy_out_example VALUES " +
98+
"('alice', 10), ('bob', 20), ('charlie', 30)"),
99+
this)
100+
| 3 =>
101+
// Rows inserted. Start COPY OUT.
102+
_out.print("Starting COPY OUT...")
103+
_session.copy_out(
104+
"COPY copy_out_example TO STDOUT", this)
105+
| 4 =>
106+
// Table dropped. Done.
107+
_out.print("Done.")
108+
close()
109+
end
110+
111+
be pg_query_failed(session: Session, query: Query,
112+
failure: (ErrorResponseMessage | ClientQueryError))
113+
=>
114+
match failure
115+
| let e: ErrorResponseMessage =>
116+
_out.print("Query failed: [" + e.severity + "] " + e.code + ": "
117+
+ e.message)
118+
| let e: ClientQueryError =>
119+
_out.print("Query failed: client error")
120+
end
121+
close()
122+
123+
class val ServerInfo
124+
let host: String
125+
let port: String
126+
let username: String
127+
let password: String
128+
let database: String
129+
130+
new val create(vars: (Array[String] val | None)) =>
131+
let e = EnvVars(vars)
132+
host = try e("POSTGRES_HOST")? else "127.0.0.1" end
133+
port = try e("POSTGRES_PORT")? else "5432" end
134+
username = try e("POSTGRES_USERNAME")? else "postgres" end
135+
password = try e("POSTGRES_PASSWORD")? else "postgres" end
136+
database = try e("POSTGRES_DATABASE")? else "postgres" end

postgres/_backend_messages.pony

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,34 @@ class val _CopyInResponseMessage
194194
format = format'
195195
column_formats = column_formats'
196196

197+
class val _CopyOutResponseMessage
198+
"""
199+
Message from the backend indicating it is ready to send COPY data.
200+
Contains the overall format (0=text, 1=binary) and per-column format codes.
201+
"""
202+
let format: U8
203+
let column_formats: Array[U8] val
204+
205+
new val create(format': U8, column_formats': Array[U8] val) =>
206+
format = format'
207+
column_formats = column_formats'
208+
209+
class val _CopyDataMessage
210+
"""
211+
Message from the backend containing a chunk of COPY data during a
212+
COPY TO STDOUT operation.
213+
"""
214+
let data: Array[U8] val
215+
216+
new val create(data': Array[U8] val) =>
217+
data = data'
218+
219+
primitive _CopyDoneMessage
220+
"""
221+
Message from the backend indicating the end of the COPY data stream
222+
during a COPY TO STDOUT operation.
223+
"""
224+
197225
primitive _PortalSuspendedMessage
198226
"""
199227
Message from the backend indicating that an Execute command has been

postgres/_message_type.pony

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,14 @@ primitive _MessageType
4040
fun copy_in_response(): U8 =>
4141
'G'
4242

43+
fun copy_out_response(): U8 =>
44+
'H'
45+
46+
fun copy_data(): U8 =>
47+
'd'
48+
49+
fun copy_done(): U8 =>
50+
'c'
51+
4352
fun row_description(): U8 =>
4453
'T'

postgres/_response_message_parser.pony

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ primitive _ResponseMessageParser
5959
s.state.on_notice(s, msg)
6060
| let msg: _CopyInResponseMessage =>
6161
s.state.on_copy_in_response(s, msg)
62+
| let msg: _CopyOutResponseMessage =>
63+
s.state.on_copy_out_response(s, msg)
64+
| let msg: _CopyDataMessage =>
65+
s.state.on_copy_data(s, msg)
66+
| _CopyDoneMessage =>
67+
s.state.on_copy_done(s)
6268
| let msg: _ParameterStatusMessage =>
6369
s.state.on_parameter_status(s, msg)
6470
| let msg: _EmptyQueryResponseMessage =>

postgres/_response_parser.pony

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type _ResponseParserResult is
2424
| _ParameterDescriptionMessage
2525
| _NotificationResponseMessage
2626
| _CopyInResponseMessage
27+
| _CopyOutResponseMessage
28+
| _CopyDataMessage
29+
| _CopyDoneMessage
2730
| _PortalSuspendedMessage
2831
| _ParameterStatusMessage
2932
| _UnsupportedMessage
@@ -218,6 +221,21 @@ primitive _ResponseParser
218221
// and parse the CopyInResponse payload in an isolated reader
219222
let copy_payload = buffer.block(payload_size)?
220223
return _copy_in_response(consume copy_payload)?
224+
| _MessageType.copy_out_response() =>
225+
// Slide past the header...
226+
buffer.skip(5)?
227+
// and parse the CopyOutResponse payload in an isolated reader
228+
let copy_out_payload = buffer.block(payload_size)?
229+
return _copy_out_response(consume copy_out_payload)?
230+
| _MessageType.copy_data() =>
231+
// Slide past the header...
232+
buffer.skip(5)?
233+
// and extract the raw data payload
234+
let copy_data_payload = buffer.block(payload_size)?
235+
return _CopyDataMessage(consume copy_data_payload)
236+
| _MessageType.copy_done() =>
237+
buffer.skip(message_size)?
238+
return _CopyDoneMessage
221239
else
222240
buffer.skip(message_size)?
223241
return _UnsupportedMessage
@@ -427,6 +445,23 @@ primitive _ResponseParser
427445

428446
_CopyInResponseMessage(format, consume col_fmts)
429447

448+
fun _copy_out_response(payload: Array[U8] val)
449+
: _CopyOutResponseMessage ?
450+
=>
451+
"""
452+
Parse a CopyOutResponse message. Same wire format as CopyInResponse.
453+
"""
454+
let reader: Reader = Reader.>append(payload)
455+
let format = reader.u8()?
456+
let num_cols = reader.u16_be()?.usize()
457+
let col_fmts: Array[U8] iso = recover iso Array[U8](num_cols) end
458+
459+
for i in Range(0, num_cols) do
460+
col_fmts.push(reader.u16_be()?.u8())
461+
end
462+
463+
_CopyOutResponseMessage(format, consume col_fmts)
464+
430465
fun _parameter_description(payload: Array[U8] val)
431466
: _ParameterDescriptionMessage ?
432467
=>

postgres/_test.pony

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ actor \nodoc\ Main is TestList
158158
test(_TestCopyInAfterSessionClosed)
159159
test(_TestCopyInInsert)
160160
test(_TestCopyInAbortRollback)
161+
test(_TestResponseParserCopyOutResponseMessage)
162+
test(_TestResponseParserCopyDataMessage)
163+
test(_TestResponseParserCopyDoneMessage)
164+
test(_TestCopyOutSuccess)
165+
test(_TestCopyOutEmpty)
166+
test(_TestCopyOutServerError)
167+
test(_TestCopyOutShutdownDrainsCopyQueue)
168+
test(_TestCopyOutAfterSessionClosed)
169+
test(_TestCopyOutExport)
161170

162171
class \nodoc\ iso _TestAuthenticate is UnitTest
163172
"""

0 commit comments

Comments
 (0)