Skip to content

Commit 5638b9f

Browse files
Add routine to get running worker pid (#295)
* Add extension_base.get_worker_pid() to look up a worker's PID by job ID Exposes GetBaseWorkerPid() as an exportable C function (declared in base_workers.h) that reads the shared-memory BaseWorkerHash under a shared lock and returns the PID for the given worker ID in the current database, or 0 if not found. The SQL wrapper get_worker_pid(int) delegates to it. Tests cover both the running and not-found cases. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: David Christensen <david.christensen@snowflake.com> * Remove get_worker_pid() from base SQL; keep only in upgrade path The function is new in 3.3 so it belongs only in the 3.2->3.3 upgrade script, not in the 1.6 base install SQL. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: David Christensen <david.christensen@snowflake.com> * black format test_base_worker_launcher.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: David Christensen <david.christensen@snowflake.com> --------- Signed-off-by: David Christensen <david.christensen@snowflake.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent e7ae547 commit 5638b9f

File tree

4 files changed

+86
-0
lines changed

4 files changed

+86
-0
lines changed

pg_extension_base/include/pg_extension_base/base_workers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,4 +115,6 @@ extern PGDLLEXPORT void DeregisterBaseWorkerSelf(void);
115115

116116
extern PGDLLEXPORT int32 MyBaseWorkerId;
117117

118+
extern PGDLLEXPORT pid_t GetBaseWorkerPid(int32 workerId);
119+
118120
#endif

pg_extension_base/pg_extension_base--3.2--3.3.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,15 @@ COMMENT ON FUNCTION extension_base.deregister_worker(text, bool)
3232
IS 'deregister a base worker';
3333

3434
REVOKE ALL ON FUNCTION extension_base.deregister_worker(text, bool) FROM public;
35+
36+
37+
/* return the PID of a running base worker given its worker ID, or 0 if not found */
38+
CREATE FUNCTION extension_base.get_worker_pid(worker_id int)
39+
RETURNS int
40+
LANGUAGE c STRICT
41+
AS 'MODULE_PATHNAME', $function$pg_extension_base_get_worker_pid$function$;
42+
43+
COMMENT ON FUNCTION extension_base.get_worker_pid(int)
44+
IS 'return the PID of a running base worker, or 0 if not found';
45+
46+
REVOKE ALL ON FUNCTION extension_base.get_worker_pid(int) FROM public;

pg_extension_base/src/base_worker_launcher.c

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ static void BaseWorkerTransactionCallback(XactEvent event, void *arg);
367367
/* SQL-callable functions */
368368
PG_FUNCTION_INFO_V1(pg_extension_base_register_worker);
369369
PG_FUNCTION_INFO_V1(pg_extension_base_deregister_worker);
370+
PG_FUNCTION_INFO_V1(pg_extension_base_get_worker_pid);
370371
PG_FUNCTION_INFO_V1(pg_extension_base_list_base_workers);
371372
PG_FUNCTION_INFO_V1(pg_extension_base_list_database_starters);
372373

@@ -2925,6 +2926,42 @@ BaseWorkerTransactionCallback(XactEvent event, void *arg)
29252926
}
29262927

29272928

2929+
/*
2930+
* GetBaseWorkerPid returns the PID of a running base worker given its worker
2931+
* ID, or 0 if the worker is not found in the current database.
2932+
*/
2933+
pid_t
2934+
GetBaseWorkerPid(int32 workerId)
2935+
{
2936+
pid_t workerPid = 0;
2937+
2938+
LWLockAcquire(&BaseWorkerControl->lock, LW_SHARED);
2939+
2940+
bool isFound = false;
2941+
BaseWorkerEntry *workerEntry = GetBaseWorkerEntry(MyDatabaseId, workerId, &isFound);
2942+
2943+
if (isFound)
2944+
workerPid = workerEntry->workerPid;
2945+
2946+
LWLockRelease(&BaseWorkerControl->lock);
2947+
2948+
return workerPid;
2949+
}
2950+
2951+
2952+
/*
2953+
* pg_extension_base_get_worker_pid implements the extension_base.get_worker_pid
2954+
* SQL function.
2955+
*/
2956+
Datum
2957+
pg_extension_base_get_worker_pid(PG_FUNCTION_ARGS)
2958+
{
2959+
int32 workerId = PG_GETARG_INT32(0);
2960+
2961+
PG_RETURN_INT32(GetBaseWorkerPid(workerId));
2962+
}
2963+
2964+
29282965
/*
29292966
* pg_extension_base_list_base_workers returns the state of the base workers in shared
29302967
* memory.

pg_extension_base/tests/pytests/test_base_worker_launcher.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,41 @@ def test_oneshot_worker_drop_database(superuser_conn):
431431
superuser_conn.autocommit = False
432432

433433

434+
def test_get_worker_pid_running(superuser_conn):
435+
run_command(
436+
"CREATE EXTENSION pg_extension_base_test_scheduler CASCADE", superuser_conn
437+
)
438+
superuser_conn.commit()
439+
time.sleep(0.1)
440+
441+
worker_id = run_query(
442+
"SELECT worker_id FROM extension_base.workers WHERE worker_name = 'pg_extension_base_test_scheduler_main_worker'",
443+
superuser_conn,
444+
)[0][0]
445+
446+
pid = run_query(
447+
f"SELECT extension_base.get_worker_pid({worker_id})",
448+
superuser_conn,
449+
)[0][0]
450+
assert pid > 0
451+
452+
run_command(
453+
"DROP EXTENSION pg_extension_base_test_scheduler CASCADE", superuser_conn
454+
)
455+
superuser_conn.commit()
456+
457+
458+
def test_get_worker_pid_not_found(superuser_conn):
459+
pid = run_query(
460+
"SELECT extension_base.get_worker_pid(99999)",
461+
superuser_conn,
462+
)[
463+
0
464+
][0]
465+
assert pid == 0
466+
superuser_conn.rollback()
467+
468+
434469
def get_pg_extension_workers(conn):
435470
query = "SELECT datname, application_name FROM pg_stat_activity WHERE backend_type LIKE 'pg_base_extension server %' OR backend_type LIKE 'pg base extension%' ORDER BY application_name"
436471
result = run_query(query, conn)

0 commit comments

Comments
 (0)