Commit 954c6e2

pg_lake: streaming writes for INSERT, UPDATE, DELETE, COPY, and iceberg metadata
Adds an opt-in streaming-write feature that pushes bulk-write bytes directly to pgduck_server via libpq COPY-IN, instead of writing them to a shared filesystem under $PGDATA/pgsql_tmp and asking pgduck to read them back. Companion to the pgduck_server RECEIVE protocol.

This decouples pgduck_server's filesystem from postgres's: the two processes can run on different machines, different containers, or different pods, with no shared mount required.

User-visible:

- New GUC `pg_lake_engine.streaming_writes` (default off). When flipped on, the bulk-write code paths route through the streaming protocol; when off, all behavior is identical to today.
- No SQL syntax changes. INSERT, UPDATE, DELETE, COPY FROM STDIN, CREATE TABLE iceberg, and iceberg metadata uploads all switch paths transparently based on the GUC.

What this patch adds, by component:

`pg_lake_engine`:

- `OpenCSVStreamWriter` / `FinishCSVStreamWriter` / `CSVStreamWriterDestReceiver`: the client-side primitive that opens a libpq COPY-IN to pgduck_server's RECEIVE sink, hands back a postgres `DestReceiver` that callers drive with rows, and finalizes the deferred query when the stream ends.
- `StreamLocalFileToS3`: uses the same RECEIVE protocol to stream iceberg metadata files (metadata.json, manifest list, manifest) into pgduck for upload. Replaces the file-based path that needed pgduck to see those files locally.
- Cooperative wait via `WaitForResult` in the new primitives: a `WaitLatchOrSocket` + `PQconsumeInput` + `CHECK_FOR_INTERRUPTS` loop, so `statement_timeout` / SIGINT / postmaster-death actually fire while the backend is waiting on pgduck. The naive `PQgetResult` would not.

`pg_lake_table` (the FDW):

- `multi_data_file_dest.c` (the INSERT-side `MultiDataFileUploadDestReceiver`): open a `CSVStreamWriter` instead of a CSV temp file when the GUC is on. Each rotation opens a new stream; `FlushChildDestReceiver` finalizes it via `FinishCSVStreamWriter`. The per-rotation writer lives in the receiver's existing childContext, which gets reset between rotations. The `partition` pointer on each `DataFileModification` is deep-copied under `parentContext` before that reset (NULL preserved for unpartitioned tables) so downstream consumers in `ApplyDataFileModifications` (which runs at PRE_COMMIT in a different memory-context lifetime) don't dereference into freed memory.
- `pg_lake_table.c` (the UPDATE/DELETE callsites): the `deleteStreamWriter` lazy-open in `DeleteSingleRow` opens a new per-source-file stream for the position-delete records, and `FinishForeignModify` calls `FinishCSVStreamWriter` to seal it. The writer is allocated in a dedicated long-lived `deleteStreamMemoryContext` (created at create_foreign_modify time, a child of the FDW state context) so it survives the executor's per-tuple resets, the same lifetime discipline multi_data_file_dest.c already uses for its INSERT-side writer. The "all rows deleted, drop the deletion file" optimization in `WriteDeleteRecord` is gated to the file-based path; for the streaming path, the writer keeps accumulating rows and the resulting position-delete file covers all rows in the source, which is semantically equivalent under iceberg's position-delete merge.

`pg_lake_copy` (the COPY pushdown):

- `copy.c` / `copy_io.c`: `COPY foreign_table FROM STDIN` opens a `CSVStreamWriter` and pumps the client's CopyData straight through to pgduck. The non-pushdown path (when COPY can't be pushed down) still uses the file-based code.

Memory-context discipline:

The streaming writers live across multiple rows and sometimes across the executor's per-tuple ExprContext reset. Each writer is allocated in a long-lived context dedicated to that writer's lifetime: for the INSERT side, the existing `MultiDataFileUploadDestReceiver` childContext; for the DELETE side, a new `deleteStreamMemoryContext` on the modify state. This mirrors how the file-based path already explicitly allocates its `CreateCSVDestReceiver` "in a long-lived memory context" (per the comment in create_foreign_modify).

Compatibility:

- Default-off GUC: existing deployments are untouched. No behavior change unless `streaming_writes = on`.
- The file-based code paths are unchanged. No regressions on shared-filesystem topologies.
- No SQL surface changes. No changes to data layout or iceberg catalog representation.

Testing:

Verified end-to-end against a CDC-shape workload (200 mixed INSERT/UPDATE/DELETE events with COMMIT per event) via a local kind cluster + a GKE deployment with pgduck_server running in a separate pod from postgres. 10 back-to-back 600s soak runs with full diagnostic instrumentation across the entire writer lifetime detected zero correctness issues.

Signed-off-by: Tim McLaughlin <tim@gotab.io>
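The cooperative wait described under `pg_lake_engine` can be illustrated outside PostgreSQL. The following is a minimal sketch and not the patch's `WaitForResult`: it assumes POSIX `poll()` as a stand-in for `WaitLatchOrSocket`, and a plain flag (`cancel_requested`, a hypothetical name) as a stand-in for what `CHECK_FOR_INTERRUPTS` inspects. The point is only the shape of the loop: wait in bounded slices and look for a cancel request between slices, rather than blocking indefinitely in one call.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h>

/* Hypothetical stand-in for the backend's pending-interrupt state. */
static volatile sig_atomic_t cancel_requested = 0;

/*
 * Wait for an event on fd in slices of slice_ms, checking the cancel flag
 * before every slice.
 * Returns 1 if poll reported an event on fd (data, EOF, or error; the
 * caller's next read tells which), 0 if all slices elapsed with no event,
 * and -1 if cancellation was requested or poll failed.
 */
static int
wait_readable_cooperatively(int fd, int slice_ms, int max_slices)
{
    assert(slice_ms > 0 && max_slices > 0);

    for (int i = 0; i < max_slices; i++)
    {
        struct pollfd pfd = {.fd = fd, .events = POLLIN, .revents = 0};
        int rc;

        if (cancel_requested)
            return -1;

        rc = poll(&pfd, 1, slice_ms);
        if (rc > 0)
            return 1;
        if (rc < 0 && errno != EINTR)
            return -1;
        /* rc == 0 (slice elapsed) or EINTR: loop and re-check the flag */
    }
    return 0;
}

/* Usage example: a byte written to a pipe makes the wait return 1. */
static int
demo_pipe_wait(void)
{
    int fds[2];
    int rc;

    if (pipe(fds) != 0)
        return -1;
    rc = (write(fds[1], "x", 1) == 1)
        ? wait_readable_cooperatively(fds[0], 10, 5)
        : -1;
    close(fds[0]);
    close(fds[1]);
    return rc;
}
```

In the real backend the slice boundary is a latch wake-up rather than a fixed timeout, so interrupts are noticed immediately instead of at the next slice.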
1 parent 77ce1fc commit 954c6e2

15 files changed

Lines changed: 1815 additions & 180 deletions

pg_lake_copy/include/pg_lake/copy/copy_io.h

Lines changed: 3 additions & 0 deletions
@@ -18,8 +18,11 @@
 #ifndef PG_LAKE_COPY_IO_H
 #define PG_LAKE_COPY_IO_H

+#include "libpq-fe.h"
+

 void CopyInputToFile(char *filePath, int columnCount, bool isBinary);
+void CopyInputToStream(PGconn *streamConn, int columnCount, bool isBinary);
 void CopyFileToOutput(char *filePath, int columnCount, bool isBinary);

pg_lake_copy/src/copy/copy.c

Lines changed: 82 additions & 15 deletions
@@ -96,6 +96,8 @@ PgLakeCopyValidityCheckHookType PgLakeCopyValidityCheckHook = NULL;
 static bool IsPgLakeCopy(CopyStmt *copyStmt);
 static bool IsCopyFromStdin(CopyStmt *copyStmt);
 static bool IsCopyToStdout(CopyStmt *copyStmt);
+static int64 StreamingCopyFromStdinPushdown(Oid relationId, char *readQuery,
+                                            TupleDesc tupleDesc);
 static void ProcessPgLakeCopyFrom(CopyStmt *copyStmt, ParseState *pstate,
                                   Relation relation, Node *whereClause,
                                   uint64 *rowsProcessed);
@@ -449,23 +451,52 @@ ProcessPgLakeCopyFrom(CopyStmt *copyStmt, ParseState *pstate, Relation relation,
      */
     TupleDesc tupleDesc = BuildTupleDescriptorForRelation(relation, copyStmt->attlist);

+    /*
+     * Compute doCopyPushdown up front so the STDIN block can choose between
+     * the streaming-write path (when GUC is on AND we'll push the COPY down
+     * to pgduck) and the file-based path (everything else). For non-pushdown
+     * COPYs the COPY query is wrapped in TRANSMIT and rows round-trip back
+     * through PG; that's a different shape than the deferred-INSERT we use
+     * for streaming, so streaming + non-pushdown still uses the file-based
+     * path. Document the gap.
+     */
+    bool doCopyPushdown = IsCopyFromPushdownable(relation, copyStmt->attlist,
+                                                 whereClause, sourceFormat);
+    bool useStreamingStdin = StreamingWritesEnabled && IsCopyFromStdin(copyStmt) &&
+        doCopyPushdown;
+
     if (IsCopyFromStdin(copyStmt))
     {
-        sourcePath = GenerateTempFileName(TEMP_FILE_PATTERN, ensureCleanup);
+        if (useStreamingStdin)
+        {
+            /*
+             * Streaming-write path: don't park bytes locally. Use the
+             * pgduck_server RECEIVE sink-path placeholder; the deferred COPY
+             * query that AddQueryResultToTableStream builds will pick this up
+             * and substitute it with the server-local sink path before
+             * running read_csv() on it.
+             */
+            sourcePath = pstrdup(PG_LAKE_RECV_PATH_PLACEHOLDER);
+        }
+        else
+        {
+            sourcePath = GenerateTempFileName(TEMP_FILE_PATTERN, ensureCleanup);

-        /*
-         * we send the expected column count to make pedantic clients happy
-         */
-        int columnCount = tupleDesc->natts;
+            /*
+             * we send the expected column count to make pedantic clients
+             * happy
+             */
+            int columnCount = tupleDesc->natts;

-        bool isBinary = true;
+            bool isBinary = true;

-        /*
-         * We copy the incoming bytes to a file first and then try to convert
-         * that file. We could perhaps optimize this in the future by copying
-         * via a named pipe.
-         */
-        CopyInputToFile(sourcePath, columnCount, isBinary);
+            /*
+             * We copy the incoming bytes to a file first and then try to
+             * convert that file. We could perhaps optimize this in the future
+             * by copying via a named pipe.
+             */
+            CopyInputToFile(sourcePath, columnCount, isBinary);
+        }
     }

     /*
@@ -483,9 +514,6 @@ ProcessPgLakeCopyFrom(CopyStmt *copyStmt, ParseState *pstate, Relation relation,
      * case. However, we do want explicit casts to avoid writing incorrect
      * types.
      */
-    bool doCopyPushdown = IsCopyFromPushdownable(relation, copyStmt->attlist,
-                                                 whereClause, sourceFormat);
-
     if (!doCopyPushdown)
         readFlags |= READ_DATA_TRANSMIT;
     else
@@ -535,6 +563,13 @@ ProcessPgLakeCopyFrom(CopyStmt *copyStmt, ParseState *pstate, Relation relation,
      */
     if (doCopyPushdown)
     {
+        if (useStreamingStdin)
+        {
+            *rowsProcessed = StreamingCopyFromStdinPushdown(relationId, readQuery,
+                                                            tupleDesc);
+            return;
+        }
+
         *rowsProcessed = AddQueryResultToTable(relationId, readQuery, tupleDesc, true);
         return;
     }
@@ -660,6 +695,38 @@ IsCopyFromPushdownable(Relation relation, List *columnNameList,
 }


+/*
+ * StreamingCopyFromStdinPushdown drives the COPY tablename FROM STDIN
+ * pushdown path when pg_lake_engine.streaming_writes=on.
+ *
+ * Pulled out of ProcessPgLakeCopyFrom so the caller stays small enough
+ * for gcc's clobbered-variable analysis (-Wclobbered) to keep its
+ * pre-streaming behavior on the unrelated TupleDesc helpers below;
+ * inlining a multi-step PG-call sequence into ProcessPgLakeCopyFrom
+ * triggered false-positive clobber warnings on attributeDescriptor /
+ * cleanTupleDesc that don't fire when this branch is its own function.
+ *
+ * No PG_TRY/CATCH: errors here trigger transaction abort; pgduck's
+ * PGDuckClientTransactionCallback recycles the libpq connection and
+ * the in-progress-file record registered inside Start... is cleaned
+ * up by the pre-commit hook on abort.
+ */
+static int64
+StreamingCopyFromStdinPushdown(Oid relationId, char *readQuery, TupleDesc tupleDesc)
+{
+    AddQueryResultStreamHandle *handle =
+        StartAddQueryResultToTableStream(relationId, readQuery, tupleDesc,
+                                         /* wrapNativeTypes */ true);
+
+    int columnCount = tupleDesc->natts;
+    bool isBinary = true;
+
+    CopyInputToStream(AddQueryResultStreamConnection(handle), columnCount, isBinary);
+
+    return FinishAddQueryResultToTableStream(handle);
+}
+
+
 /*
  * GenerateReadDataSourceQuery generates the pgduck query to read the given
  * source format/path.
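
The path selection in `ProcessPgLakeCopyFrom` reduces to a small truth table over three booleans. The sketch below restates it as a standalone function so the cases are easy to check; the enum and function names are hypothetical and do not appear in the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical labels for where COPY input bytes go. */
typedef enum
{
    COPY_INPUT_NONE,        /* not FROM STDIN: nothing to receive from the client */
    COPY_INPUT_TEMP_FILE,   /* file-based path (CopyInputToFile) */
    COPY_INPUT_STREAM       /* streaming path (CopyInputToStream) */
} CopyInputPath;

/*
 * Mirrors the useStreamingStdin condition in the diff: streaming is chosen
 * only when the GUC is on, the source is STDIN, and the COPY is pushdownable.
 */
static CopyInputPath
choose_copy_input_path(bool streamingWritesEnabled, bool fromStdin, bool pushdownable)
{
    CopyInputPath path;

    if (!fromStdin)
        path = COPY_INPUT_NONE;
    else if (streamingWritesEnabled && pushdownable)
        path = COPY_INPUT_STREAM;
    else
        path = COPY_INPUT_TEMP_FILE;

    /* invariant from the comment in the diff: streaming implies pushdown */
    assert(path != COPY_INPUT_STREAM || pushdownable);
    return path;
}
```

With the GUC on but a non-pushdownable COPY, the function returns `COPY_INPUT_TEMP_FILE`, which is the gap the code comment mentions.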

pg_lake_copy/src/copy/copy_io.c

Lines changed: 50 additions & 0 deletions
@@ -30,6 +30,7 @@
 #include "storage/fd.h"
 #include "utils/memutils.h"

+#include "libpq-fe.h"
 #include "libpq/libpq.h"

 #define MAX_READ_SIZE (65536)
@@ -55,6 +56,55 @@ static void SendCopyEnd(void);
 static void SendCopyData(char *sendBuffer, int sendBufferLength);


+/*
+ * CopyInputToStream is the streaming counterpart of CopyInputToFile.
+ *
+ * Instead of writing the postgres client's COPY-IN bytes to a local file,
+ * forwards them via PQputCopyData on `streamConn`. The caller must have
+ * already opened a libpq COPY-IN stream against pgduck_server (via
+ * OpenCopyInStreamToPGDuck) so streamConn is in COPY-IN-active state.
+ * Uses the same SendCopyInResponseToClient(columnCount, isBinary) the
+ * file-based path uses to tell the client we're ready.
+ *
+ * Caller is responsible for finalizing the libpq stream via
+ * FinishCopyInStreamToPGDuck (or moral equivalent) afterwards.
+ */
+void
+CopyInputToStream(PGconn *streamConn, int columnCount, bool isBinary)
+{
+    CopyFromStdinState cstate = {
+        .fe_msgbuf = makeStringInfo(),
+        .raw_reached_eof = false
+    };
+
+    /* tell the client we are ready for data */
+    SendCopyInResponseToClient(columnCount, isBinary);
+
+    char *receiveBuffer = palloc(MAX_READ_SIZE);
+
+    while (!cstate.raw_reached_eof)
+    {
+        unsigned long bytesRead = ReceiveDataFromClient(&cstate, receiveBuffer);
+
+        if (bytesRead > 0)
+        {
+            if (PQputCopyData(streamConn, receiveBuffer, (int) bytesRead) != 1)
+            {
+                ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+                                errmsg("failed to forward COPY data to pgduck_server: %s",
+                                       PQerrorMessage(streamConn))));
+            }
+        }
+        else if (bytesRead == 0)
+        {
+            break;
+        }
+    }
+
+    pfree(receiveBuffer);
+}
+
+
 /*
  * CopyInputToFile copies data from the socket to the given file.
  * We request the client send a specific column count.
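
The forwarding loop in `CopyInputToStream` is a read-then-forward pump. The sketch below shows the same control flow with in-memory stand-ins, assuming a reader callback in place of `ReceiveDataFromClient` and a writer callback in place of `PQputCopyData`; all names here are hypothetical. The writer follows libpq's convention that `PQputCopyData` returns 1 on success (0 means "would block" on a nonblocking connection, -1 means error), so anything other than 1 is treated as failure, as in the diff.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Reader: returns bytes placed in buf, 0 at end of input, <0 on error. */
typedef long (*read_fn) (void *ctx, char *buf, size_t cap);

/* Writer: returns 1 when the chunk was accepted, anything else on failure. */
typedef int (*write_fn) (void *ctx, const char *buf, size_t len);

/* Forward chunks from rd to wr until end of input; returns total bytes or -1. */
static long
pump(read_fn rd, void *rctx, write_fn wr, void *wctx, char *buf, size_t cap)
{
    long total = 0;

    assert(cap > 0);
    for (;;)
    {
        long n = rd(rctx, buf, cap);

        if (n < 0)
            return -1;
        if (n == 0)
            break;              /* end of input */
        if (wr(wctx, buf, (size_t) n) != 1)
            return -1;          /* sink rejected the chunk */
        total += n;
    }
    return total;
}

/* In-memory source and sink used only to exercise the pump. */
typedef struct { const char *data; size_t len; size_t pos; } MemSource;
typedef struct { char data[256]; size_t len; } MemSink;

static long
mem_read(void *ctx, char *buf, size_t cap)
{
    MemSource *src = ctx;
    size_t n = src->len - src->pos;

    if (n > cap)
        n = cap;
    memcpy(buf, src->data + src->pos, n);
    src->pos += n;
    return (long) n;
}

static int
mem_write(void *ctx, const char *buf, size_t len)
{
    MemSink *sink = ctx;

    if (sink->len + len > sizeof(sink->data))
        return -1;
    memcpy(sink->data + sink->len, buf, len);
    sink->len += len;
    return 1;
}
```

Calling `pump(mem_read, &src, mem_write, &sink, buf, sizeof buf)` with a small buffer forwards the source in several chunks, which is the behavior `MAX_READ_SIZE` bounds in the real function.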

pg_lake_engine/include/pg_lake/csv/csv_writer.h

Lines changed: 18 additions & 0 deletions
@@ -18,6 +18,8 @@
 #ifndef CSV_WRITER_H
 #define CSV_WRITER_H

+#include "libpq-fe.h"
+
 #include "pg_lake/copy/copy_format.h"
 #include "tcop/dest.h"
 #include "nodes/pg_list.h"
@@ -28,6 +30,22 @@ extern PGDLLEXPORT DestReceiver *CreateCSVDestReceiverExtended(char *filename,
                                                                List *copyOptions,
                                                                CopyDataFormat targetFormat,
                                                                bool sessionLifetime);
+
+/*
+ * Streaming variant: bytes go to a libpq COPY-IN already opened on
+ * `streamConn` (use OpenCopyInStreamToPGDuck() to get there). The caller
+ * is responsible for finalizing the stream via FinishCopyInStreamToPGDuck()
+ * AFTER calling rShutdown on the returned DestReceiver — rShutdown only
+ * flushes the per-row buffer and emits any binary trailer; it deliberately
+ * does NOT call PQputCopyEnd, so callers can still emit additional
+ * CopyData (e.g. for multi-segment writes) before closing. The deferred
+ * query's PGresult is returned by FinishCopyInStreamToPGDuck so callers
+ * can extract row counts / column statistics.
+ */
+extern PGDLLEXPORT DestReceiver *CreateCSVStreamDestReceiver(PGconn *streamConn,
+                                                             List *copyOptions,
+                                                             CopyDataFormat targetFormat);
+
 extern PGDLLEXPORT int GetCSVDestReceiverMaxLineSize(DestReceiver *dest);
 extern PGDLLEXPORT uint64 GetCSVDestReceiverFileSize(DestReceiver *dest);
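
The ordering contract in the `CreateCSVStreamDestReceiver` comment (receiver shutdown first, stream finish second, with extra CopyData still permitted in between) can be modeled as a three-state machine. This is only an illustrative sketch under that reading of the comment; the types and functions below are hypothetical and are not part of pg_lake.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
    STREAM_COPY_IN,             /* COPY-IN open, receiver still accepting rows */
    STREAM_RECEIVER_SHUT_DOWN,  /* rShutdown done, COPY-IN still open */
    STREAM_FINISHED             /* PQputCopyEnd sent, result collected */
} StreamState;

typedef struct
{
    StreamState state;
    size_t      bytesSent;
} StreamModel;

/* CopyData may be emitted at any point before the stream is finished. */
static bool
stream_put_data(StreamModel *s, size_t len)
{
    assert(s != NULL);
    if (s->state == STREAM_FINISHED)
        return false;
    s->bytesSent += len;
    return true;
}

/* Models rShutdown: flushes the receiver but leaves the COPY-IN open. */
static bool
stream_receiver_shutdown(StreamModel *s)
{
    assert(s != NULL);
    if (s->state != STREAM_COPY_IN)
        return false;
    s->state = STREAM_RECEIVER_SHUT_DOWN;
    return true;
}

/* Models the finish step: legal only after the receiver was shut down. */
static bool
stream_finish(StreamModel *s)
{
    assert(s != NULL);
    if (s->state != STREAM_RECEIVER_SHUT_DOWN)
        return false;
    s->state = STREAM_FINISHED;
    return true;
}
```

Keeping the shutdown and finish steps separate is what lets a caller append further segments to the same COPY-IN before closing it.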

pg_lake_engine/include/pg_lake/pgduck/client.h

Lines changed: 23 additions & 0 deletions
@@ -52,4 +52,27 @@ extern PGDLLEXPORT char *GetSingleValueFromPGDuck(char *query);
 extern PGDLLEXPORT void SendQueryWithParams(PGDuckConnection * pgduckConn, char *queryString,
                                             int numParams, const char **parameterValues);

+/*
+ * Streaming-write helpers for the RECEIVE protocol prefix on pgduck_server.
+ *
+ * OpenCopyInStreamToPGDuck sends `queryString` (which must begin with
+ * "RECEIVE " and contain '@@PG_LAKE_RECV@@' as the read_csv path
+ * placeholder) and waits for pgduck_server's CopyInResponse. After this
+ * returns, the caller may emit CSV bytes via PQputCopyData on
+ * pgDuckConnection->conn (typically by passing it to
+ * CreateCSVStreamDestReceiver and driving a producer query through it).
+ *
+ * FinishCopyInStreamToPGDuck calls PQputCopyEnd(NULL), waits for the
+ * deferred query's first PGresult, drains the trailing NULL terminator,
+ * and returns the PGresult to the caller. The caller owns it and must
+ * PQclear it. Errors raised by the deferred query (e.g. INSERT failures)
+ * surface here as ereport(ERROR) before returning.
+ *
+ * Both throw on protocol or query errors; the connection is left in an
+ * idle state on success, in an error state on failure.
+ */
+extern PGDLLEXPORT void OpenCopyInStreamToPGDuck(PGDuckConnection * pgDuckConnection,
+                                                 const char *queryString);
+extern PGDLLEXPORT PGresult *FinishCopyInStreamToPGDuck(PGDuckConnection * pgDuckConnection);
+
 #endif
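
The comment above states two preconditions on `queryString`: it begins with `"RECEIVE "` and contains the `@@PG_LAKE_RECV@@` placeholder. A caller-side pre-flight check for those two conditions might look like the sketch below. Whether the patch performs such a check is not shown in this diff, and the function name is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define RECEIVE_PREFIX   "RECEIVE "
#define RECV_PLACEHOLDER "@@PG_LAKE_RECV@@"

/*
 * Returns true when the query starts with the RECEIVE prefix and mentions
 * the sink-path placeholder at least once.
 */
static bool
looks_like_receive_query(const char *queryString)
{
    assert(queryString != NULL);

    return strncmp(queryString, RECEIVE_PREFIX, sizeof(RECEIVE_PREFIX) - 1) == 0 &&
        strstr(queryString, RECV_PLACEHOLDER) != NULL;
}
```

A query that passes this check can still fail on the server; the check only catches the two structural mistakes the header comment calls out.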

pg_lake_engine/include/pg_lake/pgduck/write_data.h

Lines changed: 49 additions & 0 deletions
@@ -23,11 +23,25 @@
 #include "pg_lake/parquet/field.h"
 #include "pg_lake/pgduck/iceberg_validation.h"
 #include "nodes/pg_list.h"
+#include "tcop/dest.h"

 /* pg_lake_table.target_row_group_size_mb */
 #define DEFAULT_TARGET_ROW_GROUP_SIZE_MB 512
 extern PGDLLEXPORT int TargetRowGroupSizeMB;

+/* pg_lake_engine.streaming_writes */
+extern PGDLLEXPORT bool StreamingWritesEnabled;
+
+/*
+ * Sink-path placeholder substituted server-side by pgduck_server's
+ * RECEIVE handler. Must match SINK_PLACEHOLDER in pgduck_server's
+ * pgsession.c. Streaming-write callers pass this where the file-based
+ * path would pass the local CSV file path; the placeholder is then
+ * embedded inside read_csv('<placeholder>', ...) and pgduck swaps in
+ * the server-local sink path before running the deferred query.
+ */
+#define PG_LAKE_RECV_PATH_PLACEHOLDER "@@PG_LAKE_RECV@@"
+
 typedef enum ParquetVersion
 {
     PARQUET_VERSION_V1 = 1,
@@ -57,7 +71,42 @@ extern PGDLLEXPORT StatsCollector * WriteQueryResultTo(char *query,
                                                        List *leafFields,
                                                        IcebergOutOfRangePolicy outOfRangePolicy,
                                                        bool wrapNativeTypes);
+/*
+ * BuildCopyToCommandString assembles `COPY (query) TO 'destinationPath'
+ * WITH (format ..., compression ..., return_stats, ...)` for the given
+ * destination format / compression / format options. The same string
+ * builder is used by WriteQueryResultTo (file path), OpenCSVStreamWriter
+ * (streaming), and the streaming AddQueryResultToTable variant in
+ * writable_table.c.
+ */
+extern PGDLLEXPORT char *BuildCopyToCommandString(char *query, char *destinationPath,
+                                                  CopyDataFormat destinationFormat,
+                                                  CopyDataCompression destinationCompression,
+                                                  List *formatOptions,
+                                                  bool queryHasRowId,
+                                                  DataFileSchema * schema,
+                                                  TupleDesc queryTupleDesc,
+                                                  IcebergOutOfRangePolicy outOfRangePolicy,
+                                                  bool wrapNativeTypes);
+
 extern PGDLLEXPORT void AppendFields(StringInfo map, DataFileSchema * schema);
 extern PGDLLEXPORT char *TupleDescToColumnMapForWrite(TupleDesc tupleDesc, CopyDataFormat destinationFormat);
 extern PGDLLEXPORT char *TupleDescToProjectionListForWrite(TupleDesc tupleDesc,
                                                            CopyDataFormat destinationFormat);
+
+/*
+ * Streaming counterpart of ConvertCSVFileTo. See OpenCSVStreamWriter in
+ * write_data.c for the contract. The struct is opaque to callers.
+ */
+typedef struct CSVStreamWriter CSVStreamWriter;
+
+extern PGDLLEXPORT CSVStreamWriter * OpenCSVStreamWriter(TupleDesc csvTupleDesc,
+                                                         int maxLineSize,
+                                                         char *destinationPath,
+                                                         CopyDataFormat destinationFormat,
+                                                         CopyDataCompression destinationCompression,
+                                                         List *formatOptions,
+                                                         DataFileSchema * schema,
+                                                         List *leafFields);
+extern PGDLLEXPORT DestReceiver *CSVStreamWriterDestReceiver(CSVStreamWriter * writer);
+extern PGDLLEXPORT StatsCollector * FinishCSVStreamWriter(CSVStreamWriter * writer);
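
The `PG_LAKE_RECV_PATH_PLACEHOLDER` comment says pgduck_server swaps the placeholder for a server-local sink path before running the deferred query. The server-side code is not part of this diff, so the following is only a sketch of what such a substitution could look like, assuming every occurrence is replaced and the sink path needs no SQL quoting. The function name is hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RECV_PLACEHOLDER "@@PG_LAKE_RECV@@"

/*
 * Returns a newly malloc'd copy of query with every occurrence of the
 * placeholder replaced by sinkPath, or NULL if allocation fails.
 * The caller frees the result.
 */
static char *
substitute_sink_path(const char *query, const char *sinkPath)
{
    const size_t phLen = strlen(RECV_PLACEHOLDER);
    size_t      pathLen;
    size_t      count = 0;
    const char *src;
    const char *hit;
    char       *out;
    char       *dst;

    assert(query != NULL && sinkPath != NULL);
    pathLen = strlen(sinkPath);

    /* first pass: count occurrences so the output can be sized exactly */
    for (hit = strstr(query, RECV_PLACEHOLDER); hit != NULL;
         hit = strstr(hit + phLen, RECV_PLACEHOLDER))
        count++;

    out = malloc(strlen(query) - count * phLen + count * pathLen + 1);
    if (out == NULL)
        return NULL;

    /* second pass: copy text between occurrences, inserting sinkPath */
    dst = out;
    src = query;
    while ((hit = strstr(src, RECV_PLACEHOLDER)) != NULL)
    {
        size_t prefix = (size_t) (hit - src);

        memcpy(dst, src, prefix);
        dst += prefix;
        memcpy(dst, sinkPath, pathLen);
        dst += pathLen;
        src = hit + phLen;
    }
    strcpy(dst, src);           /* remaining tail, including the terminator */
    return out;
}
```

Because the client never learns the real sink path, the same query text works regardless of where pgduck_server stores the received bytes.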
