Skip to content

Commit 7c67e61

Browse files
committed
Address review part 2
Token cache improvements: - Allocate the token cache hash table and token strings in a dedicated RestTokenCacheCtx memory context (under TopMemoryContext) instead of directly in TopMemoryContext, keeping the cache memory isolated. - On a 419 (token expired) retry, invalidate only the affected server cached token instead of clearing the entire cache. The server name is passed through SendRequestToRestCatalog and stored in a file-scoped static so the retry callback can target the right entry. Transaction commit batching fix: - Group batches by (host, catalogName) instead of host alone. The transaction commit URL includes the catalog prefix, so two servers pointing to the same host but with different catalog_name values need separate commits. Previously, all tables on the same host were batched together using only the first table catalogName. Other improvements: - Replace the ereport(ERROR) FDW name check in GetRestCatalogConnectionFromServer with an Assert, since the catalog option validator already ensures only iceberg_catalog servers are accepted. - Add errhint to credential/host error messages in FetchRestCatalogAccessToken, pointing users to both the server option and the corresponding GUC. - Add test_precreated_rest_server test. Signed-off-by: sfc-gh-npuka <naisila.puka@snowflake.com>
1 parent e4f7985 commit 7c67e61

4 files changed

Lines changed: 87 additions & 44 deletions

File tree

pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level);
114114
extern PGDLLEXPORT List *PostHeadersWithAuth(RestCatalogConnectionInfo * conn);
115115
extern PGDLLEXPORT List *DeleteHeadersWithAuth(RestCatalogConnectionInfo * conn);
116116
extern PGDLLEXPORT bool ShouldRetryRequestToRestCatalog(long status, int maxRetry, int retryNo);
117-
extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body, List *headers);
117+
extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body, List *headers, const char *serverName);
118118
extern PGDLLEXPORT RestCatalogRequest * GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId);
119119
extern PGDLLEXPORT RestCatalogRequest * GetAddSchemaCatalogRequest(Oid relationId, DataFileSchema * dataFileSchema);
120120
extern PGDLLEXPORT RestCatalogRequest * GetSetCurrentSchemaCatalogRequest(Oid relationId, int32_t schemaId);

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ typedef struct RestCatalogTokenCacheEntry
7272
} RestCatalogTokenCacheEntry;
7373

7474
static HTAB *RestCatalogTokenCache = NULL;
75+
static MemoryContext RestTokenCacheCtx = NULL;
76+
77+
/*
78+
* Tracks which server's request is in flight so the retry callback can
79+
* invalidate only the right token cache entry.
80+
*/
81+
static const char *CurrentRetryServerName = NULL;
7582

7683
static char *GetRestCatalogAccessToken(RestCatalogConnectionInfo * conn, bool forceRefreshToken);
7784
static void FetchRestCatalogAccessToken(RestCatalogConnectionInfo * conn, char **accessToken, int *expiresIn);
@@ -329,11 +336,7 @@ GetRestCatalogConnectionFromServer(const char *serverName)
329336
ForeignServer *server = GetForeignServerByName(serverName, false);
330337
ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);
331338

332-
if (strcmp(fdw->fdwname, ICEBERG_CATALOG_FDW_NAME) != 0)
333-
ereport(ERROR,
334-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
335-
errmsg("server \"%s\" does not use the iceberg_catalog foreign data wrapper",
336-
serverName)));
339+
Assert(strcmp(fdw->fdwname, ICEBERG_CATALOG_FDW_NAME) == 0);
337340

338341
RestCatalogConnectionInfo *conn = palloc0(sizeof(RestCatalogConnectionInfo));
339342

@@ -462,7 +465,8 @@ StartStageRestCatalogIcebergTableCreate(Oid relationId)
462465
headers = lappend(headers, vendedCreds);
463466
}
464467

465-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, postUrl, body->data, headers);
468+
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, postUrl, body->data,
469+
headers, conn->serverName);
466470

467471
if (httpResult.status != 200)
468472
{
@@ -596,7 +600,9 @@ RegisterNamespaceToRestCatalog(RestCatalogConnectionInfo * conn, const char *cat
596600
psprintf(REST_CATALOG_NAMESPACE_NAME,
597601
conn->host, URLEncodePath(catalogName),
598602
URLEncodePath(namespaceName));
599-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_GET, getUrl, NULL, GetHeadersWithAuth(conn));
603+
HttpResult httpResult = SendRequestToRestCatalog(HTTP_GET, getUrl, NULL,
604+
GetHeadersWithAuth(conn),
605+
conn->serverName);
600606

601607
switch (httpResult.status)
602608
{
@@ -686,8 +692,9 @@ ErrorIfRestNamespaceDoesNotExist(RestCatalogConnectionInfo * conn, const char *c
686692
psprintf(REST_CATALOG_NAMESPACE_NAME,
687693
conn->host, URLEncodePath(catalogName),
688694
URLEncodePath(namespaceName));
689-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_GET, getUrl, NULL, GetHeadersWithAuth(conn));
690-
695+
HttpResult httpResult = SendRequestToRestCatalog(HTTP_GET, getUrl, NULL,
696+
GetHeadersWithAuth(conn),
697+
conn->serverName);
691698

692699
/* namespace not found */
693700
if (httpResult.status == 404)
@@ -736,7 +743,8 @@ GetMetadataLocationFromRestCatalog(RestCatalogConnectionInfo * conn, const char
736743
conn->host, URLEncodePath(restCatalogName), URLEncodePath(namespaceName), URLEncodePath(relationName));
737744

738745
List *headers = GetHeadersWithAuth(conn);
739-
HttpResult hr = SendRequestToRestCatalog(HTTP_GET, getUrl, NULL, headers);
746+
HttpResult hr = SendRequestToRestCatalog(HTTP_GET, getUrl, NULL, headers,
747+
conn->serverName);
740748

741749
if (hr.status != 200)
742750
{
@@ -784,7 +792,9 @@ CreateNamespaceOnRestCatalog(RestCatalogConnectionInfo * conn, const char *catal
784792
psprintf(REST_CATALOG_NAMESPACE, conn->host,
785793
URLEncodePath(catalogName));
786794

787-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, postUrl, body.data, PostHeadersWithAuth(conn));
795+
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, postUrl, body.data,
796+
PostHeadersWithAuth(conn),
797+
conn->serverName);
788798

789799
if (httpResult.status != 200)
790800
{
@@ -889,12 +899,16 @@ InitTokenCacheIfNeeded(void)
889899
if (RestCatalogTokenCache != NULL)
890900
return;
891901

902+
RestTokenCacheCtx = AllocSetContextCreate(TopMemoryContext,
903+
"RestTokenCacheCtx",
904+
ALLOCSET_DEFAULT_SIZES);
905+
892906
HASHCTL ctl;
893907

894908
memset(&ctl, 0, sizeof(ctl));
895909
ctl.keysize = TOKEN_CACHE_KEY_LEN;
896910
ctl.entrysize = sizeof(RestCatalogTokenCacheEntry);
897-
ctl.hcxt = TopMemoryContext;
911+
ctl.hcxt = RestTokenCacheCtx;
898912

899913
RestCatalogTokenCache = hash_create("REST Catalog Token Cache",
900914
8, &ctl,
@@ -943,7 +957,7 @@ GetRestCatalogAccessToken(RestCatalogConnectionInfo * conn, bool forceRefreshTok
943957

944958
FetchRestCatalogAccessToken(conn, &accessToken, &expiresIn);
945959

946-
entry->accessToken = MemoryContextStrdup(TopMemoryContext, accessToken);
960+
entry->accessToken = MemoryContextStrdup(RestTokenCacheCtx, accessToken);
947961
entry->accessTokenExpiry = now + (int64_t) expiresIn * 1000000; /* expiresIn is in
948962
* seconds */
949963
}
@@ -961,9 +975,15 @@ static void
961975
FetchRestCatalogAccessToken(RestCatalogConnectionInfo * conn, char **accessToken, int *expiresIn)
962976
{
963977
if (!conn->host || !*conn->host)
964-
ereport(ERROR, (errmsg("REST catalog host is not configured")));
978+
ereport(ERROR,
979+
(errmsg("REST catalog host is not configured"),
980+
errhint("Set the \"rest_endpoint\" option on the server "
981+
"or the pg_lake_iceberg.rest_catalog_host GUC.")));
965982
if (!conn->clientSecret || !*conn->clientSecret)
966-
ereport(ERROR, (errmsg("REST catalog client_secret is not configured")));
983+
ereport(ERROR,
984+
(errmsg("REST catalog client_secret is not configured"),
985+
errhint("Set the \"client_secret\" option on the server "
986+
"or the pg_lake_iceberg.rest_catalog_client_secret GUC.")));
967987

968988
char *accessTokenUrl = conn->oauthHostPath;
969989

@@ -991,7 +1011,10 @@ FetchRestCatalogAccessToken(RestCatalogConnectionInfo * conn, char **accessToken
9911011
else
9921012
{
9931013
if (!conn->clientId || !*conn->clientId)
994-
ereport(ERROR, (errmsg("REST catalog client_id is not configured")));
1014+
ereport(ERROR,
1015+
(errmsg("REST catalog client_id is not configured"),
1016+
errhint("Set the \"client_id\" option on the server "
1017+
"or the pg_lake_iceberg.rest_catalog_client_id GUC.")));
9951018

9961019
/* Build Authorization: Basic <base64(clientId:clientSecret)> */
9971020
char *encodedAuth = EncodeBasicAuth(conn->clientId, conn->clientSecret);
@@ -1003,7 +1026,9 @@ FetchRestCatalogAccessToken(RestCatalogConnectionInfo * conn, char **accessToken
10031026
headers = lappend(headers, "Content-Type: application/x-www-form-urlencoded");
10041027

10051028
/* POST */
1006-
HttpResult httpResponse = SendRequestToRestCatalog(HTTP_POST, accessTokenUrl, body.data, headers);
1029+
HttpResult httpResponse = SendRequestToRestCatalog(HTTP_POST, accessTokenUrl,
1030+
body.data, headers,
1031+
conn->serverName);
10071032

10081033
if (httpResponse.status != 200)
10091034
ereport(ERROR,
@@ -1458,14 +1483,24 @@ GetRemoveSnapshotCatalogRequest(List *removedSnapshotIds, Oid relationId)
14581483
/*
14591484
* SendRequestToRestCatalog sends an HTTP request to the rest catalog
14601485
* with retry logic for retriable errors, attempting up to MAX_HTTP_RETRY_FOR_REST_CATALOG
1461-
* times.
1486+
* times. The serverName is used by the retry callback to invalidate only the
1487+
* matching token cache entry on a 419 (token expired) response.
14621488
*/
14631489
HttpResult
1464-
SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body, List *headers)
1490+
SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body,
1491+
List *headers, const char *serverName)
14651492
{
14661493
const int MAX_HTTP_RETRY_FOR_REST_CATALOG = 3;
14671494

1468-
return SendHttpRequestWithRetry(method, url, body, headers, ShouldRetryRequestToRestCatalog, MAX_HTTP_RETRY_FOR_REST_CATALOG);
1495+
CurrentRetryServerName = serverName;
1496+
1497+
HttpResult result = SendHttpRequestWithRetry(method, url, body, headers,
1498+
ShouldRetryRequestToRestCatalog,
1499+
MAX_HTTP_RETRY_FOR_REST_CATALOG);
1500+
1501+
CurrentRetryServerName = NULL;
1502+
1503+
return result;
14691504
}
14701505

14711506

@@ -1508,26 +1543,15 @@ ShouldRetryRequestToRestCatalog(long status, int maxRetry, int retryNo)
15081543
return true;
15091544
}
15101545

1511-
/* token expired, retry after refreshing token */
1546+
/* token expired: invalidate only the affected server's cached token */
15121547
else if (status == TOKEN_EXPIRED_STATUS)
15131548
{
1514-
/*
1515-
* Invalidate all cached tokens so that the next request will fetch a
1516-
* fresh one. We clear the entire cache because the retry callback
1517-
* does not have access to a specific connection's info. This is safe
1518-
* because token expiry is rare and other connections will simply
1519-
* re-authenticate on their next request.
1520-
*/
1521-
if (RestCatalogTokenCache != NULL)
1549+
if (RestCatalogTokenCache != NULL && CurrentRetryServerName != NULL)
15221550
{
1523-
HASH_SEQ_STATUS seq;
1524-
RestCatalogTokenCacheEntry *entry;
1551+
char cacheKey[TOKEN_CACHE_KEY_LEN];
15251552

1526-
hash_seq_init(&seq, RestCatalogTokenCache);
1527-
while ((entry = hash_seq_search(&seq)) != NULL)
1528-
{
1529-
entry->accessTokenExpiry = 0;
1530-
}
1553+
strlcpy(cacheKey, CurrentRetryServerName, TOKEN_CACHE_KEY_LEN);
1554+
hash_search(RestCatalogTokenCache, cacheKey, HASH_REMOVE, NULL);
15311555
}
15321556
return true;
15331557
}

pg_lake_table/src/transaction/track_iceberg_metadata_changes.c

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,9 @@ PostAllRestCatalogRequests(void)
277277
{
278278
HttpResult httpResult =
279279
SendRequestToRestCatalog(HTTP_POST, requestPerTable->tableRestUrl,
280-
createTableRequest->body, PostHeadersWithAuth(requestPerTable->conn));
280+
createTableRequest->body,
281+
PostHeadersWithAuth(requestPerTable->conn),
282+
requestPerTable->conn->serverName);
281283

282284
if (httpResult.status != 200)
283285
{
@@ -293,7 +295,9 @@ PostAllRestCatalogRequests(void)
293295
{
294296
HttpResult httpResult =
295297
SendRequestToRestCatalog(HTTP_DELETE, requestPerTable->tableRestUrl,
296-
NULL, DeleteHeadersWithAuth(requestPerTable->conn));
298+
NULL,
299+
DeleteHeadersWithAuth(requestPerTable->conn),
300+
requestPerTable->conn->serverName);
297301

298302
if (httpResult.status != 204)
299303
{
@@ -359,9 +363,10 @@ PostAllRestCatalogRequests(void)
359363
}
360364

361365
/*
362-
* Group by server host and send one batch per server. For each table,
363-
* find if we already started a batch for its server host, otherwise
364-
* start a new one.
366+
* Group by (host, catalogName) and send one batch per group. The
367+
* transaction commit URL includes the catalog prefix, so tables under
368+
* different catalog names need separate commits even when the host is
369+
* the same.
365370
*/
366371
while (list_length(tablesWithModifications) > 0)
367372
{
@@ -385,7 +390,8 @@ PostAllRestCatalogRequests(void)
385390
{
386391
requestPerTable = (RestCatalogRequestPerTable *) lfirst(lc);
387392

388-
if (strcmp(requestPerTable->conn->host, batchHost) != 0)
393+
if (strcmp(requestPerTable->conn->host, batchHost) != 0 ||
394+
strcmp(requestPerTable->catalogName, catalogName) != 0)
389395
{
390396
remaining = lappend(remaining, requestPerTable);
391397
continue;
@@ -430,7 +436,9 @@ PostAllRestCatalogRequests(void)
430436
appendStringInfoChar(batchRequestBody, '}');
431437

432438
char *url = psprintf(REST_CATALOG_TRANSACTION_COMMIT, batchConn->host, catalogName);
433-
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, url, batchRequestBody->data, PostHeadersWithAuth(batchConn));
439+
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, url, batchRequestBody->data,
440+
PostHeadersWithAuth(batchConn),
441+
batchConn->serverName);
434442

435443
if (httpResult.status != 204)
436444
{

pg_lake_table/tests/pytests/test_iceberg_catalog_server.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ def test_precreated_object_store_server(pg_conn, extension):
4646
assert result[0]["srvtype"] == "object_store"
4747

4848

49+
def test_precreated_rest_server(pg_conn, extension):
50+
"""A 'rest' server of TYPE 'rest' should be pre-created."""
51+
result = run_query(
52+
"SELECT srvname, srvtype FROM pg_foreign_server WHERE srvname = 'rest'",
53+
pg_conn,
54+
)
55+
assert len(result) == 1
56+
assert result[0]["srvname"] == "rest"
57+
assert result[0]["srvtype"] == "rest"
58+
59+
4960
# ── CREATE SERVER with valid options ───────────────────────────────────────
5061

5162

0 commit comments

Comments
 (0)