Skip to content

Commit 0b63f97

Browse files
committed
Address Onder's review
REST catalog options and retry mechanism: - Rename RestCatalogConnectionInfo to RestCatalogOptions throughout - Eliminate CurrentRetryServerName static; pass opts directly through HttpRetryFn callback (void *context + List *headers) so the 419 token-expired handler can force-refresh and patch the Authorization header in-place - Fix double-free in GetRestCatalogAccessToken: null out entry fields before calling FetchRestCatalogAccessToken Transaction-scoped state: - Reject transactions that touch tables from different REST catalog servers - Replace per-table rest catalog opts deep-copy with a single PgLakeXactRestCatalogOpts static, deep-copied into TopTransactionContext on first use - This avoids syscache lookups at XACT_EVENT_COMMIT time, which are forbidden (AssertCouldGetRelation fires during TRANS_COMMIT state) Server configuration enforcement: - Require TYPE 'rest' on CREATE SERVER ... FOREIGN DATA WRAPPER iceberg_catalog (reject NULL or non-rest types) - Make FDW option names and auth type values case-insensitive (pg_strcasecmp), while keeping server names case-sensitive - Make reserved catalog name checks (postgres, object_store, rest) case-insensitive via IsCatalogOwnedByExtension - Support location_prefix server option, overriding the GUC default - Accept user-created iceberg_catalog servers in default_catalog GUC Tests: - Add test_reject_modify_different_rest_catalogs_in_single_transaction - Add test_server_location_prefix_overrides_guc - Add tests for TYPE enforcement, case-sensitive server names, and default_catalog GUC with user-created servers - Remove obsolete no-TYPE-defaults-to-rest tests Signed-off-by: sfc-gh-npuka <naisila.puka@snowflake.com>
1 parent 9b018bb commit 0b63f97

16 files changed

Lines changed: 602 additions & 419 deletions

File tree

pg_lake_engine/include/pg_lake/util/string_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ extern PGDLLEXPORT char *ReverseStringSearch(const char *haystack, const char *n
3434
extern PGDLLEXPORT int32_t AdjustAnyCharTypmod(int32_t typmod, int32_t newLength);
3535
extern PGDLLEXPORT int32_t GetAnyCharLengthFrom(int32_t typmod);
3636
extern PGDLLEXPORT bool PgStrcasecmpNullable(const char *a, const char *b);
37+
extern PGDLLEXPORT char *PstrdupRemoveTrailingSlash(const char *input);
3738

3839
#define RangeVarQuoteIdentifier(rv) \
3940
(((rv)->schemaname != NULL) ? \

pg_lake_engine/src/utils/catalog_type.c

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,10 @@ HasReadOnlyOption(List *options)
108108

109109

110110
/*
111-
* IsCatalogOwnedByExtension returns true if the catalog name is one of the
112-
* extension-owned literals: 'rest', 'object_store', or 'postgres'.
111+
* IsCatalogOwnedByExtension returns true if the catalog name matches one of
112+
* the extension-owned names: 'rest', 'object_store', or 'postgres'.
113+
* Comparison is case-insensitive so that "Postgres", "REST", etc. are
114+
* also recognized as reserved.
113115
*/
114116
bool
115117
IsCatalogOwnedByExtension(const char *catalog)
@@ -123,7 +125,7 @@ IsCatalogOwnedByExtension(const char *catalog)
123125
/*
124126
* IsRestCatalog returns true if the catalog name identifies a REST catalog.
125127
* This includes the extension-owned 'rest' literal and any user-created
126-
* iceberg_catalog server whose TYPE is 'rest' (or omitted, defaulting to 'rest').
128+
* iceberg_catalog server whose TYPE is 'rest'.
127129
*/
128130
bool
129131
IsRestCatalog(const char *catalog)
@@ -143,13 +145,9 @@ IsRestCatalog(const char *catalog)
143145

144146
ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);
145147

146-
if (strcmp(fdw->fdwname, "iceberg_catalog") != 0)
148+
if (strcmp(fdw->fdwname, ICEBERG_CATALOG_FDW_NAME) != 0)
147149
return false;
148150

149-
/* Check server TYPE if set */
150-
if (server->servertype != NULL && *server->servertype != '\0')
151-
return pg_strcasecmp(server->servertype, "rest") == 0;
152-
153-
/* No TYPE specified, assume rest */
154-
return true;
151+
Assert(server->servertype != NULL && *server->servertype != '\0');
152+
return pg_strcasecmp(server->servertype, REST_CATALOG_NAME) == 0;
155153
}

pg_lake_engine/src/utils/string_utils.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,3 +261,23 @@ PgStrcasecmpNullable(const char *a, const char *b)
261261

262262
return pg_strcasecmp(a, b) == 0;
263263
}
264+
265+
266+
/*
267+
* PstrdupRemoveTrailingSlash returns a pstrdup'd copy of input with a
268+
* trailing '/' removed, if present. Returns NULL when input is NULL.
269+
*/
270+
char *
271+
PstrdupRemoveTrailingSlash(const char *input)
272+
{
273+
if (input == NULL)
274+
return NULL;
275+
276+
char *result = pstrdup(input);
277+
size_t len = strlen(result);
278+
279+
if (len > 0 && result[len - 1] == '/')
280+
result[len - 1] = '\0';
281+
282+
return result;
283+
}

pg_lake_iceberg/include/pg_lake/http/http_client.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ typedef struct
4646
extern bool HttpClientTraceTraffic;
4747

4848
/* Callback function to determine if a request should be retried */
49-
typedef bool (*HttpRetryFn) (long status, int maxRetry, int retryNo);
49+
typedef bool (*HttpRetryFn) (long status, int maxRetry, int retryNo, void *context, List *headers);
5050

5151
/* plain C API (no PostgreSQL types) */
5252
extern PGDLLEXPORT HttpResult HttpGet(const char *url, List *headers);
@@ -55,5 +55,6 @@ extern PGDLLEXPORT HttpResult HttpPost(const char *url, const char *body, List *
5555
extern PGDLLEXPORT HttpResult HttpDelete(const char *url, List *headers);
5656
extern PGDLLEXPORT HttpResult HttpPut(const char *url, const char *body, List *headers);
5757
extern PGDLLEXPORT HttpResult SendHttpRequestWithRetry(HttpMethod method, const char *url, const char *body,
58-
List *headers, HttpRetryFn retryFn, int maxRetry);
58+
List *headers, HttpRetryFn retryFn, int maxRetry,
59+
void *retryContext);
5960
extern PGDLLEXPORT int LinearBackoffSleepMs(int baseMs, int retryNo);

pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,22 @@ extern int RestCatalogAuthType;
3636
extern bool RestCatalogEnableVendedCredentials;
3737

3838
/*
39-
* Holds per-server REST catalog connection settings. Populated from the
40-
* server options of an iceberg_catalog ForeignServer, with GUC fallback
41-
* for any option not explicitly set on the server.
39+
* Holds per-server REST catalog options. Populated from the server options
40+
* of an iceberg_catalog ForeignServer, with GUC fallback for any option
41+
* not explicitly set on the server.
4242
*/
43-
typedef struct RestCatalogConnectionInfo
43+
typedef struct RestCatalogOptions
4444
{
4545
char *serverName; /* server name, used for token cache keying */
4646
char *host;
4747
char *oauthHostPath;
4848
char *clientId;
4949
char *clientSecret;
5050
char *scope;
51+
char *locationPrefix;
5152
int authType;
5253
bool enableVendedCredentials;
53-
} RestCatalogConnectionInfo;
54+
} RestCatalogOptions;
5455

5556
#define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens"
5657

@@ -96,31 +97,32 @@ typedef struct RestCatalogRequest
9697
#define GET_REST_CATALOG_METADATA_LOCATION "%s/api/catalog/v1/%s/namespaces/%s/tables/%s"
9798

9899
/* Connection info resolution */
99-
extern PGDLLEXPORT RestCatalogConnectionInfo * GetRestCatalogConnectionFromServer(const char *serverName);
100-
extern PGDLLEXPORT RestCatalogConnectionInfo * GetRestCatalogConnectionForRelation(Oid relationId);
100+
extern PGDLLEXPORT RestCatalogOptions * GetRestCatalogOptionsFromServer(const char *serverName);
101+
extern PGDLLEXPORT RestCatalogOptions * GetRestCatalogOptionsForRelation(Oid relationId);
101102

102-
extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(RestCatalogConnectionInfo * conn, const char *catalogName, const char *namespaceName);
103+
extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(RestCatalogOptions * opts, const char *catalogName, const char *namespaceName);
103104
extern PGDLLEXPORT void StartStageRestCatalogIcebergTableCreate(Oid relationId);
104105
extern PGDLLEXPORT char *FinishStageRestCatalogIcebergTableCreateRestRequest(Oid relationId, DataFileSchema * dataFileSchema, List *partitionSpecs);
105-
extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(RestCatalogConnectionInfo * conn, const char *catalogName, const char *namespaceName);
106+
extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(RestCatalogOptions * opts, const char *catalogName, const char *namespaceName);
106107
extern PGDLLEXPORT char *GetRestCatalogName(Oid relationId);
107108
extern PGDLLEXPORT char *GetRestCatalogNamespace(Oid relationId);
108109
extern PGDLLEXPORT char *GetRestCatalogTableName(Oid relationId);
109110
extern PGDLLEXPORT bool IsReadOnlyRestCatalogIcebergTable(Oid relationId);
110-
extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(RestCatalogConnectionInfo * conn, const char *restCatalogName, const char *namespaceName,
111+
extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(RestCatalogOptions * opts, const char *restCatalogName, const char *namespaceName,
111112
const char *relationName);
112113
extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid relationId);
113114
extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level);
114-
extern PGDLLEXPORT List *PostHeadersWithAuth(RestCatalogConnectionInfo * conn);
115-
extern PGDLLEXPORT List *DeleteHeadersWithAuth(RestCatalogConnectionInfo * conn);
116-
extern PGDLLEXPORT bool ShouldRetryRequestToRestCatalog(long status, int maxRetry, int retryNo);
117-
extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body, List *headers, const char *serverName);
115+
extern PGDLLEXPORT List *PostHeadersWithAuth(RestCatalogOptions * opts);
116+
extern PGDLLEXPORT List *DeleteHeadersWithAuth(RestCatalogOptions * opts);
117+
extern PGDLLEXPORT bool ShouldRetryRequestToRestCatalog(long status, int maxRetry, int retryNo, void *context, List *headers);
118+
extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body, List *headers, RestCatalogOptions * opts);
118119
extern PGDLLEXPORT RestCatalogRequest * GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId);
119120
extern PGDLLEXPORT RestCatalogRequest * GetAddSchemaCatalogRequest(Oid relationId, DataFileSchema * dataFileSchema);
120121
extern PGDLLEXPORT RestCatalogRequest * GetSetCurrentSchemaCatalogRequest(Oid relationId, int32_t schemaId);
121122
extern PGDLLEXPORT RestCatalogRequest * GetAddPartitionCatalogRequest(Oid relationId, List *partitionSpec);
122123
extern PGDLLEXPORT RestCatalogRequest * GetSetPartitionDefaultIdCatalogRequest(Oid relationId, int specId);
123124
extern PGDLLEXPORT RestCatalogRequest * GetRemoveSnapshotCatalogRequest(List *removedSnapshotIds, Oid relationId);
124125

125-
/* ProcessUtility handler: protects extension-owned catalog servers */
126-
extern PGDLLEXPORT bool BlockDDLOnExtensionCatalogs(ProcessUtilityParams *processUtilityParams, void *arg);
126+
/* ProcessUtility handlers for iceberg_catalog servers */
127+
extern PGDLLEXPORT bool BlockDDLOnExtensionCatalogs(ProcessUtilityParams * processUtilityParams, void *arg);
128+
extern PGDLLEXPORT bool RequireRestTypeForIcebergCatalogServer(ProcessUtilityParams * processUtilityParams, void *arg);

pg_lake_iceberg/src/http/http_client.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ CurlReturnError(CURL * curl, struct curl_slist *headerList,
277277
*/
278278
HttpResult
279279
SendHttpRequestWithRetry(HttpMethod method, const char *url, const char *body,
280-
List *headers, HttpRetryFn retryFn, int maxRetry)
280+
List *headers, HttpRetryFn retryFn, int maxRetry,
281+
void *retryContext)
281282
{
282283
Assert(maxRetry > 0);
283284

@@ -287,7 +288,7 @@ SendHttpRequestWithRetry(HttpMethod method, const char *url, const char *body,
287288
{
288289
result = SendHttpRequest(method, url, body, headers);
289290

290-
if (retryFn != NULL && retryFn(result.status, maxRetry, retryNo))
291+
if (retryFn != NULL && retryFn(result.status, maxRetry, retryNo, retryContext, headers))
291292
continue;
292293
else
293294
break;

pg_lake_iceberg/src/iceberg/catalog.c

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "pg_lake/object_store_catalog/object_store_catalog.h"
2424
#include "pg_lake/rest_catalog/rest_catalog.h"
2525
#include "pg_lake/util/rel_utils.h"
26+
#include "pg_lake/util/string_utils.h"
2627
#include "pg_extension_base/spi_helpers.h"
2728
#include "catalog/namespace.h"
2829
#include "commands/dbcommands.h"
@@ -715,24 +716,7 @@ UpdateAllInternalIcebergTablesToReadOnly(void)
715716
char *
716717
GetIcebergDefaultLocationPrefix(void)
717718
{
718-
if (IcebergDefaultLocationPrefix == NULL)
719-
{
720-
return NULL;
721-
}
722-
723-
size_t len = strlen(IcebergDefaultLocationPrefix);
724-
725-
if (len > 0 && IcebergDefaultLocationPrefix[len - 1] == '/')
726-
{
727-
/* remove trailing "/" */
728-
char *locationPrefixRemovedTrailingSlash = pstrdup(IcebergDefaultLocationPrefix);
729-
730-
locationPrefixRemovedTrailingSlash[len - 1] = '\0';
731-
732-
return locationPrefixRemovedTrailingSlash;
733-
}
734-
735-
return IcebergDefaultLocationPrefix;
719+
return PstrdupRemoveTrailingSlash(IcebergDefaultLocationPrefix);
736720
}
737721

738722

pg_lake_iceberg/src/init.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
#include "pg_lake/iceberg/operations/vacuum.h"
3636
#include "pg_lake/object_store_catalog/object_store_catalog.h"
3737
#include "pg_lake/rest_catalog/rest_catalog.h"
38+
#include "pg_lake/util/catalog_type.h"
39+
#include "access/xact.h"
3840

3941
#define GUC_STANDARD 0
4042

@@ -372,8 +374,16 @@ IcebergDefaultCatalogCheckHook(char **newvalue, void **extra, GucSource source)
372374
pg_strcasecmp(newCatalog, OBJECT_STORE_CATALOG_NAME) == 0)
373375
return true;
374376

377+
/*
378+
* When catalog access is available, also accept user-created
379+
* iceberg_catalog foreign servers with TYPE 'rest'.
380+
*/
381+
if (IsTransactionState() && IsRestCatalog(newCatalog))
382+
return true;
383+
375384
GUC_check_errdetail("pg_lake_iceberg: allowed iceberg catalog options are '" POSTGRES_CATALOG_NAME "', "
376-
" '" REST_CATALOG_NAME "' and '" OBJECT_STORE_CATALOG_NAME "'");
385+
"'" REST_CATALOG_NAME "', '" OBJECT_STORE_CATALOG_NAME
386+
"', or the name of a user-created iceberg_catalog server with TYPE 'rest'");
377387

378388
return false;
379389
}

0 commit comments

Comments
 (0)