Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .release-notes/add-copy-out.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
## Add COPY TO STDOUT support

You can now bulk-export data from PostgreSQL using `COPY ... TO STDOUT`. Call `session.copy_out()` with a COPY SQL statement and a `CopyOutReceiver` to start the operation. The server drives the flow — data arrives via `pg_copy_data` callbacks, and `pg_copy_complete` fires when all data has been delivered.

```pony
actor Exporter is (SessionStatusNotify & ResultReceiver & CopyOutReceiver)
var _buffer: Array[U8] iso = recover iso Array[U8] end

be pg_session_authenticated(session: Session) =>
session.copy_out("COPY my_table TO STDOUT", this)

be pg_copy_data(session: Session, data: Array[U8] val) =>
_buffer.append(data)

be pg_copy_complete(session: Session, count: USize) =>
let received = String.from_iso_array(
_buffer = recover iso Array[U8] end)
_env.out.print("Exported " + count.string() + " rows")
_env.out.print(received)

be pg_copy_failed(session: Session,
failure: (ErrorResponseMessage | ClientQueryError))
=>
// handle error
```

Data format depends on the COPY command — the default is tab-delimited text with newline row terminators. Data chunks do not necessarily align with row boundaries; the receiver should buffer chunks if row-level processing is needed.
33 changes: 23 additions & 10 deletions CLAUDE.md

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ Some functionality that isn't yet supported is:
* Supplying connection configuration to the server
* Pipelining queries
* Function calls
* COPY operations

Note the appearance of an item on the above list isn't a guarantee that it will be supported in the future.

Expand Down
4 changes: 4 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ Asynchronous notifications using PostgreSQL's LISTEN/NOTIFY mechanism. Subscribe

Bulk data loading using `COPY ... FROM STDIN`. Creates a table, loads three rows of tab-delimited text data via `Session.copy_in()`, verifies the data with a SELECT, then drops the table. Demonstrates the pull-based `CopyInReceiver` interface: `pg_copy_ready` fires after each `send_copy_data`, and `finish_copy` completes the operation.

## copy-out

Bulk data export using `COPY ... TO STDOUT`. Creates a table, inserts three rows, exports them via `Session.copy_out()`, and prints the received data. Demonstrates the push-based `CopyOutReceiver` interface: the server drives the flow, calling `pg_copy_data` for each chunk, then `pg_copy_complete` when finished.

## notice

Server notice handling using `pg_notice`. Executes `DROP TABLE IF EXISTS` on a nonexistent table, which triggers a PostgreSQL `NoticeResponse`, and prints the notice fields (severity, code, message). Shows how `SessionStatusNotify.pg_notice` delivers non-fatal informational messages from the server.
Expand Down
136 changes: 136 additions & 0 deletions examples/copy-out/copy-out-example.pony
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use "cli"
use "collections"
use lori = "lori"
// in your code this `use` statement would be:
// use "postgres"
use "../../postgres"

actor Main
new create(env: Env) =>
let server_info = ServerInfo(env.vars)
let auth = lori.TCPConnectAuth(env.root)

let client = Client(auth, server_info, env.out)

// This example demonstrates COPY OUT for bulk data export. It creates a table,
// inserts three rows, uses COPY TO STDOUT to export them, prints the received
// data, verifies the row count, then drops the table.
//
// The COPY OUT protocol is server-driven: after the session sends the COPY
// query, the server pushes data via pg_copy_data callbacks. When all data is
// sent, pg_copy_complete fires with the row count.
actor Client is (SessionStatusNotify & ResultReceiver & CopyOutReceiver)
let _session: Session
let _out: OutStream
var _phase: USize = 0
var _copy_data: Array[U8] iso = recover iso Array[U8] end

new create(auth: lori.TCPConnectAuth, info: ServerInfo, out: OutStream) =>
_out = out
_session = Session(
ServerConnectInfo(auth, info.host, info.port),
DatabaseConnectInfo(info.username, info.password, info.database),
this)

be close() =>
_session.close()

be pg_session_authenticated(session: Session) =>
_out.print("Authenticated.")
_phase = 0
session.execute(
SimpleQuery("DROP TABLE IF EXISTS copy_out_example"), this)

be pg_session_authentication_failed(
s: Session,
reason: AuthenticationFailureReason)
=>
_out.print("Failed to authenticate.")

be pg_copy_data(session: Session, data: Array[U8] val) =>
_copy_data.append(data)

be pg_copy_complete(session: Session, count: USize) =>
let received: String val = String.from_iso_array(
_copy_data = recover iso Array[U8] end)
_out.print("COPY complete: " + count.string() + " rows exported.")
_out.print("Received data:")
_out.print(received)
// Drop the table
_out.print("Dropping table...")
_session.execute(
SimpleQuery("DROP TABLE copy_out_example"), this)

be pg_copy_failed(session: Session,
failure: (ErrorResponseMessage | ClientQueryError))
=>
match failure
| let e: ErrorResponseMessage =>
_out.print("COPY failed: [" + e.severity + "] " + e.code + ": "
+ e.message)
| let e: ClientQueryError =>
_out.print("COPY failed: client error")
end
close()

be pg_query_result(session: Session, result: Result) =>
_phase = _phase + 1

match _phase
| 1 =>
// Table dropped (or didn't exist). Create it.
_out.print("Creating table...")
_session.execute(
SimpleQuery(
"""
CREATE TABLE copy_out_example (
name VARCHAR(50) NOT NULL,
value INT NOT NULL
)
"""),
this)
| 2 =>
// Table created. Insert rows.
_out.print("Inserting rows...")
_session.execute(
SimpleQuery(
"INSERT INTO copy_out_example VALUES " +
"('alice', 10), ('bob', 20), ('charlie', 30)"),
this)
| 3 =>
// Rows inserted. Start COPY OUT.
_out.print("Starting COPY OUT...")
_session.copy_out(
"COPY copy_out_example TO STDOUT", this)
| 4 =>
// Table dropped. Done.
_out.print("Done.")
close()
end

be pg_query_failed(session: Session, query: Query,
failure: (ErrorResponseMessage | ClientQueryError))
=>
match failure
| let e: ErrorResponseMessage =>
_out.print("Query failed: [" + e.severity + "] " + e.code + ": "
+ e.message)
| let e: ClientQueryError =>
_out.print("Query failed: client error")
end
close()

class val ServerInfo
let host: String
let port: String
let username: String
let password: String
let database: String

new val create(vars: (Array[String] val | None)) =>
let e = EnvVars(vars)
host = try e("POSTGRES_HOST")? else "127.0.0.1" end
port = try e("POSTGRES_PORT")? else "5432" end
username = try e("POSTGRES_USERNAME")? else "postgres" end
password = try e("POSTGRES_PASSWORD")? else "postgres" end
database = try e("POSTGRES_DATABASE")? else "postgres" end
28 changes: 28 additions & 0 deletions postgres/_backend_messages.pony
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,34 @@ class val _CopyInResponseMessage
format = format'
column_formats = column_formats'

class val _CopyOutResponseMessage
"""
Message from the backend indicating it is ready to send COPY data.
Contains the overall format (0=text, 1=binary) and per-column format codes.
"""
let format: U8
let column_formats: Array[U8] val

new val create(format': U8, column_formats': Array[U8] val) =>
format = format'
column_formats = column_formats'

class val _CopyDataMessage
"""
Message from the backend containing a chunk of COPY data during a
COPY TO STDOUT operation.
"""
let data: Array[U8] val

new val create(data': Array[U8] val) =>
data = data'

primitive _CopyDoneMessage
"""
Message from the backend indicating the end of the COPY data stream
during a COPY TO STDOUT operation.
"""

primitive _PortalSuspendedMessage
"""
Message from the backend indicating that an Execute command has been
Expand Down
9 changes: 9 additions & 0 deletions postgres/_message_type.pony
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,14 @@ primitive _MessageType
fun copy_in_response(): U8 =>
'G'

fun copy_out_response(): U8 =>
'H'

fun copy_data(): U8 =>
'd'

fun copy_done(): U8 =>
'c'

fun row_description(): U8 =>
'T'
6 changes: 6 additions & 0 deletions postgres/_response_message_parser.pony
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ primitive _ResponseMessageParser
s.state.on_notice(s, msg)
| let msg: _CopyInResponseMessage =>
s.state.on_copy_in_response(s, msg)
| let msg: _CopyOutResponseMessage =>
s.state.on_copy_out_response(s, msg)
| let msg: _CopyDataMessage =>
s.state.on_copy_data(s, msg)
| _CopyDoneMessage =>
s.state.on_copy_done(s)
| let msg: _ParameterStatusMessage =>
s.state.on_parameter_status(s, msg)
| let msg: _EmptyQueryResponseMessage =>
Expand Down
35 changes: 35 additions & 0 deletions postgres/_response_parser.pony
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type _ResponseParserResult is
| _ParameterDescriptionMessage
| _NotificationResponseMessage
| _CopyInResponseMessage
| _CopyOutResponseMessage
| _CopyDataMessage
| _CopyDoneMessage
| _PortalSuspendedMessage
| _ParameterStatusMessage
| _UnsupportedMessage
Expand Down Expand Up @@ -218,6 +221,21 @@ primitive _ResponseParser
// and parse the CopyInResponse payload in an isolated reader
let copy_payload = buffer.block(payload_size)?
return _copy_in_response(consume copy_payload)?
| _MessageType.copy_out_response() =>
// Slide past the header...
buffer.skip(5)?
// and parse the CopyOutResponse payload in an isolated reader
let copy_out_payload = buffer.block(payload_size)?
return _copy_out_response(consume copy_out_payload)?
| _MessageType.copy_data() =>
// Slide past the header...
buffer.skip(5)?
// and extract the raw data payload
let copy_data_payload = buffer.block(payload_size)?
return _CopyDataMessage(consume copy_data_payload)
| _MessageType.copy_done() =>
buffer.skip(message_size)?
return _CopyDoneMessage
else
buffer.skip(message_size)?
return _UnsupportedMessage
Expand Down Expand Up @@ -427,6 +445,23 @@ primitive _ResponseParser

_CopyInResponseMessage(format, consume col_fmts)

fun _copy_out_response(payload: Array[U8] val)
: _CopyOutResponseMessage ?
=>
"""
Parse a CopyOutResponse message. Same wire format as CopyInResponse.
"""
let reader: Reader = Reader.>append(payload)
let format = reader.u8()?
let num_cols = reader.u16_be()?.usize()
let col_fmts: Array[U8] iso = recover iso Array[U8](num_cols) end

for i in Range(0, num_cols) do
col_fmts.push(reader.u16_be()?.u8())
end

_CopyOutResponseMessage(format, consume col_fmts)

fun _parameter_description(payload: Array[U8] val)
: _ParameterDescriptionMessage ?
=>
Expand Down
9 changes: 9 additions & 0 deletions postgres/_test.pony
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ actor \nodoc\ Main is TestList
test(_TestCopyInAfterSessionClosed)
test(_TestCopyInInsert)
test(_TestCopyInAbortRollback)
test(_TestResponseParserCopyOutResponseMessage)
test(_TestResponseParserCopyDataMessage)
test(_TestResponseParserCopyDoneMessage)
test(_TestCopyOutSuccess)
test(_TestCopyOutEmpty)
test(_TestCopyOutServerError)
test(_TestCopyOutShutdownDrainsCopyQueue)
test(_TestCopyOutAfterSessionClosed)
test(_TestCopyOutExport)

class \nodoc\ iso _TestAuthenticate is UnitTest
"""
Expand Down
Loading