Skip to content

Commit 77ce1fc

Browse files
committed
pgduck_server: add RECEIVE protocol for streaming COPY-IN sink
Adds a TCP listener and a new "RECEIVE" query prefix to pgduck_server. With these in place, a remote client can stream CSV (or other data) to a server-local sink path via the standard libpq COPY-IN protocol, and pgduck runs a deferred query reading from that path once CopyDone arrives. Motivation: The existing pgduck_server design assumes the client and pgduck share a filesystem — the standard pg_lake "sidecar" deployment colocates postgres and pgduck_server in the same pod and lets pg_lake's bulk-write paths drop CSV under $PGDATA/pgsql_tmp for pgduck to read with read_csv(). That topology breaks under deployment models that don't co-locate the two processes (operator-managed Kubernetes deployments where postgres and pgduck run in separate pods, multi-host setups, etc.). The streaming-write paths in the companion pg_lake patch lean on this RECEIVE protocol to push bytes directly to pgduck via libpq without needing a shared filesystem. What this patch adds: - TCP listener on pgduck_server controlled by --listen_addresses / --port (default Unix-socket path remains supported). Lets remote postgres backends reach pgduck over the network. - A "RECEIVE <inner-query>" query prefix recognized by process_query_message. When it sees "RECEIVE …", the server: 1. Picks a server-local sink path (under --recv_dir). 2. Substitutes the bare token "@@PGLAKE_RECV_SINK@@" inside the inner query with a properly-quoted SQL literal containing the sink path. The server adds the surrounding single quotes and escapes any embedded single quotes; the placeholder itself is intentionally NOT inside a SQL string literal in the client-emitted query, so it can never collide with user-supplied data. 3. Sends CopyInResponse to the client and accepts CopyData chunks, writing them straight to the sink. 4. On CopyDone, runs the deferred inner query against the sink path and streams its result rows back via the standard PGresult flow. 
This lets clients use the existing libpq COPY-IN flow as the transport for arbitrary inner queries that read from a path. - recv_sink module: opens, writes, and cleans up the per-client sink files; bounded by --recv-max-bytes; refuses path traversal. - SSL-negotiate-byte handling: pgduck_server replies 'N' to libpq's SSLRequest instead of closing the connection. Lets clients with sslmode=prefer fall back to plaintext cleanly. - Explicit pgsession_flush() after pgsession_send_copy_in_response. Without this, the 5-byte CopyInResponse can sit in pgduck's send buffer indefinitely (no auto-flush on small messages); the client blocks forever in PQputCopyData waiting for the server to be ready, and the RECEIVE handshake deadlocks. Found while debugging exactly that ~4-hour hang during smoke runs. Compatibility: - Existing simple SELECT and COPY paths are unchanged. - Unix-socket transport remains supported when --listen_addresses is not set; nothing forces TCP. - No new dependencies. The recv_sink uses the same memory and I/O primitives the rest of pgsession.c uses. - The "RECEIVE" prefix is only recognized in the simple-query path (because it needs the server-side prefix detection that the extended-query protocol doesn't offer). Clients using PQsendQueryParams keep using the existing path. Signed-off-by: Tim McLaughlin <tim@gotab.io>
1 parent bbd1176 commit 77ce1fc

10 files changed

Lines changed: 1301 additions & 87 deletions

File tree

pgduck_server/include/command_line/command_line.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ typedef struct
3333
char *unix_socket_directory;
3434
char *unix_socket_group;
3535
int unix_socket_permissions;
36+
37+
/*
38+
* Comma-separated list of TCP addresses to bind on (PostgreSQL-style
39+
* semantics). Empty string or NULL means TCP is disabled (default) and
40+
* pgduck_server only listens on the Unix domain socket. Examples:
41+
* "0.0.0.0" (all IPv4 interfaces), "0.0.0.0,::" (all IPv4 and all IPv6
42+
* interfaces), "127.0.0.1" (loopback only). The TCP port is the same
43+
* `port` field used for the Unix socket suffix.
44+
*/
45+
char *listen_addresses;
3646
unsigned int port;
3747
unsigned int max_clients;
3848
char *memory_limit;
@@ -46,6 +56,12 @@ typedef struct
4656
bool debug;
4757
char *init_file_path;
4858
char *pidfile_path;
59+
60+
/*
61+
* Base directory for RECEIVE-query sinks (libpq COPY-IN landing files).
62+
* Set via --recv_dir; defaults to <cache_dir>/recv, else /tmp/pgduck_recv.
63+
*/
64+
char *recv_dir;
4965
} CommandLineOptions;
5066

5167
CommandLineOptions parse_arguments(int argc, char *argv[]);

pgduck_server/include/pgserver/pgserver.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,31 @@
2323
#ifndef PGDUCK_PG_SERVER_H
2424
#define PGDUCK_PG_SERVER_H
2525

26+
/*
27+
* Maximum number of TCP listening sockets we'll bind. Each address in
28+
* --listen_addresses can resolve to multiple addrinfos (rare; usually
29+
* one per family), so we size for headroom over typical configs like
30+
* "0.0.0.0,::".
31+
*/
32+
#define MAX_TCP_LISTEN_SOCKETS 16
33+
2634
/*
2735
* PGServer represents an instance of a PostgreSQL wire compatible
2836
* server.
2937
*/
3038
typedef struct PGServer
3139
{
3240
int listeningPort;
33-
int listeningSocket;
41+
int listeningSocket; /* Unix domain socket */
42+
43+
/*
44+
* TCP listening sockets (one per resolved address from
45+
* --listen_addresses). numTcpSockets == 0 when TCP is disabled (the
46+
* default), in which case the server only accepts Unix-socket connections —
47+
* preserving backwards-compatible behavior.
48+
*/
49+
int tcpSockets[MAX_TCP_LISTEN_SOCKETS];
50+
int numTcpSockets;
3451

3552
char unixSocketDir[MAXPGPATH];
3653
char unixSocketPath[MAXPGPATH];
@@ -48,6 +65,7 @@ extern int pgserver_init(PGServer * pgServer,
4865
char *unixSocketPath,
4966
char *unixSocketOwningGroup,
5067
int unixSocketPermissions,
68+
char *tcpListenAddresses,
5169
int port);
5270
extern int pgserver_run(PGServer * pgServer);
5371
extern void pgserver_destroy(PGServer * pgServer);

pgduck_server/include/pgsession/pgsession.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828

2929
#include "duckdb/duckdb.h"
3030

31+
/* forward declaration; full definition lives in pgsession/recv_sink.h */
32+
typedef struct ReceiveSink ReceiveSink;
33+
3134
#define DUCKPG_SERVER_VERSION "16.4.DuckPG"
3235

3336
/*
@@ -175,6 +178,19 @@ typedef struct PGSession
175178

176179
int lastReportedSendErrno; /* needed to make pgsession_flush
177180
* thread-safe */
181+
182+
/*
183+
* RECEIVE-query state.
184+
*
185+
* When a query begins with the "RECEIVE " prefix, we send the client a
186+
* CopyInResponse, accept CopyData messages into activeRecvSink, and defer
187+
* running the actual DuckDB query until CopyDone arrives. While
188+
* deferredQueryString is non-NULL the session is in COPY-IN-active state
189+
* and only 'd' (CopyData) / 'c' (CopyDone) / 'f' (CopyFail) are valid.
190+
*/
191+
ReceiveSink *activeRecvSink;
192+
char *deferredQueryString;
193+
ResponseFormat deferredResponseFormat;
178194
} PGSession;
179195

180196
/* per-client entrance point for the pgsession logic */
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2026 Snowflake Inc.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/*
19+
* Receive sink: a per-RECEIVE-query landing area on pgduck_server's local
20+
* filesystem.
21+
*
22+
* The RECEIVE query prefix (mirror of TRANSMIT) lets pg_lake clients stream
23+
* bytes via libpq COPY IN, instead of writing to a shared filesystem under
24+
* the Postgres backend's PGDATA. The bytes land in a sink owned by
25+
* pgduck_server, and the sink path is what gets passed to DuckDB's
26+
* read_csv() etc. This decouples pgduck_server's filesystem from the
27+
* client's PGDATA, which is a hard requirement when pgduck_server runs in
28+
* a different pod/host than the Postgres backend (e.g. on Kubernetes).
29+
*
30+
* v1 stages received bytes to a regular file in a private base directory.
31+
* Future versions may use a FIFO so DuckDB can consume the stream in
32+
* parallel with the client upload, but the file approach is simpler and
33+
* sufficient for typical CDC batch sizes (tens of MB).
34+
*/
35+
#ifndef PGDUCK_RECV_SINK_H
36+
#define PGDUCK_RECV_SINK_H
37+
38+
#include "c.h"
39+
#include <stddef.h>
40+
41+
typedef struct ReceiveSink ReceiveSink;
42+
43+
/*
44+
* Initialize the global receive directory. Creates it (mode 0700) if
45+
* missing and unlinks any stale files left over from a prior run. Must be
46+
* called once at startup, before any sink is created. Repeated calls
47+
* replace the configured path.
48+
*
49+
* Returns 0 on success, -1 on error (with errno set and the failure
50+
* already logged).
51+
*/
52+
int recv_sink_global_init(const char *base_dir);
53+
54+
/*
55+
* Returns the configured base directory, or NULL if recv_sink has not been
56+
* initialized.
57+
*/
58+
const char *recv_sink_global_dir(void);
59+
60+
/*
61+
* Create a new sink. Allocates a unique path under the global recv
62+
* directory, opens it for writing (mode 0600), and returns the sink. The
63+
* caller owns the sink and must call recv_sink_destroy() when done.
64+
*
65+
* Returns NULL on error (with errno set).
66+
*/
67+
ReceiveSink *recv_sink_create(void);
68+
69+
/*
70+
* Path of the sink file. Stable from create() through destroy(). DuckDB
71+
* should open this path for reading.
72+
*/
73+
const char *recv_sink_path(const ReceiveSink * sink);
74+
75+
/*
76+
* Append bytes to the sink. Performs a full write (handles partial writes
77+
* and EINTR). Returns 0 on success, -1 on error.
78+
*/
79+
int recv_sink_write(ReceiveSink * sink, const char *data, size_t len);
80+
81+
/*
82+
* Close the sink for writing. After this call, the sink path is readable
83+
* by DuckDB and reads will see EOF after all written bytes. Idempotent.
84+
* Returns 0 on success, -1 on error.
85+
*/
86+
int recv_sink_finalize(ReceiveSink * sink);
87+
88+
/*
89+
* Destroy the sink: close the file if still open, unlink the path, and
90+
* free memory. Always succeeds; safe to call with NULL.
91+
*/
92+
void recv_sink_destroy(ReceiveSink * sink);
93+
94+
#endif /* PGDUCK_RECV_SINK_H */

pgduck_server/src/command_line/command_line.c

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ print_usage()
5757
printf(" --unix_socket_directory <path> Specify the unix socket directory, default is %s\n", DEFAULT_UNIX_DOMAIN_PATH);
5858
printf(" --unix_socket_group <group name> Specify the unix socket group owner, default is \"%s\"\n", DEFAULT_UNIX_DOMAIN_GROUP);
5959
printf(" --unix_socket_permissions <mask> Specify the unix socket (chmod) permissions, default is %o\n", DEFAULT_UNIX_DOMAIN_PERMISSIONS);
60-
printf(" --port <port> Specify the port number, default is %d\n", DEFAULT_PORT);
60+
printf(" --listen_addresses <addrs> Comma-separated TCP addresses to listen on (e.g. \"0.0.0.0,::\"). Default: empty (Unix socket only)\n");
61+
printf(" --port <port> Specify the port number (used for both Unix socket suffix and TCP listener), default is %d\n", DEFAULT_PORT);
6162
printf(" --max_clients <max_clients> Specify the maximum allowed clients, default is %d\n", DEFAULT_MAX_CLIENTS);
6263
printf(" --memory_limit=<memory_limit> Optionally specify the maximum memory of pgduck_server similar to DuckDB's memory_limit, the default is 80 percent of the system memory\n");
6364
printf(" --continue_on_oom If out of memory error occurs, continue operating\n");
@@ -66,6 +67,7 @@ print_usage()
6667
printf(" --check_cli_params_only Only check the cli arguments, do not run the server\n");
6768
printf(" --init_file_path <path> Execute all statements in this file on start-up\n");
6869
printf(" --cache_dir Specify the directory to use to cache remote files (from S3)\n");
70+
printf(" --recv_dir <path> Base dir for RECEIVE-query landing files; default <cache_dir>/recv or /tmp/pgduck_recv\n");
6971
printf(" --extensions_dir <path> Install and load extensions in the specified directory\n");
7072
printf(" --pidfile <path> Write the pid of this program to the given path\n");
7173
printf(" --no_extension_install Disable extension installation\n");
@@ -85,6 +87,7 @@ parse_arguments(int argc, char *argv[])
8587
.unix_socket_directory = DEFAULT_UNIX_DOMAIN_PATH,
8688
.unix_socket_group = DEFAULT_UNIX_DOMAIN_GROUP,
8789
.unix_socket_permissions = DEFAULT_UNIX_DOMAIN_PERMISSIONS,
90+
.listen_addresses = NULL,
8891
.port = DEFAULT_PORT,
8992
.max_clients = DEFAULT_MAX_CLIENTS,
9093
.memory_limit = NULL,
@@ -97,6 +100,7 @@ parse_arguments(int argc, char *argv[])
97100
.extensions_dir = NULL,
98101
.no_extension_install = false,
99102
.debug = false,
103+
.recv_dir = NULL,
100104
};
101105
int opt;
102106
int option_index = 0;
@@ -108,6 +112,7 @@ parse_arguments(int argc, char *argv[])
108112
{"unix_socket_directory", required_argument, NULL, 'U'},
109113
{"unix_socket_group", required_argument, NULL, 'G'},
110114
{"unix_socket_permissions", required_argument, NULL, 'm'},
115+
{"listen_addresses", required_argument, NULL, 'A'},
111116
{"port", required_argument, NULL, 'P'},
112117
{"max_clients", required_argument, NULL, 'M'},
113118
{"memory_limit", required_argument, NULL, 'l'},
@@ -120,10 +125,11 @@ parse_arguments(int argc, char *argv[])
120125
{"init_file_path", required_argument, NULL, 'i'},
121126
{"pidfile", required_argument, NULL, 'p'},
122127
{"debug", no_argument, NULL, 'd'},
128+
{"recv_dir", required_argument, NULL, 'R'},
123129
{0, 0, 0, 0}
124130
};
125131

126-
while ((opt = getopt_long(argc, argv, "cvhU:P:M:D:l:L:p:d", long_options, &option_index)) != -1)
132+
while ((opt = getopt_long(argc, argv, "cvhU:A:P:M:D:l:L:p:dR:", long_options, &option_index)) != -1)
127133
{
128134
switch (opt)
129135
{
@@ -143,6 +149,15 @@ parse_arguments(int argc, char *argv[])
143149
case 'G':
144150
options.unix_socket_group = strdup(optarg);
145151
break;
152+
case 'A':
153+
154+
/*
155+
* Empty string is allowed and disables TCP listening (same as
156+
* not passing the flag at all). Any non-empty value is
157+
* treated as a comma-separated list of addresses to bind on.
158+
*/
159+
options.listen_addresses = strdup(optarg);
160+
break;
146161
case 'l':
147162
if (optarg)
148163
options.memory_limit = strdup(optarg);
@@ -252,6 +267,9 @@ parse_arguments(int argc, char *argv[])
252267
case 'p':
253268
options.pidfile_path = strdup(optarg);
254269
break;
270+
case 'R':
271+
options.recv_dir = strdup(optarg);
272+
break;
255273
case '?':
256274
print_usage();
257275
exit(EXIT_FAILURE);

pgduck_server/src/main.c

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
*/
2323
#include <stdio.h>
2424
#include <signal.h>
25+
#include <stdlib.h>
26+
#include <string.h>
2527

2628
#include "c.h"
2729
#include "postgres_fe.h"
@@ -32,6 +34,7 @@
3234
#include "pgserver/pgserver.h"
3335
#include "pgserver/client_threadpool.h"
3436
#include "pgsession/pgsession.h"
37+
#include "pgsession/recv_sink.h"
3538
#include "duckdb/duckdb.h"
3639
#include "utils/pidfile.h"
3740

@@ -74,6 +77,47 @@ main(int argc, char *argv[])
7477

7578
pgclient_threadpool_init(options.max_clients);
7679

80+
/*
81+
* Resolve and initialize the RECEIVE-query landing directory.
82+
*
83+
* Default precedence: --recv_dir > <cache_dir>/recv > /tmp/pgduck_recv.
84+
* The directory is created (mode 0700) and any stale files from a prior
85+
* run are unlinked.
86+
*/
87+
{
88+
char *recv_dir = options.recv_dir;
89+
char *resolved = NULL;
90+
91+
if (recv_dir == NULL)
92+
{
93+
if (options.cache_dir != NULL)
94+
{
95+
size_t len = strlen(options.cache_dir) + sizeof("/recv");
96+
97+
resolved = malloc(len);
98+
if (resolved == NULL)
99+
{
100+
PGDUCK_SERVER_ERROR("out of memory deriving recv_dir");
101+
return STATUS_ERROR;
102+
}
103+
snprintf(resolved, len, "%s/recv", options.cache_dir);
104+
recv_dir = resolved;
105+
}
106+
else
107+
{
108+
recv_dir = "/tmp/pgduck_recv";
109+
}
110+
}
111+
112+
if (recv_sink_global_init(recv_dir) != 0)
113+
{
114+
free(resolved);
115+
return STATUS_ERROR;
116+
}
117+
PGDUCK_SERVER_LOG("RECEIVE-query landing dir: %s", recv_dir);
118+
free(resolved);
119+
}
120+
77121
PGServer pgServer;
78122

79123
srand(time(NULL));
@@ -82,6 +126,7 @@ main(int argc, char *argv[])
82126
options.unix_socket_directory,
83127
options.unix_socket_group,
84128
options.unix_socket_permissions,
129+
options.listen_addresses,
85130
options.port) != STATUS_OK)
86131
return STATUS_ERROR;
87132

0 commit comments

Comments
 (0)