Skip to content

Commit 34d2a8c

Browse files
committed
Scrub user mapping secrets in-place via ProcessUtility handler
When CREATE USER MAPPING or ALTER USER MAPPING targets an iceberg_catalog server, overwrite client_id and client_secret values with asterisks directly in the queryString buffer. This prevents credential leakage in pg_stat_statements and similar. Why in-place mutation instead of a copy: pg_stat_statements captures the queryString pointer before calling prev_ProcessUtility, then passes the same local pointer to pgss_store after the call returns. A pstrdup copy would be invisible to it. By overwriting the original buffer, every consumer sharing that pointer - like pg_stat_statements - sees the scrubbed version automatically. DDL execution is unaffected because CREATE/ALTER USER MAPPING reads option values from DefElem nodes in the parse tree, not queryString. Why RegisterUtilityStatementHandler instead of ProcessUtility_hook: pg_lake_iceberg.so is preloaded by pg_extension_base during shared_preload_libraries (via pg_lake_table.control), which runs before pg_stat_statements installs its hook. A hook installed in _PG_init would therefore sit below pg_stat_statements in the chain, defeating the purpose. Using pg_lake_engine's handler mechanism (which also runs below pg_stat_statements) is fine because the in-place mutation affects the shared buffer regardless of call order. Implementation: - iceberg_catalog_secret_options: separates secret names (client_id, client_secret) from the full user mapping option list so scope remains visible. - ScrubUserMappingSecrets: walks DefElem options, uses DefElem.location to find each secret quoted value in the query string, and overwrites value characters with * while preserving quotes and string length. - ScrubIcebergUserMappingHandler: detects user mapping DDL targeting iceberg_catalog servers, calls ScrubUserMappingSecrets, returns false so normal processing continues. Signed-off-by: sfc-gh-npuka <naisila.puka@snowflake.com>
1 parent 6203d7a commit 34d2a8c

4 files changed

Lines changed: 335 additions & 0 deletions

File tree

pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "pg_lake/http/http_client.h"
2222
#include "pg_lake/util/rel_utils.h"
2323
#include "pg_lake/parquet/field.h"
24+
#include "pg_lake/ddl/utility_hook.h"
2425
#include "pg_lake/iceberg/api/snapshot.h"
2526

2627
#define REST_CATALOG_AUTH_TYPE_DEFAULT (0)
@@ -125,3 +126,6 @@ extern PGDLLEXPORT RestCatalogRequest * GetSetCurrentSchemaCatalogRequest(Oid re
125126
extern PGDLLEXPORT RestCatalogRequest * GetAddPartitionCatalogRequest(Oid relationId, List *partitionSpec);
126127
extern PGDLLEXPORT RestCatalogRequest * GetSetPartitionDefaultIdCatalogRequest(Oid relationId, int specId);
127128
extern PGDLLEXPORT RestCatalogRequest * GetRemoveSnapshotCatalogRequest(List *removedSnapshotIds, Oid relationId);
129+
130+
/* ProcessUtility handler: scrubs secrets from user mapping DDL in-place */
131+
extern PGDLLEXPORT bool ScrubIcebergUserMappingHandler(ProcessUtilityParams *processUtilityParams, void *arg);

pg_lake_iceberg/src/init.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,8 @@ _PG_init(void)
317317
NULL, NULL, NULL);
318318

319319
AvroInit();
320+
321+
RegisterUtilityStatementHandler(ScrubIcebergUserMappingHandler, NULL);
320322
}
321323

322324

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@
3939
#include "utils/syscache.h"
4040
#include "utils/timestamp.h"
4141

42+
#include "nodes/parsenodes.h"
43+
4244
#include "pg_extension_base/base_workers.h"
45+
#include "pg_lake/ddl/utility_hook.h"
4346
#include "pg_lake/http/http_client.h"
4447
#include "pg_lake/iceberg/api/table_schema.h"
4548
#include "pg_lake/iceberg/catalog.h"
@@ -110,6 +113,7 @@ static const char *iceberg_catalog_user_mapping_options[] = {
110113
};
111114

112115

116+
113117
static bool
114118
is_valid_option_in_list(const char *keyword, const char *const *options)
115119
{
@@ -202,6 +206,130 @@ iceberg_catalog_validator(PG_FUNCTION_ARGS)
202206
}
203207

204208

209+
/*
210+
* IsIcebergCatalogServer returns true if the named server exists and uses
211+
* the iceberg_catalog FDW. Returns false (without error) when the server
212+
* does not exist yet (e.g. during CREATE SERVER ... CREATE USER MAPPING
213+
* inside the same transaction before the server is committed).
214+
*/
215+
static bool
216+
IsIcebergCatalogServer(const char *serverName)
217+
{
218+
ForeignServer *server = GetForeignServerByName(serverName, true);
219+
220+
if (server == NULL)
221+
return false;
222+
223+
ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);
224+
225+
return strcmp(fdw->fdwname, ICEBERG_CATALOG_FDW_NAME) == 0;
226+
}
227+
228+
229+
/*
230+
* ScrubUserMappingSecrets overwrites secret option values in the query
231+
* string in-place with asterisks.
232+
*
233+
* In-place mutation is essential: pg_stat_statements captures the queryString
234+
* pointer before calling prev_ProcessUtility, then stores it after the call
235+
* returns. A copy with pstrdup would be invisible to pg_stat_statements
236+
* because its local pointer still references the original memory. By
237+
* overwriting the original buffer, every holder of that pointer — including
238+
* pg_stat_statements — sees the scrubbed version.
239+
*
240+
* The actual DDL execution is unaffected because CREATE/ALTER USER MAPPING
241+
* reads option values from the parse tree (DefElem nodes), not from
242+
* queryString.
243+
*/
244+
static void
245+
ScrubUserMappingSecrets(const char *queryString, List *options)
246+
{
247+
const char *secret_options[] = {"client_id", "client_secret", NULL};
248+
ListCell *lc;
249+
250+
foreach(lc, options)
251+
{
252+
DefElem *def = (DefElem *) lfirst(lc);
253+
254+
if (!is_valid_option_in_list(def->defname, secret_options))
255+
continue;
256+
257+
if (def->location < 0)
258+
continue;
259+
260+
char *p = (char *) queryString + def->location;
261+
262+
/* skip past the key name */
263+
while (*p && !isspace((unsigned char) *p) && *p != '\'')
264+
p++;
265+
266+
/* skip whitespace between key and opening quote */
267+
while (*p && *p != '\'')
268+
p++;
269+
270+
if (*p != '\'')
271+
continue;
272+
273+
p++; /* skip opening quote */
274+
275+
/* overwrite value characters with '*', handling '' escapes */
276+
while (*p && *p != '\'')
277+
*p++ = '*';
278+
while (*(p + 1) == '\'')
279+
{
280+
*p++ = '*'; /* first quote of '' pair */
281+
*p++ = '*'; /* second quote of '' pair */
282+
while (*p && *p != '\'')
283+
*p++ = '*';
284+
}
285+
}
286+
}
287+
288+
289+
/*
290+
* ScrubIcebergUserMappingHandler is a ProcessUtility handler registered via
291+
* pg_lake_engine's RegisterUtilityStatementHandler. When it detects a
292+
* CREATE/ALTER USER MAPPING targeting an iceberg_catalog server, it scrubs
293+
* secret values in the queryString in-place and returns false so normal
294+
* processing continues.
295+
*/
296+
bool
297+
ScrubIcebergUserMappingHandler(ProcessUtilityParams *processUtilityParams,
298+
void *arg)
299+
{
300+
Node *parsetree = processUtilityParams->plannedStmt->utilityStmt;
301+
const char *serverName = NULL;
302+
List *options = NIL;
303+
304+
if (IsA(parsetree, CreateUserMappingStmt))
305+
{
306+
CreateUserMappingStmt *stmt = (CreateUserMappingStmt *) parsetree;
307+
308+
serverName = stmt->servername;
309+
options = stmt->options;
310+
}
311+
else if (IsA(parsetree, AlterUserMappingStmt))
312+
{
313+
AlterUserMappingStmt *stmt = (AlterUserMappingStmt *) parsetree;
314+
315+
serverName = stmt->servername;
316+
options = stmt->options;
317+
}
318+
else
319+
return false;
320+
321+
if (serverName == NULL || options == NIL)
322+
return false;
323+
324+
if (!IsIcebergCatalogServer(serverName))
325+
return false;
326+
327+
ScrubUserMappingSecrets(processUtilityParams->queryString, options);
328+
329+
return false;
330+
}
331+
332+
205333
/*
206334
* GetRestCatalogConnectionFromGUCs returns a RestCatalogConnectionInfo
207335
* populated from the current GUC variables. Used for backward-compatible

pg_lake_table/tests/pytests/test_iceberg_catalog_server.py

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,3 +809,204 @@ def test_catalog_object_store_literal_still_works(
809809
pg_conn,
810810
)
811811
pg_conn.rollback()
812+
813+
814+
# ── Query string scrubbing for user mapping DDL ───────────────────────────
815+
816+
817+
def test_scrub_create_user_mapping_in_pg_stat_statements(
818+
installcheck, superuser_conn, extension
819+
):
820+
"""CREATE USER MAPPING secrets should be scrubbed in pg_stat_statements."""
821+
if installcheck:
822+
return
823+
824+
run_command("CREATE EXTENSION IF NOT EXISTS pg_stat_statements", superuser_conn)
825+
run_command("SELECT pg_stat_statements_reset()", superuser_conn)
826+
superuser_conn.commit()
827+
828+
run_command(
829+
"""
830+
CREATE SERVER test_scrub_srv TYPE 'rest'
831+
FOREIGN DATA WRAPPER iceberg_catalog
832+
OPTIONS (rest_endpoint 'http://localhost:8181')
833+
""",
834+
superuser_conn,
835+
)
836+
superuser_conn.commit()
837+
838+
run_command(
839+
"""
840+
CREATE USER MAPPING FOR CURRENT_USER SERVER test_scrub_srv
841+
OPTIONS (client_id 'secret_id_value', client_secret 'secret_key_value')
842+
""",
843+
superuser_conn,
844+
)
845+
superuser_conn.commit()
846+
847+
result = run_query(
848+
"""
849+
SELECT query FROM pg_stat_statements
850+
WHERE query ILIKE '%CREATE USER MAPPING%test_scrub_srv%'
851+
""",
852+
superuser_conn,
853+
)
854+
855+
assert len(result) >= 1
856+
query_text = result[0]["query"]
857+
assert "secret_id_value" not in query_text
858+
assert "secret_key_value" not in query_text
859+
assert "'***" in query_text
860+
861+
run_command(
862+
"DROP USER MAPPING FOR CURRENT_USER SERVER test_scrub_srv", superuser_conn
863+
)
864+
run_command("DROP SERVER test_scrub_srv", superuser_conn)
865+
superuser_conn.commit()
866+
867+
868+
def test_scrub_alter_user_mapping_in_pg_stat_statements(
869+
installcheck, superuser_conn, extension
870+
):
871+
"""ALTER USER MAPPING secrets should also be scrubbed in pg_stat_statements."""
872+
if installcheck:
873+
return
874+
875+
run_command("CREATE EXTENSION IF NOT EXISTS pg_stat_statements", superuser_conn)
876+
run_command("SELECT pg_stat_statements_reset()", superuser_conn)
877+
superuser_conn.commit()
878+
879+
run_command(
880+
"""
881+
CREATE SERVER test_scrub_alter_srv TYPE 'rest'
882+
FOREIGN DATA WRAPPER iceberg_catalog
883+
OPTIONS (rest_endpoint 'http://localhost:8181')
884+
""",
885+
superuser_conn,
886+
)
887+
run_command(
888+
"""
889+
CREATE USER MAPPING FOR CURRENT_USER SERVER test_scrub_alter_srv
890+
OPTIONS (client_id 'old_id', client_secret 'old_secret')
891+
""",
892+
superuser_conn,
893+
)
894+
superuser_conn.commit()
895+
896+
run_command("SELECT pg_stat_statements_reset()", superuser_conn)
897+
superuser_conn.commit()
898+
899+
run_command(
900+
"""
901+
ALTER USER MAPPING FOR CURRENT_USER SERVER test_scrub_alter_srv
902+
OPTIONS (SET client_id 'new_secret_id', SET client_secret 'new_secret_key')
903+
""",
904+
superuser_conn,
905+
)
906+
superuser_conn.commit()
907+
908+
result = run_query(
909+
"""
910+
SELECT query FROM pg_stat_statements
911+
WHERE query ILIKE '%ALTER USER MAPPING%test_scrub_alter_srv%'
912+
""",
913+
superuser_conn,
914+
)
915+
916+
assert len(result) >= 1
917+
query_text = result[0]["query"]
918+
assert "new_secret_id" not in query_text
919+
assert "new_secret_key" not in query_text
920+
assert "'***" in query_text
921+
922+
run_command(
923+
"DROP USER MAPPING FOR CURRENT_USER SERVER test_scrub_alter_srv",
924+
superuser_conn,
925+
)
926+
run_command("DROP SERVER test_scrub_alter_srv", superuser_conn)
927+
superuser_conn.commit()
928+
929+
930+
def test_scrub_preserves_actual_credentials(superuser_conn, extension):
931+
"""Scrubbing the query string should not affect the stored credentials."""
932+
run_command(
933+
"""
934+
CREATE SERVER test_scrub_creds_srv TYPE 'rest'
935+
FOREIGN DATA WRAPPER iceberg_catalog
936+
OPTIONS (rest_endpoint 'http://localhost:8181')
937+
""",
938+
superuser_conn,
939+
)
940+
run_command(
941+
"""
942+
CREATE USER MAPPING FOR CURRENT_USER SERVER test_scrub_creds_srv
943+
OPTIONS (client_id 'real_id', client_secret 'real_secret')
944+
""",
945+
superuser_conn,
946+
)
947+
948+
result = run_query(
949+
"""
950+
SELECT umoptions FROM pg_user_mapping um
951+
JOIN pg_foreign_server fs ON um.umserver = fs.oid
952+
WHERE fs.srvname = 'test_scrub_creds_srv'
953+
""",
954+
superuser_conn,
955+
)
956+
957+
assert len(result) == 1
958+
opts = result[0]["umoptions"]
959+
assert "client_id=real_id" in opts
960+
assert "client_secret=real_secret" in opts
961+
962+
superuser_conn.rollback()
963+
964+
965+
def test_scrub_leaves_scope_visible(installcheck, superuser_conn, extension):
966+
"""scope is not a secret — it should remain visible after scrubbing."""
967+
if installcheck:
968+
return
969+
970+
run_command("CREATE EXTENSION IF NOT EXISTS pg_stat_statements", superuser_conn)
971+
run_command("SELECT pg_stat_statements_reset()", superuser_conn)
972+
superuser_conn.commit()
973+
974+
run_command(
975+
"""
976+
CREATE SERVER test_scrub_scope_srv TYPE 'rest'
977+
FOREIGN DATA WRAPPER iceberg_catalog
978+
OPTIONS (rest_endpoint 'http://localhost:8181')
979+
""",
980+
superuser_conn,
981+
)
982+
superuser_conn.commit()
983+
984+
run_command(
985+
"""
986+
CREATE USER MAPPING FOR CURRENT_USER SERVER test_scrub_scope_srv
987+
OPTIONS (client_id 'id123', client_secret 'secret456', scope 'PRINCIPAL_ROLE:ALL')
988+
""",
989+
superuser_conn,
990+
)
991+
superuser_conn.commit()
992+
993+
result = run_query(
994+
"""
995+
SELECT query FROM pg_stat_statements
996+
WHERE query ILIKE '%CREATE USER MAPPING%test_scrub_scope_srv%'
997+
""",
998+
superuser_conn,
999+
)
1000+
1001+
assert len(result) >= 1
1002+
query_text = result[0]["query"]
1003+
assert "id123" not in query_text
1004+
assert "secret456" not in query_text
1005+
assert "PRINCIPAL_ROLE:ALL" in query_text
1006+
1007+
run_command(
1008+
"DROP USER MAPPING FOR CURRENT_USER SERVER test_scrub_scope_srv",
1009+
superuser_conn,
1010+
)
1011+
run_command("DROP SERVER test_scrub_scope_srv", superuser_conn)
1012+
superuser_conn.commit()

0 commit comments

Comments
 (0)