Skip to content

Commit 489323c

Browse files
committed
Some cleanup from last review addressing since I left everything in Cursor's hands
Signed-off-by: sfc-gh-npuka <naisila.puka@snowflake.com>
1 parent 150e385 commit 489323c

5 files changed

Lines changed: 65 additions & 43 deletions

File tree

pg_lake_engine/src/utils/catalog_type.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ HasReadOnlyOption(List *options)
110110
/*
111111
* IsCatalogOwnedByExtension returns true if the catalog name is one of
112112
* the reserved built-in names: 'rest', 'object_store', or 'postgres'.
113-
* Comparison is case-insensitive so that "Postgres", "REST", etc. are
114-
* also recognized as reserved.
113+
* Comparison is case-insensitive.
115114
*/
116115
bool
117116
IsCatalogOwnedByExtension(const char *catalog)

pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid re
119119
extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level);
120120
extern PGDLLEXPORT List *PostHeadersWithAuth(RestCatalogOptions * opts);
121121
extern PGDLLEXPORT List *DeleteHeadersWithAuth(RestCatalogOptions * opts);
122-
extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body, List *headers, RestCatalogOptions * opts);
122+
extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(RestCatalogOptions * opts, HttpMethod method, const char *url, const char *body, List *headers);
123123
extern PGDLLEXPORT RestCatalogRequest * GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId);
124124
extern PGDLLEXPORT RestCatalogRequest * GetAddSchemaCatalogRequest(Oid relationId, DataFileSchema * dataFileSchema);
125125
extern PGDLLEXPORT RestCatalogRequest * GetSetCurrentSchemaCatalogRequest(Oid relationId, int32_t schemaId);

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ int RestCatalogAuthType = REST_CATALOG_AUTH_TYPE_OAUTH2;
6868
bool RestCatalogEnableVendedCredentials = true;
6969

7070
/*
71-
* Per-catalog token cache. Keyed by catalog.
71+
* Per-rest-catalog token cache. Keyed by catalog.
72+
* Should always be accessed via GetRestCatalogAccessToken()
7273
*/
7374
#define TOKEN_CACHE_KEY_LEN NAMEDATALEN
7475

@@ -82,6 +83,8 @@ typedef struct RestCatalogTokenCacheEntry
8283
static HTAB *RestCatalogTokenCache = NULL;
8384
static MemoryContext RestTokenCacheCtx = NULL;
8485

86+
/* end of per-catalog token cache variables */
87+
8588
static char *GetRestCatalogAccessToken(RestCatalogOptions * opts, bool forceRefreshToken);
8689
static void FetchRestCatalogAccessToken(RestCatalogOptions * opts, char **accessToken, int *expiresIn);
8790
static void CreateNamespaceOnRestCatalog(RestCatalogOptions * opts, const char *catalogName, const char *namespaceName);
@@ -723,8 +726,8 @@ StartStageRestCatalogIcebergTableCreate(Oid relationId)
723726
headers = lappend(headers, vendedCreds);
724727
}
725728

726-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, postUrl, body->data,
727-
headers, opts);
729+
HttpResult httpResult = SendRequestToRestCatalog(opts, HTTP_POST, postUrl, body->data,
730+
headers);
728731

729732
if (httpResult.status != 200)
730733
{
@@ -859,9 +862,8 @@ RegisterNamespaceToRestCatalog(RestCatalogOptions * opts, const char *catalogNam
859862
psprintf(REST_CATALOG_NAMESPACE_NAME,
860863
opts->host, URLEncodePath(catalogName),
861864
URLEncodePath(namespaceName));
862-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_GET, getUrl, NULL,
863-
GetHeadersWithAuth(opts),
864-
opts);
865+
HttpResult httpResult = SendRequestToRestCatalog(opts, HTTP_GET, getUrl, NULL,
866+
GetHeadersWithAuth(opts));
865867

866868
switch (httpResult.status)
867869
{
@@ -951,9 +953,8 @@ ErrorIfRestNamespaceDoesNotExist(RestCatalogOptions * opts, const char *catalogN
951953
psprintf(REST_CATALOG_NAMESPACE_NAME,
952954
opts->host, URLEncodePath(catalogName),
953955
URLEncodePath(namespaceName));
954-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_GET, getUrl, NULL,
955-
GetHeadersWithAuth(opts),
956-
opts);
956+
HttpResult httpResult = SendRequestToRestCatalog(opts, HTTP_GET, getUrl, NULL,
957+
GetHeadersWithAuth(opts));
957958

958959
/* namespace not found */
959960
if (httpResult.status == 404)
@@ -1002,8 +1003,7 @@ GetMetadataLocationFromRestCatalog(RestCatalogOptions * opts, const char *restCa
10021003
opts->host, URLEncodePath(restCatalogName), URLEncodePath(namespaceName), URLEncodePath(relationName));
10031004

10041005
List *headers = GetHeadersWithAuth(opts);
1005-
HttpResult hr = SendRequestToRestCatalog(HTTP_GET, getUrl, NULL, headers,
1006-
opts);
1006+
HttpResult hr = SendRequestToRestCatalog(opts, HTTP_GET, getUrl, NULL, headers);
10071007

10081008
if (hr.status != 200)
10091009
{
@@ -1051,9 +1051,8 @@ CreateNamespaceOnRestCatalog(RestCatalogOptions * opts, const char *catalogName,
10511051
psprintf(REST_CATALOG_NAMESPACE, opts->host,
10521052
URLEncodePath(catalogName));
10531053

1054-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, postUrl, body.data,
1055-
PostHeadersWithAuth(opts),
1056-
opts);
1054+
HttpResult httpResult = SendRequestToRestCatalog(opts, HTTP_POST, postUrl, body.data,
1055+
PostHeadersWithAuth(opts));
10571056

10581057
if (httpResult.status != 200)
10591058
{
@@ -1155,6 +1154,12 @@ BuildTokenCacheKey(char *key, const RestCatalogOptions * opts)
11551154
* Any ALTER/DROP SERVER blows away the entire token cache so stale
11561155
* credentials are never reused. The cache is rebuilt lazily on the
11571156
* next token lookup.
1157+
*
1158+
* We ignore hashvalue and reset the whole cache rather than selectively
1159+
* invalidating a single server's entry (as postgres_fdw does). With a
1160+
* handful of servers and infrequent ALTER SERVER, the cost of a few
1161+
* extra OAuth round-trips is negligible compared to the complexity of
1162+
* keying the cache by OID for selective invalidation.
11581163
*/
11591164
static void
11601165
InvalidateRestTokenCache(Datum arg, int cacheid, uint32 hashvalue)
@@ -1169,6 +1174,11 @@ InvalidateRestTokenCache(Datum arg, int cacheid, uint32 hashvalue)
11691174

11701175
/*
11711176
* Initialize the per-catalog token cache hash table if needed.
1177+
*
1178+
* TokenCacheCallbackRegistered is separate from RestCatalogTokenCache because
1179+
* the callback must be registered exactly once per backend lifetime
1180+
* (CacheRegisterSyscacheCallback appends to a fixed-size array), while
1181+
* RestCatalogTokenCache is reset to NULL on every invalidation.
11721182
*/
11731183
static bool TokenCacheCallbackRegistered = false;
11741184

@@ -1318,10 +1328,14 @@ FetchRestCatalogAccessToken(RestCatalogOptions * opts, char **accessToken, int *
13181328

13191329
headers = lappend(headers, "Content-Type: application/x-www-form-urlencoded");
13201330

1321-
/* POST — pass NULL opts to skip 419 token refresh (avoids recursion) */
1322-
HttpResult httpResponse = SendRequestToRestCatalog(HTTP_POST, accessTokenUrl,
1323-
body.data, headers,
1324-
NULL);
1331+
/*
1332+
* Pass NULL opts so SendRequestToRestCatalog skips the 419 token-refresh
1333+
* retry branch. Otherwise a 419 here would call
1334+
* GetRestCatalogAccessToken -> FetchRestCatalogAccessToken ->
1335+
* SendRequestToRestCatalog in an infinite loop.
1336+
*/
1337+
HttpResult httpResponse = SendRequestToRestCatalog(NULL, HTTP_POST, accessTokenUrl,
1338+
body.data, headers);
13251339

13261340
if (httpResponse.status != 200)
13271341
ereport(ERROR,

pg_lake_table/src/transaction/track_iceberg_metadata_changes.c

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,9 @@ PostAllRestCatalogRequests(void)
292292
if (createTableRequest != NULL)
293293
{
294294
HttpResult httpResult =
295-
SendRequestToRestCatalog(HTTP_POST, requestPerTable->tableRestUrl,
296-
createTableRequest->body,
297-
PostHeadersWithAuth(PgLakeXactRestCatalog->catalogOpts),
298-
PgLakeXactRestCatalog->catalogOpts);
295+
SendRequestToRestCatalog(PgLakeXactRestCatalog->catalogOpts, HTTP_POST,
296+
requestPerTable->tableRestUrl, createTableRequest->body,
297+
PostHeadersWithAuth(PgLakeXactRestCatalog->catalogOpts));
299298

300299
if (httpResult.status != 200)
301300
{
@@ -310,10 +309,9 @@ PostAllRestCatalogRequests(void)
310309
else if (dropTableRequest != NULL)
311310
{
312311
HttpResult httpResult =
313-
SendRequestToRestCatalog(HTTP_DELETE, requestPerTable->tableRestUrl,
314-
NULL,
315-
DeleteHeadersWithAuth(PgLakeXactRestCatalog->catalogOpts),
316-
PgLakeXactRestCatalog->catalogOpts);
312+
SendRequestToRestCatalog(PgLakeXactRestCatalog->catalogOpts, HTTP_DELETE,
313+
requestPerTable->tableRestUrl, NULL,
314+
DeleteHeadersWithAuth(PgLakeXactRestCatalog->catalogOpts));
317315

318316
if (httpResult.status != 204)
319317
{
@@ -433,9 +431,9 @@ PostAllRestCatalogRequests(void)
433431

434432
char *url = psprintf(REST_CATALOG_TRANSACTION_COMMIT,
435433
PgLakeXactRestCatalog->catalogOpts->host, catalogName);
436-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, url, batchRequestBody->data,
437-
PostHeadersWithAuth(PgLakeXactRestCatalog->catalogOpts),
438-
PgLakeXactRestCatalog->catalogOpts);
434+
HttpResult httpResult = SendRequestToRestCatalog(PgLakeXactRestCatalog->catalogOpts, HTTP_POST,
435+
url, batchRequestBody->data,
436+
PostHeadersWithAuth(PgLakeXactRestCatalog->catalogOpts));
439437

440438
if (httpResult.status != 204)
441439
{

pg_lake_table/tests/pytests/test_modify_iceberg_rest_table.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,7 @@ def test_token_cache_reuses_token_across_catalog_ops(
923923
924924
Uses pg_lake_iceberg.http_client_trace_traffic to observe actual
925925
HTTP traffic: each token fetch shows up as a POST to .../oauth/tokens
926-
in the connection notices.
926+
in the connection notices. Does 3 back-to-back inserts.
927927
"""
928928
if installcheck:
929929
return
@@ -935,18 +935,17 @@ def test_token_cache_reuses_token_across_catalog_ops(
935935
pg_conn.commit()
936936

937937
run_command(
938-
f"CREATE TABLE {SCHEMA_NAME}.{TABLE_NAME} (id bigint, value text) "
939-
f"USING iceberg WITH (catalog='rest')",
938+
"SET pg_lake_iceberg.http_client_trace_traffic TO on",
940939
pg_conn,
941940
)
942-
pg_conn.commit()
941+
pg_conn.notices.clear()
943942

944943
run_command(
945-
"SET pg_lake_iceberg.http_client_trace_traffic TO on",
944+
f"CREATE TABLE {SCHEMA_NAME}.{TABLE_NAME} (id bigint, value text) "
945+
f"USING iceberg WITH (catalog='rest')",
946946
pg_conn,
947947
)
948-
949-
pg_conn.notices.clear()
948+
pg_conn.commit()
950949

951950
for i in range(3):
952951
run_command(
@@ -958,8 +957,8 @@ def test_token_cache_reuses_token_across_catalog_ops(
958957
token_fetches = sum(
959958
1 for n in pg_conn.notices if "oauth/tokens" in n and "POST" in n
960959
)
961-
assert token_fetches <= 1, (
962-
f"Expected at most 1 OAuth token fetch (cached), got {token_fetches}. "
960+
assert token_fetches == 1, (
961+
f"Expected exactly 1 OAuth token fetch (cached), got {token_fetches}. "
963962
f"Notices:\n" + "\n".join(pg_conn.notices)
964963
)
965964

@@ -987,6 +986,8 @@ def test_alter_server_credentials_invalidates_token_cache(
987986
next catalog operation re-fetches it. We verify this by enabling
988987
HTTP traffic tracing and checking that a POST to .../oauth/tokens
989988
appears after the ALTER SERVER (proving the cache was invalidated).
989+
Test that cache is invalidated on bogus credentials (1 fetch, commit fails),
990+
then cache is invalidated again on restored credentials (1 fetch, commit succeeds).
990991
"""
991992
if installcheck:
992993
return
@@ -1078,8 +1079,8 @@ def test_alter_server_credentials_invalidates_token_cache(
10781079
"Expected COMMIT to fail after ALTER SERVER set bogus client_id "
10791080
"(cache should have been invalidated, forcing re-auth with bad creds)"
10801081
)
1081-
assert post_alter_fetches >= 1, (
1082-
f"Expected token re-fetch after ALTER SERVER (cache invalidated), "
1082+
assert post_alter_fetches == 1, (
1083+
f"Expected exactly 1 token re-fetch after ALTER SERVER (cache invalidated), "
10831084
f"got {post_alter_fetches}. Notices ({len(post_alter_notices)}):\n"
10841085
+ "\n".join(post_alter_notices)
10851086
)
@@ -1090,12 +1091,22 @@ def test_alter_server_credentials_invalidates_token_cache(
10901091
)
10911092
superuser_conn.commit()
10921093

1094+
superuser_conn.notices.clear()
1095+
10931096
run_command(
10941097
f"INSERT INTO {SCHEMA_NAME}.{TABLE_NAME} VALUES (4)",
10951098
superuser_conn,
10961099
)
10971100
superuser_conn.commit()
10981101

1102+
restore_fetches = sum(
1103+
1 for n in superuser_conn.notices if "oauth/tokens" in n and "POST" in n
1104+
)
1105+
assert restore_fetches == 1, (
1106+
f"Expected exactly 1 token re-fetch after restoring credentials, "
1107+
f"got {restore_fetches}. Notices:\n" + "\n".join(superuser_conn.notices)
1108+
)
1109+
10991110
results = run_query(
11001111
f"SELECT count(*) FROM {SCHEMA_NAME}.{TABLE_NAME}", superuser_conn
11011112
)

0 commit comments

Comments
 (0)