Skip to content

Commit f671ae1

Browse files
committed
Scrub user mapping secrets in-place via ProcessUtility handler
When CREATE USER MAPPING or ALTER USER MAPPING targets an iceberg_catalog server, overwrite client_id and client_secret values with asterisks directly in the queryString buffer. This prevents credential leakage in pg_stat_statements and similar. Implementation: - iceberg_catalog_secret_options: separates secret names (client_id, client_secret) from the full user mapping option list so scope remains visible. - ScrubUserMappingSecrets: walks DefElem options, uses DefElem.location to find each secret quoted value in the query string, and overwrites value characters with * while preserving quotes and string length. - ScrubIcebergUserMappingHandler: detects user mapping DDL targeting iceberg_catalog servers, calls ScrubUserMappingSecrets, returns false so normal processing continues. Signed-off-by: sfc-gh-npuka <naisila.puka@snowflake.com>
1 parent c666585 commit f671ae1

4 files changed

Lines changed: 314 additions & 1 deletion

File tree

pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "pg_lake/http/http_client.h"
2323
#include "pg_lake/util/rel_utils.h"
2424
#include "pg_lake/parquet/field.h"
25+
#include "pg_lake/ddl/utility_hook.h"
2526
#include "pg_lake/iceberg/api/snapshot.h"
2627

2728
#define REST_CATALOG_AUTH_TYPE_DEFAULT (0)
@@ -125,5 +126,6 @@ extern PGDLLEXPORT RestCatalogRequest * GetAddPartitionCatalogRequest(Oid relati
125126
extern PGDLLEXPORT RestCatalogRequest * GetSetPartitionDefaultIdCatalogRequest(Oid relationId, int specId);
126127
extern PGDLLEXPORT RestCatalogRequest * GetRemoveSnapshotCatalogRequest(List *removedSnapshotIds, Oid relationId);
127128

128-
/* ProcessUtility handler: protects extension-owned catalog servers */
129+
/* ProcessUtility handlers */
129130
extern PGDLLEXPORT bool ProtectExtensionCatalogServersHandler(ProcessUtilityParams *processUtilityParams, void *arg);
131+
extern PGDLLEXPORT bool ScrubIcebergUserMappingHandler(ProcessUtilityParams *processUtilityParams, void *arg);

pg_lake_iceberg/src/init.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ _PG_init(void)
331331
AvroInit();
332332

333333
RegisterUtilityStatementHandler(ProtectExtensionCatalogServersHandler, NULL);
334+
RegisterUtilityStatementHandler(ScrubIcebergUserMappingHandler, NULL);
334335
}
335336

336337

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@
4040
#include "utils/syscache.h"
4141
#include "utils/timestamp.h"
4242

43+
#include "nodes/parsenodes.h"
44+
4345
#include "pg_extension_base/base_workers.h"
46+
#include "pg_lake/ddl/utility_hook.h"
4447
#include "pg_lake/http/http_client.h"
4548
#include "pg_lake/iceberg/api/table_schema.h"
4649
#include "pg_lake/iceberg/catalog.h"
@@ -111,6 +114,7 @@ static const char *iceberg_catalog_user_mapping_options[] = {
111114
};
112115

113116

117+
114118
static bool
115119
is_valid_option_in_list(const char *keyword, const char *const *options)
116120
{
@@ -221,6 +225,110 @@ IsIcebergCatalogServer(const char *serverName)
221225
}
222226

223227

228+
/*
229+
* ScrubUserMappingSecrets overwrites secret option values in the query
230+
* string in-place with asterisks.
231+
*
232+
* In-place mutation is essential: pg_stat_statements captures the queryString
233+
* pointer before calling prev_ProcessUtility, then stores it after the call
234+
* returns. A copy with pstrdup would be invisible to pg_stat_statements
235+
* because its local pointer still references the original memory. By
236+
* overwriting the original buffer, every holder of that pointer — including
237+
* pg_stat_statements — sees the scrubbed version.
238+
*
239+
* The actual DDL execution is unaffected because CREATE/ALTER USER MAPPING
240+
* reads option values from the parse tree (DefElem nodes), not from
241+
* queryString.
242+
*/
243+
static void
244+
ScrubUserMappingSecrets(const char *queryString, List *options)
245+
{
246+
const char *secret_options[] = {"client_id", "client_secret", NULL};
247+
ListCell *lc;
248+
249+
foreach(lc, options)
250+
{
251+
DefElem *def = (DefElem *) lfirst(lc);
252+
253+
if (!is_valid_option_in_list(def->defname, secret_options))
254+
continue;
255+
256+
if (def->location < 0)
257+
continue;
258+
259+
char *p = (char *) queryString + def->location;
260+
261+
/* skip past the key name */
262+
while (*p && !isspace((unsigned char) *p) && *p != '\'')
263+
p++;
264+
265+
/* skip whitespace between key and opening quote */
266+
while (*p && *p != '\'')
267+
p++;
268+
269+
if (*p != '\'')
270+
continue;
271+
272+
p++; /* skip opening quote */
273+
274+
/* overwrite value characters with '*', handling '' escapes */
275+
while (*p && *p != '\'')
276+
*p++ = '*';
277+
while (*(p + 1) == '\'')
278+
{
279+
*p++ = '*'; /* first quote of '' pair */
280+
*p++ = '*'; /* second quote of '' pair */
281+
while (*p && *p != '\'')
282+
*p++ = '*';
283+
}
284+
}
285+
}
286+
287+
288+
/*
289+
* ScrubIcebergUserMappingHandler is a ProcessUtility handler registered via
290+
* pg_lake_engine's RegisterUtilityStatementHandler. When it detects a
291+
* CREATE/ALTER USER MAPPING targeting an iceberg_catalog server, it scrubs
292+
* secret values in the queryString in-place and returns false so normal
293+
* processing continues.
294+
*/
295+
bool
296+
ScrubIcebergUserMappingHandler(ProcessUtilityParams *processUtilityParams,
297+
void *arg)
298+
{
299+
Node *parsetree = processUtilityParams->plannedStmt->utilityStmt;
300+
const char *serverName = NULL;
301+
List *options = NIL;
302+
303+
if (IsA(parsetree, CreateUserMappingStmt))
304+
{
305+
CreateUserMappingStmt *stmt = (CreateUserMappingStmt *) parsetree;
306+
307+
serverName = stmt->servername;
308+
options = stmt->options;
309+
}
310+
else if (IsA(parsetree, AlterUserMappingStmt))
311+
{
312+
AlterUserMappingStmt *stmt = (AlterUserMappingStmt *) parsetree;
313+
314+
serverName = stmt->servername;
315+
options = stmt->options;
316+
}
317+
else
318+
return false;
319+
320+
if (serverName == NULL || options == NIL)
321+
return false;
322+
323+
if (!IsIcebergCatalogServer(serverName))
324+
return false;
325+
326+
ScrubUserMappingSecrets(processUtilityParams->queryString, options);
327+
328+
return false;
329+
}
330+
331+
224332
/*
225333
* ProtectExtensionCatalogServersHandler guards the extension-owned
226334
* iceberg_catalog servers (postgres, object_store, rest) against

pg_lake_table/tests/pytests/test_iceberg_catalog_server.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,3 +981,205 @@ def test_allow_rename_user_created_server(superuser_conn, extension):
981981
"ALTER SERVER user_rename_srv RENAME TO user_renamed_srv", superuser_conn
982982
)
983983
superuser_conn.rollback()
984+
985+
986+
# ── Query string scrubbing for user mapping DDL ───────────────────────────
987+
988+
989+
def test_scrub_create_user_mapping_in_pg_stat_statements(
990+
installcheck, superuser_conn, extension
991+
):
992+
"""CREATE USER MAPPING secrets should be scrubbed in pg_stat_statements."""
993+
if installcheck:
994+
return
995+
996+
run_command("CREATE EXTENSION IF NOT EXISTS pg_stat_statements", superuser_conn)
997+
run_command("SELECT pg_stat_statements_reset()", superuser_conn)
998+
superuser_conn.commit()
999+
1000+
run_command(
1001+
"""
1002+
CREATE SERVER test_scrub_srv TYPE 'rest'
1003+
FOREIGN DATA WRAPPER iceberg_catalog
1004+
OPTIONS (rest_endpoint 'http://localhost:8181')
1005+
""",
1006+
superuser_conn,
1007+
)
1008+
1009+
superuser_conn.commit()
1010+
1011+
run_command(
1012+
"""
1013+
CREATE USER MAPPING FOR CURRENT_USER SERVER test_scrub_srv
1014+
OPTIONS (client_id 'secret_id_value', client_secret 'secret_key_value')
1015+
""",
1016+
superuser_conn,
1017+
)
1018+
superuser_conn.commit()
1019+
1020+
result = run_query(
1021+
"""
1022+
SELECT query FROM pg_stat_statements
1023+
WHERE query ILIKE '%CREATE USER MAPPING%test_scrub_srv%'
1024+
""",
1025+
superuser_conn,
1026+
)
1027+
1028+
assert len(result) >= 1
1029+
query_text = result[0]["query"]
1030+
assert "secret_id_value" not in query_text
1031+
assert "secret_key_value" not in query_text
1032+
assert "'***" in query_text
1033+
1034+
run_command(
1035+
"DROP USER MAPPING FOR CURRENT_USER SERVER test_scrub_srv", superuser_conn
1036+
)
1037+
run_command("DROP SERVER test_scrub_srv", superuser_conn)
1038+
superuser_conn.commit()
1039+
1040+
1041+
def test_scrub_alter_user_mapping_in_pg_stat_statements(
1042+
installcheck, superuser_conn, extension
1043+
):
1044+
"""ALTER USER MAPPING secrets should also be scrubbed in pg_stat_statements."""
1045+
if installcheck:
1046+
return
1047+
1048+
run_command("CREATE EXTENSION IF NOT EXISTS pg_stat_statements", superuser_conn)
1049+
run_command("SELECT pg_stat_statements_reset()", superuser_conn)
1050+
superuser_conn.commit()
1051+
1052+
run_command(
1053+
"""
1054+
CREATE SERVER test_scrub_alter_srv TYPE 'rest'
1055+
FOREIGN DATA WRAPPER iceberg_catalog
1056+
OPTIONS (rest_endpoint 'http://localhost:8181')
1057+
""",
1058+
superuser_conn,
1059+
)
1060+
run_command(
1061+
"""
1062+
CREATE USER MAPPING FOR CURRENT_USER SERVER test_scrub_alter_srv
1063+
OPTIONS (client_id 'old_id', client_secret 'old_secret')
1064+
""",
1065+
superuser_conn,
1066+
)
1067+
superuser_conn.commit()
1068+
1069+
run_command("SELECT pg_stat_statements_reset()", superuser_conn)
1070+
superuser_conn.commit()
1071+
1072+
run_command(
1073+
"""
1074+
ALTER USER MAPPING FOR CURRENT_USER SERVER test_scrub_alter_srv
1075+
OPTIONS (SET client_id 'new_secret_id', SET client_secret 'new_secret_key')
1076+
""",
1077+
superuser_conn,
1078+
)
1079+
superuser_conn.commit()
1080+
1081+
result = run_query(
1082+
"""
1083+
SELECT query FROM pg_stat_statements
1084+
WHERE query ILIKE '%ALTER USER MAPPING%test_scrub_alter_srv%'
1085+
""",
1086+
superuser_conn,
1087+
)
1088+
1089+
assert len(result) >= 1
1090+
query_text = result[0]["query"]
1091+
assert "new_secret_id" not in query_text
1092+
assert "new_secret_key" not in query_text
1093+
assert "'***" in query_text
1094+
1095+
run_command(
1096+
"DROP USER MAPPING FOR CURRENT_USER SERVER test_scrub_alter_srv",
1097+
superuser_conn,
1098+
)
1099+
run_command("DROP SERVER test_scrub_alter_srv", superuser_conn)
1100+
superuser_conn.commit()
1101+
1102+
1103+
def test_scrub_preserves_actual_credentials(superuser_conn, extension):
1104+
"""Scrubbing the query string should not affect the stored credentials."""
1105+
run_command(
1106+
"""
1107+
CREATE SERVER test_scrub_creds_srv TYPE 'rest'
1108+
FOREIGN DATA WRAPPER iceberg_catalog
1109+
OPTIONS (rest_endpoint 'http://localhost:8181')
1110+
""",
1111+
superuser_conn,
1112+
)
1113+
run_command(
1114+
"""
1115+
CREATE USER MAPPING FOR CURRENT_USER SERVER test_scrub_creds_srv
1116+
OPTIONS (client_id 'real_id', client_secret 'real_secret')
1117+
""",
1118+
superuser_conn,
1119+
)
1120+
1121+
result = run_query(
1122+
"""
1123+
SELECT umoptions FROM pg_user_mapping um
1124+
JOIN pg_foreign_server fs ON um.umserver = fs.oid
1125+
WHERE fs.srvname = 'test_scrub_creds_srv'
1126+
""",
1127+
superuser_conn,
1128+
)
1129+
1130+
assert len(result) == 1
1131+
opts = result[0]["umoptions"]
1132+
assert "client_id=real_id" in opts
1133+
assert "client_secret=real_secret" in opts
1134+
1135+
superuser_conn.rollback()
1136+
1137+
1138+
def test_scrub_leaves_scope_visible(installcheck, superuser_conn, extension):
1139+
"""scope is not a secret — it should remain visible after scrubbing."""
1140+
if installcheck:
1141+
return
1142+
1143+
run_command("CREATE EXTENSION IF NOT EXISTS pg_stat_statements", superuser_conn)
1144+
run_command("SELECT pg_stat_statements_reset()", superuser_conn)
1145+
superuser_conn.commit()
1146+
1147+
run_command(
1148+
"""
1149+
CREATE SERVER test_scrub_scope_srv TYPE 'rest'
1150+
FOREIGN DATA WRAPPER iceberg_catalog
1151+
OPTIONS (rest_endpoint 'http://localhost:8181')
1152+
""",
1153+
superuser_conn,
1154+
)
1155+
superuser_conn.commit()
1156+
1157+
run_command(
1158+
"""
1159+
CREATE USER MAPPING FOR CURRENT_USER SERVER test_scrub_scope_srv
1160+
OPTIONS (client_id 'id123', client_secret 'secret456', scope 'PRINCIPAL_ROLE:ALL')
1161+
""",
1162+
superuser_conn,
1163+
)
1164+
superuser_conn.commit()
1165+
1166+
result = run_query(
1167+
"""
1168+
SELECT query FROM pg_stat_statements
1169+
WHERE query ILIKE '%CREATE USER MAPPING%test_scrub_scope_srv%'
1170+
""",
1171+
superuser_conn,
1172+
)
1173+
1174+
assert len(result) >= 1
1175+
query_text = result[0]["query"]
1176+
assert "id123" not in query_text
1177+
assert "secret456" not in query_text
1178+
assert "PRINCIPAL_ROLE:ALL" in query_text
1179+
1180+
run_command(
1181+
"DROP USER MAPPING FOR CURRENT_USER SERVER test_scrub_scope_srv",
1182+
superuser_conn,
1183+
)
1184+
run_command("DROP SERVER test_scrub_scope_srv", superuser_conn)
1185+
superuser_conn.commit()

0 commit comments

Comments
 (0)