Skip to content

Commit 150fed0

Browse files
fix spurious cancel on pgduck_server
Signed-off-by: Aykut Bozkurt <aykut.bozkurt@snowflake.com>
1 parent af1c864 commit 150fed0

3 files changed

Lines changed: 62 additions & 18 deletions

File tree

pgduck_server/include/pgserver/client_threadpool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ extern int pgclient_threadpool_cancel_thread(int cancellationProcId, uint8 *canc
4141
extern int pgclient_threadpool_cancel_thread(int cancellationProcId, int32 cancellationToken);
4242
#endif
4343
extern void pgclient_threadpool_set_duckdb_conn(int threadIndex, duckdb_connection conn);
44+
extern void pgclient_threadpool_set_executing(int threadIndex, bool executing);
4445
extern int pgclient_threadpool_cancel_all(void);
4546

4647
#endif /* PGDUCK_CLIENT_THREAD_H */

pgduck_server/src/pgserver/client_threadpool.c

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ typedef struct PgClientThreadState
5858
/* DuckDB connection to interrupt */
5959
duckdb_connection duckdbConnection;
6060

61+
/*
62+
* Whether the thread is currently executing a DuckDB query. Set by the
63+
* session thread (under wrlock) before/after DuckDB execution so that the
64+
* cancel handler can skip duckdb_interrupt() on idle sessions.
65+
*/
66+
bool isExecutingQuery;
67+
6168
} PgClientThreadState;
6269

6370
int MaxAllowedClients;
@@ -221,6 +228,7 @@ pgclient_threadpool_free_slot(int threadIndex)
221228
threadState->cancellationTokenSize = 0;
222229
#endif
223230
threadState->duckdbConnection = NULL;
231+
threadState->isExecutingQuery = false;
224232

225233
--ActiveClientThreadCount;
226234

@@ -252,18 +260,18 @@ pgclient_threadpool_cancel_thread(int cancellationProcId, uint8 *cancellationTok
252260
ClientThreadPool[threadIndex].cancellationProcId == cancellationProcId)
253261
{
254262
usedThreadIndex = threadIndex;
255-
duckdb_connection conn = ClientThreadPool[threadIndex].duckdbConnection;
256263

257264
/*
258-
* As per DuckDB docs, duckdb connections are thread safe so we
259-
* can safely interrupt it from another thread.
260-
*
261-
* We do the interrupt while holding the threadpool lock.
262-
* Otherwise, a thread could end just before we call
263-
* duckdb_interrupt. Luckily, duckdb_interrupt will only set an
264-
* atomic<bool> flag.
265+
* Only interrupt when the session is actually executing a DuckDB
266+
* query. Interrupting an idle session sets an atomic flag that
267+
* persists and poisons the next query on that connection.
265268
*/
266-
duckdb_interrupt(conn);
269+
if (ClientThreadPool[threadIndex].isExecutingQuery)
270+
{
271+
duckdb_connection conn = ClientThreadPool[threadIndex].duckdbConnection;
272+
273+
duckdb_interrupt(conn);
274+
}
267275
break;
268276
}
269277
}
@@ -286,18 +294,18 @@ pgclient_threadpool_cancel_thread(int cancellationProcId, int32 cancellationToke
286294
ClientThreadPool[threadIndex].cancellationProcId == cancellationProcId)
287295
{
288296
usedThreadIndex = threadIndex;
289-
duckdb_connection conn = ClientThreadPool[threadIndex].duckdbConnection;
290297

291298
/*
292-
* As per DuckDB docs, duckdb connections are thread safe so we
293-
* can safely interrupt it from another thread.
294-
*
295-
* We do the interrupt while holding the threadpool lock.
296-
* Otherwise, a thread could end just before we call
297-
* duckdb_interrupt. Luckily, duckdb_interrupt will only set an
298-
* atomic<bool> flag.
299+
* Only interrupt when the session is actually executing a DuckDB
300+
* query. Interrupting an idle session sets an atomic flag that
301+
* persists and poisons the next query on that connection.
299302
*/
300-
duckdb_interrupt(conn);
303+
if (ClientThreadPool[threadIndex].isExecutingQuery)
304+
{
305+
duckdb_connection conn = ClientThreadPool[threadIndex].duckdbConnection;
306+
307+
duckdb_interrupt(conn);
308+
}
301309
break;
302310
}
303311
}
@@ -325,6 +333,20 @@ pgclient_threadpool_set_duckdb_conn(int threadIndex, duckdb_connection conn)
325333
}
326334

327335

336+
/*
337+
* pgclient_threadpool_set_executing marks whether the given thread is
338+
* currently executing a DuckDB query. The cancel handler checks this
339+
* flag (under rdlock) to avoid interrupting idle sessions.
340+
*/
341+
void
342+
pgclient_threadpool_set_executing(int threadIndex, bool executing)
343+
{
344+
pthread_rwlock_wrlock(&rwlock);
345+
ClientThreadPool[threadIndex].isExecutingQuery = executing;
346+
pthread_rwlock_unlock(&rwlock);
347+
}
348+
349+
328350
/*
329351
* pgclient_threadpool_cancel_all interrupts every active DuckDB query.
330352
*

pgduck_server/src/pgsession/pgsession.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,15 @@ pgsession_handle_connection(void *input)
346346

347347

348348
finally:
349+
/*
350+
* Clear the DuckDB connection pointer from the thread pool BEFORE
351+
* destroying the session. Otherwise a cancel request arriving between
352+
* duckdb_disconnect (inside pgsession_destroy) and
353+
* pgclient_threadpool_free_slot would call duckdb_interrupt on a freed
354+
* pointer.
355+
*/
356+
pgclient_threadpool_set_duckdb_conn(pgClient->threadIndex, NULL);
357+
349358
pgsession_prepared_statement_deallocate(&pgSession);
350359
pgsession_destroy(&pgSession);
351360
pg_free(inputMessage.data);
@@ -392,8 +401,12 @@ process_query_message(PGSession * pgSession, StringInfo inputMessage)
392401
}
393402

394403
char *errorMessage = NULL;
404+
int threadIndex = pgSession->pgClient->threadIndex;
405+
406+
pgclient_threadpool_set_executing(threadIndex, true);
395407
DuckDBStatus status = duckdb_session_run_command(&pgSession->duckSession, queryString,
396408
&responseFormat, &errorMessage);
409+
pgclient_threadpool_set_executing(threadIndex, false);
397410

398411
if (status == DUCKDB_SUCCESS)
399412
{
@@ -506,8 +519,12 @@ process_parse_message(PGSession * pgSession, StringInfo inputMessage)
506519
char *queryStringCopy = pstrdup(queryString);
507520

508521
char *errorMessage = NULL;
522+
int threadIndex = pgSession->pgClient->threadIndex;
523+
524+
pgclient_threadpool_set_executing(threadIndex, true);
509525
DuckDBStatus status = duckdb_session_prepare(&pgSession->duckSession, queryStringCopy,
510526
&errorMessage);
527+
pgclient_threadpool_set_executing(threadIndex, false);
511528

512529
/*
513530
* make sure we destroy the prepared statement, even in case of failure
@@ -813,9 +830,13 @@ process_execute_message(PGSession * pgSession, StringInfo inputMessage)
813830
ResponseFormat *responseFormat = &pgSession->pgSessionPreparedStmt.responseFormat;
814831

815832
char *errorMessage = NULL;
833+
int threadIndex = pgSession->pgClient->threadIndex;
834+
835+
pgclient_threadpool_set_executing(threadIndex, true);
816836
DuckDBStatus status = duckdb_session_execute_prepared(&pgSession->duckSession,
817837
responseFormat,
818838
&errorMessage);
839+
pgclient_threadpool_set_executing(threadIndex, false);
819840

820841
if (status == DUCKDB_SUCCESS)
821842
{

0 commit comments

Comments
 (0)