Skip to content

Commit a85d4fe

Browse files
pgduck_server handles sigint/sigterm gracefully (#229)
Signed-off-by: Aykut Bozkurt <aykut.bozkurt@snowflake.com>
1 parent 9d2b5ab commit a85d4fe

7 files changed

Lines changed: 335 additions & 41 deletions

File tree

pgduck_server/include/pgserver/client_threadpool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,6 @@ extern int pgclient_threadpool_cancel_thread(int cancellationProcId, uint8 *canc
4141
extern int pgclient_threadpool_cancel_thread(int cancellationProcId, int32 cancellationToken);
4242
#endif
4343
extern void pgclient_threadpool_set_duckdb_conn(int threadIndex, duckdb_connection conn);
44+
extern int pgclient_threadpool_cancel_all(void);
4445

4546
#endif /* PGDUCK_CLIENT_THREAD_H */

pgduck_server/include/pgserver/pgserver.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ extern int pgserver_init(PGServer * pgServer,
5050
int unixSocketPermissions,
5151
int port);
5252
extern int pgserver_run(PGServer * pgServer);
53-
extern int pgserver_destroy(PGServer * pgServer);
53+
extern void pgserver_destroy(PGServer * pgServer);
5454

5555

5656
#endif /* PGDUCK_PG_SERVER_H */

pgduck_server/src/main.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,7 @@ main(int argc, char *argv[])
8888
if (pgserver_run(&pgServer) != STATUS_OK)
8989
return STATUS_ERROR;
9090

91-
if (pgserver_destroy(&pgServer) != STATUS_OK)
92-
return STATUS_ERROR;
91+
pgserver_destroy(&pgServer);
9392

9493
return STATUS_OK;
9594
}

pgduck_server/src/pgserver/client_threadpool.c

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,3 +323,36 @@ pgclient_threadpool_set_duckdb_conn(int threadIndex, duckdb_connection conn)
323323

324324
pthread_rwlock_unlock(&rwlock);
325325
}
326+
327+
328+
/*
329+
* pgclient_threadpool_cancel_all interrupts every active DuckDB query.
330+
*
331+
* Called during server shutdown so that client threads get a clean
332+
* interruption error instead of an abrupt connection reset when the
333+
* process exits. Only sets the DuckDB interrupt flag (an atomic bool),
334+
* so this is cheap and safe to call from the main thread.
335+
*
336+
* Returns the number of active threads that were interrupted.
337+
*/
338+
int
339+
pgclient_threadpool_cancel_all(void)
340+
{
341+
int interrupted = 0;
342+
343+
pthread_rwlock_rdlock(&rwlock);
344+
345+
for (int i = 0; i < MaxThreads; i++)
346+
{
347+
if (ClientThreadPool[i].isStarted &&
348+
ClientThreadPool[i].duckdbConnection != NULL)
349+
{
350+
duckdb_interrupt(ClientThreadPool[i].duckdbConnection);
351+
interrupted++;
352+
}
353+
}
354+
355+
pthread_rwlock_unlock(&rwlock);
356+
357+
return interrupted;
358+
}

pgduck_server/src/pgserver/pgserver.c

Lines changed: 142 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <netdb.h>
3131
#include <common/ip.h>
3232
#include <pthread.h>
33+
#include <signal.h>
3334
#include <sys/fcntl.h>
3435
#include <sys/file.h>
3536
#include <sys/stat.h>
@@ -325,45 +326,98 @@ set_unix_socket_permissions(char *unixSocketPath, char *groupName, int permissio
325326

326327
static volatile sig_atomic_t running = 1;
327328

328-
/* basic sigint handler */
329329
static void
330-
handle_signal(int sig)
330+
handle_shutdown_signal(int sig)
331331
{
332332
running = 0;
333333
}
334334

335335

336336
/*
337-
* pgserver_run is the main loop for the PostgreSQL wire compatible server.
337+
* install_shutdown_signal_handlers -- install SIGINT/SIGTERM handlers.
338+
*
339+
* The handler simply sets `running = 0`. SA_RESTART is deliberately
340+
* *not* set so that accept() in the main loop returns -1/EINTR, giving
341+
* the loop a chance to notice the flag and exit promptly.
338342
*/
339-
int
340-
pgserver_run(PGServer * pgServer)
343+
static int
344+
install_shutdown_signal_handlers(void)
341345
{
342-
/* install signal handlers */
343346
struct sigaction sa;
344347

345-
/* Use our custom handler */
346-
sa.sa_handler = handle_signal;
347-
348-
/* Do not block any other signals during handling */
348+
sa.sa_handler = handle_shutdown_signal;
349349
sigemptyset(&sa.sa_mask);
350+
sa.sa_flags = 0; /* no SA_RESTART — accept() must return
351+
* EINTR */
350352

351-
/* CRITICAL: Do NOT set the SA_RESTART flag. */
352-
/* This ensures that system calls like accept() are interrupted. */
353-
sa.sa_flags = 0;
353+
if (sigaction(SIGINT, &sa, NULL) == -1 ||
354+
sigaction(SIGTERM, &sa, NULL) == -1)
355+
{
356+
PGDUCK_SERVER_ERROR("sigaction failed: %s", strerror(errno));
357+
return STATUS_ERROR;
358+
}
359+
360+
return STATUS_OK;
361+
}
354362

355-
/* Install the handler for SIGINT and SIGTERM */
356-
if (sigaction(SIGINT, &sa, NULL) == -1)
363+
364+
/*
365+
* disable_shutdown_signals -- block SIGINT/SIGTERM in the calling thread.
366+
*
367+
* Used before pthread_create() so the child thread inherits a blocked
368+
* mask and never receives these signals.
369+
*/
370+
static int
371+
disable_shutdown_signals(void)
372+
{
373+
sigset_t sigs;
374+
375+
sigemptyset(&sigs);
376+
sigaddset(&sigs, SIGINT);
377+
sigaddset(&sigs, SIGTERM);
378+
379+
if (pthread_sigmask(SIG_BLOCK, &sigs, NULL) != 0)
357380
{
358-
perror("sigaction for SIGINT failed");
359-
exit(STATUS_ERROR);
381+
PGDUCK_SERVER_ERROR("pthread_sigmask failed: %s", strerror(errno));
382+
return STATUS_ERROR;
360383
}
361-
if (sigaction(SIGTERM, &sa, NULL) == -1)
384+
385+
return STATUS_OK;
386+
}
387+
388+
389+
/*
390+
* enable_shutdown_signals -- unblock SIGINT/SIGTERM and re-install the
391+
* shutdown signal handlers so accept() can be interrupted again.
392+
*/
393+
static int
394+
enable_shutdown_signals(void)
395+
{
396+
sigset_t sigs;
397+
398+
sigemptyset(&sigs);
399+
sigaddset(&sigs, SIGINT);
400+
sigaddset(&sigs, SIGTERM);
401+
402+
if (pthread_sigmask(SIG_UNBLOCK, &sigs, NULL) != 0)
362403
{
363-
perror("sigaction for SIGTERM failed");
364-
exit(STATUS_ERROR);
404+
PGDUCK_SERVER_ERROR("pthread_sigmask failed: %s", strerror(errno));
405+
return STATUS_ERROR;
365406
}
366407

408+
return STATUS_OK;
409+
}
410+
411+
412+
/*
413+
* pgserver_run is the main loop for the PostgreSQL wire compatible server.
414+
*/
415+
int
416+
pgserver_run(PGServer * pgServer)
417+
{
418+
if (install_shutdown_signal_handlers() != STATUS_OK)
419+
return STATUS_ERROR;
420+
367421
while (running)
368422
{
369423
PGClient *client = (PGClient *) pg_malloc0(sizeof(PGClient));
@@ -375,8 +429,21 @@ pgserver_run(PGServer * pgServer)
375429

376430
if (client->clientSocket < 0)
377431
{
432+
int save_errno = errno;
433+
434+
pg_free(client);
435+
436+
/*
437+
* EINTR can come from our shutdown handler (running == 0) or from
438+
* unrelated sources like a debugger attaching (ptrace). In either
439+
* case, just retry the loop — the while-condition takes care of
440+
* the shutdown case.
441+
*/
442+
if (save_errno == EINTR)
443+
continue;
444+
378445
PGDUCK_SERVER_ERROR("Could not accept the client: %s",
379-
strerror(errno));
446+
strerror(save_errno));
380447

381448
/*
382449
* TODO: We can probably recover from this error, but lets handle
@@ -422,6 +489,9 @@ pgserver_run(PGServer * pgServer)
422489
initState->startFunction = pgServer->startFunction;
423490
initState->pgClient = client;
424491

492+
if (disable_shutdown_signals() != STATUS_OK)
493+
exit(STATUS_ERROR);
494+
425495
if (pgserver_create_client_thread(initState) != OK)
426496
{
427497
PGDUCK_SERVER_ERROR("Thread creation failed for client %d", client->clientSocket);
@@ -430,19 +500,61 @@ pgserver_run(PGServer * pgServer)
430500
pg_free(client);
431501
pg_free(initState);
432502
pgclient_threadpool_free_slot(threadIndex);
433-
continue;
434503
}
435-
}
436504

437-
PGDUCK_SERVER_LOG("Done running");
505+
if (enable_shutdown_signals() != STATUS_OK)
506+
exit(STATUS_ERROR);
507+
}
438508

439509
return STATUS_OK;
440510
}
441511

442512

513+
/*
514+
* pgserver_destroy performs a graceful shutdown of the server.
515+
*
516+
* 1. Close the listening socket so no new connections are accepted.
517+
* 2. Interrupt every active DuckDB query so client threads can send a
518+
* proper error to their clients instead of an abrupt TCP reset.
519+
* 3. Brief grace period to let interrupted threads finish their error
520+
* path and close their sockets cleanly.
521+
*
522+
* We don't join the client threads (they are detached), so the grace
523+
* period is best-effort. When main() returns, exit() will tear down
524+
* any remaining threads.
525+
*/
526+
void
527+
pgserver_destroy(PGServer * pgServer)
528+
{
529+
PGDUCK_SERVER_LOG("Shutting down: closing listening socket");
530+
closesocket(pgServer->listeningSocket);
531+
pgServer->listeningSocket = -1;
532+
533+
int interrupted = pgclient_threadpool_cancel_all();
534+
535+
if (interrupted > 0)
536+
{
537+
PGDUCK_SERVER_LOG("Shutting down: interrupted %d active connection(s), "
538+
"waiting briefly for them to finish", interrupted);
539+
540+
/*
541+
* 2 seconds is generous for the threads to send an error to their
542+
* client and run through pgclient_thread_cleanup.
543+
*/
544+
pg_usleep(2 * 1000000L);
545+
}
546+
547+
PGDUCK_SERVER_LOG("Done running");
548+
}
549+
550+
443551
/*
444552
* pgserver_create_client_thread creates a new thread for the client.
445553
* We use PTHREAD_CREATE_DETACHED so that we don't have to join the threads.
554+
*
555+
* The caller must block shutdown signals before calling this function
556+
* so the new thread inherits a blocked mask and never receives
557+
* SIGINT/SIGTERM.
446558
*/
447559
static int
448560
pgserver_create_client_thread(const PgClientThreadInitState * initState)
@@ -487,6 +599,12 @@ pgclient_thread_main(void *arg)
487599
{
488600
PgClientThreadInitState *initState = (PgClientThreadInitState *) arg;
489601

602+
/*
603+
* SIGINT/SIGTERM are already blocked — pgserver_create_client_thread()
604+
* blocks them before pthread_create(), so this thread inherits a blocked
605+
* mask. No per-thread sigmask call needed.
606+
*/
607+
490608
/* cleanup handler */
491609
pthread_cleanup_push(pgclient_thread_cleanup, arg);
492610

@@ -523,18 +641,6 @@ pgclient_thread_cleanup(void *arg)
523641
}
524642

525643

526-
/*
527-
* cleanup on successful exists.
528-
*
529-
* TODO: not called ever yet
530-
*/
531-
int
532-
pgserver_destroy(PGServer * pgServer)
533-
{
534-
closesocket(pgServer->listeningSocket);
535-
536-
return STATUS_OK;
537-
}
538644

539645

540646
/*

pgduck_server/tests/pytests/test_s3.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,7 @@ def test_s3_get_region_invalid(pgduck_conn):
198198
pgduck_conn,
199199
raise_error=False,
200200
)
201-
assert "Could not establish connection error" in error
201+
assert (
202+
"Could not establish connection error" in error
203+
or "server closed the connection" in error
204+
)

0 commit comments

Comments
 (0)