Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
32731a1
Add iceberg_catalog FDW and refactor REST catalog for multi-server su…
sfc-gh-npuka Mar 2, 2026
6cc0b78
Recognize server-based REST catalogs in catalog type detection
sfc-gh-npuka Mar 2, 2026
10b66ec
Support server-based catalogs in option validation and table creation
sfc-gh-npuka Mar 2, 2026
bf3f5e8
Track per-table REST catalog connections in transaction handling
sfc-gh-npuka Mar 2, 2026
dc60bc3
Add tests for iceberg_catalog server configuration
sfc-gh-npuka Mar 2, 2026
f39d59f
Add ProtectExtensionCatalogServersHandler
sfc-gh-npuka Mar 12, 2026
1e45830
Address review
sfc-gh-npuka Mar 12, 2026
93fc158
Allow extension owned rest catalog to alter server options
sfc-gh-npuka Mar 12, 2026
260dc78
Address review part 1
sfc-gh-npuka Mar 13, 2026
d3a6250
Address review part 2
sfc-gh-npuka Mar 13, 2026
e94d080
Block reserved catalog names and fix prefix-match comparisons
sfc-gh-npuka Mar 13, 2026
e12e69a
Address Onder's review
sfc-gh-npuka Mar 19, 2026
8fd726f
Treat postgres, object_store, and rest as built-in names, not servers
sfc-gh-npuka Mar 24, 2026
b2024c2
Use catalog_name server option for REST catalog tables
sfc-gh-npuka Mar 24, 2026
4566882
Unrelated to PR - minor fix for reordering of tests
sfc-gh-npuka Mar 25, 2026
f1bb78f
Cleanup from rebase and fix style
sfc-gh-npuka Apr 16, 2026
d153edc
Address Aykut's latest review
sfc-gh-npuka Apr 16, 2026
31f11c8
Move credentials from server options to user mapping in iceberg_catal…
sfc-gh-npuka Mar 9, 2026
7f4a5e4
Add multi-tier credential resolution: user mapping, catalogs.conf, GU…
sfc-gh-npuka Mar 9, 2026
799244a
Update tests for user mapping credentials and catalogs.conf resolution
sfc-gh-npuka Mar 9, 2026
db9e9c5
Scrub user mapping secrets in-place via ProcessUtility handler
sfc-gh-npuka Mar 10, 2026
7f71fc2
Address review
sfc-gh-npuka Mar 14, 2026
7f645e2
Add configurable catalogs.conf path via GUC
sfc-gh-npuka Mar 16, 2026
747b2e4
Use currentChar in ScrubUserMappingSecrets for readability
sfc-gh-npuka Mar 25, 2026
507450b
Rename ScrubUserMappingSecrets to RedactUserMappingSecrets; fix style
sfc-gh-npuka Apr 16, 2026
fb0da6f
Make read_only a server option; Support TYPE 'object_store' servers
sfc-gh-npuka Apr 17, 2026
706d708
Fix style
sfc-gh-npuka Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pg_lake_engine/include/pg_lake/util/catalog_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#pragma once

/* FDW name for iceberg_catalog servers */
#define ICEBERG_CATALOG_FDW_NAME "iceberg_catalog"

/*
* The allowed values for IcebergDefaultCatalog, case insensitive.
*/
Expand Down Expand Up @@ -61,3 +64,6 @@ extern PGDLLEXPORT IcebergCatalogType GetIcebergCatalogType(Oid relationId);
extern PGDLLEXPORT bool HasRestCatalogTableOption(List *options);
extern PGDLLEXPORT bool HasObjectStoreCatalogTableOption(List *options);
extern PGDLLEXPORT bool HasReadOnlyOption(List *options);
extern PGDLLEXPORT bool IsCatalogOwnedByExtension(const char *catalog);
extern PGDLLEXPORT bool IsRestCatalog(const char *catalog);
extern PGDLLEXPORT bool IsObjectStoreCatalog(const char *catalog);
89 changes: 83 additions & 6 deletions pg_lake_engine/src/utils/catalog_type.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,30 @@ GetIcebergCatalogType(Oid relationId)


/*
* HasRestCatalogTableOption returns true if the options contain
* catalog='rest'.
* HasRestCatalogTableOption returns true if the catalog option indicates a
* REST catalog: either the literal value 'rest' or the name of an
* iceberg_catalog foreign server with TYPE 'rest'.
*/
bool
HasRestCatalogTableOption(List *options)
{
char *catalog = GetStringOption(options, "catalog", false);

return catalog ? pg_strncasecmp(catalog, REST_CATALOG_NAME, strlen(catalog)) == 0 : false;
return IsRestCatalog(catalog);
}


/*
* HasObjectStoreCatalogTableOption returns true if the options contain
* catalog='object_store'.
* HasObjectStoreCatalogTableOption returns true if the catalog option
* indicates an object store catalog: either the literal 'object_store'
* or a named iceberg_catalog server with TYPE 'object_store'.
*/
bool
HasObjectStoreCatalogTableOption(List *options)
{
char *catalog = GetStringOption(options, "catalog", false);

return catalog ? pg_strncasecmp(catalog, OBJECT_STORE_CATALOG_NAME, strlen(catalog)) == 0 : false;
return IsObjectStoreCatalog(catalog);
}


Expand All @@ -104,3 +106,78 @@ HasReadOnlyOption(List *options)

return readOnly ? pg_strncasecmp(readOnly, "true", strlen("true")) == 0 : false;
}


/*
* IsCatalogOwnedByExtension returns true if the catalog name is one of
* the reserved built-in names: 'rest', 'object_store', or 'postgres'.
* Comparison is case-insensitive so that "Postgres", "REST", etc. are
* also recognized as reserved.
*/
bool
IsCatalogOwnedByExtension(const char *catalog)
{
return pg_strcasecmp(catalog, REST_CATALOG_NAME) == 0 ||
pg_strcasecmp(catalog, OBJECT_STORE_CATALOG_NAME) == 0 ||
pg_strcasecmp(catalog, POSTGRES_CATALOG_NAME) == 0;
}


/*
* IsRestCatalog returns true if the catalog name identifies a REST catalog.
* This includes the built-in 'rest' literal and any user-created
* iceberg_catalog server whose TYPE is 'rest'.
*/
bool
IsRestCatalog(const char *catalog)
{
if (catalog == NULL)
return false;

if (pg_strcasecmp(catalog, REST_CATALOG_NAME) == 0)
return true;

/* Try to look up a server with this name */
bool missingOK = true;
ForeignServer *server = GetForeignServerByName(catalog, missingOK);

if (server == NULL)
return false;

ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);

if (strcmp(fdw->fdwname, ICEBERG_CATALOG_FDW_NAME) != 0)
return false;

return pg_strcasecmp(server->servertype, REST_CATALOG_NAME) == 0;
}


/*
* IsObjectStoreCatalog returns true if the catalog name identifies an
* object store catalog. This includes the built-in 'object_store'
* literal and any user-created iceberg_catalog server whose TYPE is
* 'object_store'.
*/
bool
IsObjectStoreCatalog(const char *catalog)
{
if (catalog == NULL)
return false;

if (pg_strcasecmp(catalog, OBJECT_STORE_CATALOG_NAME) == 0)
return true;

bool missingOK = true;
ForeignServer *server = GetForeignServerByName(catalog, missingOK);

if (server == NULL)
return false;

ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);

if (strcmp(fdw->fdwname, ICEBERG_CATALOG_FDW_NAME) != 0)
return false;

return pg_strcasecmp(server->servertype, OBJECT_STORE_CATALOG_NAME) == 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@

#include "postgres.h"

#pragma once

/* crunchy_iceberg.enable_object_store_catalog setting */
extern PGDLLEXPORT bool EnableObjectStoreCatalog;

extern PGDLLEXPORT char *ObjectStoreCatalogLocationPrefix;
extern PGDLLEXPORT char *ExternalObjectStorePrefix;
extern PGDLLEXPORT char *InternalObjectStorePrefix;

/*
* Resolved object store catalog options. For the built-in 'object_store'
* catalog the fields come from GUC settings. For user-created servers
* (CREATE SERVER ... TYPE 'object_store') the server options override the
* GUC defaults.
*/
typedef struct ObjectStoreCatalogOptions
{
char *catalog; /* server name or "object_store" */
char *locationPrefix; /* object store catalog location prefix */
bool readOnly; /* server-level read_only default */
} ObjectStoreCatalogOptions;

extern PGDLLEXPORT ObjectStoreCatalogOptions * GetObjectStoreCatalogOptionsFromCatalog(const char *catalog);

extern PGDLLEXPORT void InitObjectStoreCatalog(void);
extern PGDLLEXPORT void ExportIcebergCatalogIfChanged(void);
extern PGDLLEXPORT const char *GetObjectStoreDefaultLocationPrefix(void);
Expand Down
50 changes: 43 additions & 7 deletions pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
#pragma once

#include "postgres.h"
#include "pg_lake/ddl/utility_hook.h"
#include "pg_lake/http/http_client.h"
#include "pg_lake/util/rel_utils.h"
#include "pg_lake/parquet/field.h"
#include "pg_lake/ddl/utility_hook.h"
#include "pg_lake/iceberg/api/snapshot.h"

#define REST_CATALOG_AUTH_TYPE_DEFAULT (0)
#define REST_CATALOG_AUTH_TYPE_OAUTH2 (0)
#define REST_CATALOG_AUTH_TYPE_HORIZON (1)

extern char *CatalogsConfPath;
extern PGDLLEXPORT char *RestCatalogHost;
extern char *RestCatalogOauthHostPath;
extern char *RestCatalogClientId;
Expand All @@ -34,6 +37,28 @@ extern char *RestCatalogScope;
extern int RestCatalogAuthType;
extern bool RestCatalogEnableVendedCredentials;

/*
* Resolved REST catalog connection options. For the built-in 'rest'
* catalog the fields come entirely from GUC settings. For user-created
* catalogs (CREATE SERVER ... FOREIGN DATA WRAPPER iceberg_catalog) the
* server options override the GUC defaults.
*/
typedef struct RestCatalogOptions
{
char *catalog; /* catalog name, used for token cache keying;
* can be 'rest' or a user-created server name
* of TYPE 'rest' */
char *host;
char *oauthHostPath;
char *clientId;
char *clientSecret;
char *scope;
char *locationPrefix;
char *catalogName; /* REST API catalog prefix; defaults to dbname */
int authType;
bool enableVendedCredentials;
} RestCatalogOptions;

#define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens"

#define REST_CATALOG_NAMESPACE_NAME "%s/api/catalog/v1/%s/namespaces/%s"
Expand Down Expand Up @@ -77,24 +102,35 @@ typedef struct RestCatalogRequest
#define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens"
#define GET_REST_CATALOG_METADATA_LOCATION "%s/api/catalog/v1/%s/namespaces/%s/tables/%s"

extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(const char *catalogName, const char *namespaceName);
/* Catalog options resolution */
extern PGDLLEXPORT RestCatalogOptions * GetRestCatalogOptionsFromCatalog(const char *catalog);
extern PGDLLEXPORT RestCatalogOptions * GetRestCatalogOptionsForRelation(Oid relationId);

extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(RestCatalogOptions * opts, const char *catalogName, const char *namespaceName);
extern PGDLLEXPORT void StartStageRestCatalogIcebergTableCreate(Oid relationId);
extern PGDLLEXPORT char *FinishStageRestCatalogIcebergTableCreateRestRequest(Oid relationId, DataFileSchema * dataFileSchema, List *partitionSpecs);
extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(const char *catalogName, const char *namespaceName);
extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(RestCatalogOptions * opts, const char *catalogName, const char *namespaceName);
extern PGDLLEXPORT char *GetRestCatalogName(Oid relationId);
extern PGDLLEXPORT char *GetRestCatalogNamespace(Oid relationId);
extern PGDLLEXPORT char *GetRestCatalogTableName(Oid relationId);
extern PGDLLEXPORT bool IsReadOnlyRestCatalogIcebergTable(Oid relationId);
extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(const char *restCatalogName, const char *namespaceName,
extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(RestCatalogOptions * opts, const char *restCatalogName, const char *namespaceName,
const char *relationName);
extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid relationId);
extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level);
extern PGDLLEXPORT List *PostHeadersWithAuth(void);
extern PGDLLEXPORT List *DeleteHeadersWithAuth(void);
extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body, List *headers);
extern PGDLLEXPORT List *PostHeadersWithAuth(RestCatalogOptions * opts);
extern PGDLLEXPORT List *DeleteHeadersWithAuth(RestCatalogOptions * opts);
extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body, List *headers, RestCatalogOptions * opts);
extern PGDLLEXPORT RestCatalogRequest * GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId);
extern PGDLLEXPORT RestCatalogRequest * GetAddSchemaCatalogRequest(Oid relationId, DataFileSchema * dataFileSchema);
extern PGDLLEXPORT RestCatalogRequest * GetSetCurrentSchemaCatalogRequest(Oid relationId, int32_t schemaId);
extern PGDLLEXPORT RestCatalogRequest * GetAddPartitionCatalogRequest(Oid relationId, List *partitionSpec);
extern PGDLLEXPORT RestCatalogRequest * GetSetPartitionDefaultIdCatalogRequest(Oid relationId, int specId);
extern PGDLLEXPORT RestCatalogRequest * GetRemoveSnapshotCatalogRequest(List *removedSnapshotIds, Oid relationId);

/* ProcessUtility handlers */
/* iceberg_catalog server DDL validation */
extern PGDLLEXPORT bool ValidateIcebergCatalogServerDDL(ProcessUtilityParams * processUtilityParams, void *arg);

/* scrubs user mapping secrets in-place */
extern PGDLLEXPORT bool RedactRestCatalogUserMappingSecrets(ProcessUtilityParams * processUtilityParams, void *arg);
59 changes: 59 additions & 0 deletions pg_lake_iceberg/pg_lake_iceberg--3.2--3.3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,62 @@ CREATE OR REPLACE VIEW pg_catalog.iceberg_tables AS
FROM lake_iceberg.tables
WHERE metadata_location IS NOT NULL;

/*
* iceberg_catalog foreign data wrapper: allows defining named catalog
* configurations via CREATE SERVER so that users are not limited to a
* single global catalog configured through GUC settings.
*
* Supported server types:
* TYPE 'rest' -- REST catalog (e.g. Polaris, Gravitino)
* TYPE 'object_store' -- Object store catalog (catalog.json in S3)
*
* Server options (all optional):
* rest_endpoint, rest_auth_type, oauth_endpoint, scope,
* enable_vended_credentials, location_prefix, catalog_name,
* read_only (boolean).
*
* User mapping options (credentials, TYPE 'rest' only):
* client_id, client_secret, scope.
*
* scope is accepted in both server and user mapping; user mapping wins.
* read_only on a server propagates to all tables unless overridden.
*
* Credential resolution order (TYPE 'rest'):
* 1. CREATE USER MAPPING for the current user
* 2. $PGDATA/catalogs.conf (platform-provided)
* 3. GUC variables (backward compatibility)
*
* REST catalog example:
* CREATE SERVER my_polaris TYPE 'rest'
* FOREIGN DATA WRAPPER iceberg_catalog
* OPTIONS (rest_endpoint 'https://polaris.example.com');
*
* CREATE USER MAPPING FOR user1 SERVER my_polaris
* OPTIONS (client_id '...', client_secret '...');
*
* CREATE TABLE t (a int) USING iceberg WITH (catalog = 'my_polaris');
*
* Object store catalog example (shared writer/reader):
* -- Writer instance:
* CREATE SERVER shared_catalog TYPE 'object_store'
* FOREIGN DATA WRAPPER iceberg_catalog
* OPTIONS (location_prefix 's3://bucket/shared');
*
* CREATE TABLE t (a int) USING iceberg
* WITH (catalog = 'shared_catalog');
*
* -- Reader instance (same S3 path, read-only):
* CREATE SERVER shared_catalog TYPE 'object_store'
* FOREIGN DATA WRAPPER iceberg_catalog
* OPTIONS (location_prefix 's3://bucket/shared', read_only 'true');
*/
CREATE FUNCTION lake_iceberg.iceberg_catalog_validator(text[], oid)
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FOREIGN DATA WRAPPER iceberg_catalog
NO HANDLER
VALIDATOR lake_iceberg.iceberg_catalog_validator;

GRANT USAGE ON FOREIGN DATA WRAPPER iceberg_catalog TO lake_write;
34 changes: 28 additions & 6 deletions pg_lake_iceberg/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "pg_lake/iceberg/operations/vacuum.h"
#include "pg_lake/object_store_catalog/object_store_catalog.h"
#include "pg_lake/rest_catalog/rest_catalog.h"
#include "pg_lake/util/catalog_type.h"
#include "access/xact.h"

#define GUC_STANDARD 0

Expand All @@ -59,7 +61,8 @@ void _PG_init(void);

/* pg_lake_iceberg.rest_catalog_auth_type */
static const struct config_enum_entry RestCatalogAuthTypeOptions[] = {
{"default", REST_CATALOG_AUTH_TYPE_DEFAULT, false},
{"oauth2", REST_CATALOG_AUTH_TYPE_OAUTH2, false},
{"default", REST_CATALOG_AUTH_TYPE_OAUTH2, false},
{"horizon", REST_CATALOG_AUTH_TYPE_HORIZON, false},
{NULL, 0, false},
};
Expand Down Expand Up @@ -252,11 +255,21 @@ _PG_init(void)
GUC_UNIT_KB,
NULL, NULL, NULL);

DefineCustomStringVariable("pg_lake_iceberg.catalogs_conf_path",
gettext_noop("Path to the catalog credentials file. "
"Defaults to $PGDATA/catalogs.conf."),
NULL,
&CatalogsConfPath,
"catalogs.conf",
PGC_SIGHUP,
GUC_SUPERUSER_ONLY,
NULL, NULL, NULL);

DefineCustomEnumVariable("pg_lake_iceberg.rest_catalog_auth_type",
gettext_noop("Determines the format for the initial OAuth token requests."),
NULL,
&RestCatalogAuthType,
REST_CATALOG_AUTH_TYPE_DEFAULT,
REST_CATALOG_AUTH_TYPE_OAUTH2,
RestCatalogAuthTypeOptions,
PGC_SUSET,
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
Expand Down Expand Up @@ -366,13 +379,22 @@ IcebergDefaultCatalogCheckHook(char **newvalue, void **extra, GucSource source)
{
char *newCatalog = *newvalue;

if (pg_strncasecmp(newCatalog, POSTGRES_CATALOG_NAME, strlen(newCatalog)) == 0 ||
pg_strncasecmp(newCatalog, REST_CATALOG_NAME, strlen(newCatalog)) == 0 ||
pg_strncasecmp(newCatalog, OBJECT_STORE_CATALOG_NAME, strlen(newCatalog)) == 0)
if (pg_strcasecmp(newCatalog, POSTGRES_CATALOG_NAME) == 0 ||
pg_strcasecmp(newCatalog, REST_CATALOG_NAME) == 0 ||
pg_strcasecmp(newCatalog, OBJECT_STORE_CATALOG_NAME) == 0)
return true;

/*
* When catalog access is available, also accept user-created
* iceberg_catalog foreign servers with TYPE 'rest' or 'object_store'.
*/
if (IsTransactionState() &&
(IsRestCatalog(newCatalog) || IsObjectStoreCatalog(newCatalog)))
return true;

GUC_check_errdetail("pg_lake_iceberg: allowed iceberg catalog options are '" POSTGRES_CATALOG_NAME "', "
" '" REST_CATALOG_NAME "' and '" OBJECT_STORE_CATALOG_NAME "'");
"'" REST_CATALOG_NAME "', '" OBJECT_STORE_CATALOG_NAME
"', or the name of a user-created iceberg_catalog server");

return false;
}
Loading
Loading