Skip to content

Commit fb0da6f

Browse files
committed
Make read_only a server option; Support TYPE 'object_store' servers
Allow CREATE SERVER ... TYPE 'object_store' FOREIGN DATA WRAPPER iceberg_catalog, mirroring the existing TYPE 'rest' pattern. Each server can carry its own location_prefix and read_only options, enabling shared writer/reader topologies where multiple Postgres instances point at the same S3 path. Key changes: - ValidateIcebergCatalogServerDDL now accepts TYPE 'object_store' (TYPE 'postgres' remains rejected). - Add IsObjectStoreCatalog() in catalog_type.c, paralleling IsRestCatalog(); update HasObjectStoreCatalogTableOption to use it so that tables with catalog='<named_server>' resolve correctly. - Add ObjectStoreCatalogOptions struct and resolution function that reads server options with GUC fallback. - Add read_only as a server-level option (valid for both REST and object_store servers). Server read_only propagates to all tables unless overridden; table read_only='false' on a read_only server is an error. - Update create_table.c to resolve location_prefix from named object_store servers instead of requiring the GUC. - Update IcebergDefaultCatalogCheckHook to accept named object_store servers. - Update catalog export SPI query to include tables on named object_store servers. Signed-off-by: sfc-gh-npuka <naisila.puka@snowflake.com>
1 parent 507450b commit fb0da6f

10 files changed

Lines changed: 320 additions & 53 deletions

File tree

pg_lake_engine/include/pg_lake/util/catalog_type.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,4 @@ extern PGDLLEXPORT bool HasObjectStoreCatalogTableOption(List *options);
6666
extern PGDLLEXPORT bool HasReadOnlyOption(List *options);
6767
extern PGDLLEXPORT bool IsCatalogOwnedByExtension(const char *catalog);
6868
extern PGDLLEXPORT bool IsRestCatalog(const char *catalog);
69+
extern PGDLLEXPORT bool IsObjectStoreCatalog(const char *catalog);

pg_lake_engine/src/utils/catalog_type.c

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,16 @@ HasRestCatalogTableOption(List *options)
8282

8383

8484
/*
85-
* HasObjectStoreCatalogTableOption returns true if the options contain
86-
* catalog='object_store'.
85+
* HasObjectStoreCatalogTableOption returns true if the catalog option
86+
* indicates an object store catalog: either the literal 'object_store'
87+
* or a named iceberg_catalog server with TYPE 'object_store'.
8788
*/
8889
bool
8990
HasObjectStoreCatalogTableOption(List *options)
9091
{
9192
char *catalog = GetStringOption(options, "catalog", false);
9293

93-
return catalog ? pg_strcasecmp(catalog, OBJECT_STORE_CATALOG_NAME) == 0 : false;
94+
return IsObjectStoreCatalog(catalog);
9495
}
9596

9697

@@ -148,6 +149,35 @@ IsRestCatalog(const char *catalog)
148149
if (strcmp(fdw->fdwname, ICEBERG_CATALOG_FDW_NAME) != 0)
149150
return false;
150151

151-
Assert(pg_strcasecmp(server->servertype, REST_CATALOG_NAME) == 0);
152-
return true;
152+
return pg_strcasecmp(server->servertype, REST_CATALOG_NAME) == 0;
153+
}
154+
155+
156+
/*
157+
* IsObjectStoreCatalog returns true if the catalog name identifies an
158+
* object store catalog. This includes the built-in 'object_store'
159+
* literal and any user-created iceberg_catalog server whose TYPE is
160+
* 'object_store'.
161+
*/
162+
bool
163+
IsObjectStoreCatalog(const char *catalog)
164+
{
165+
if (catalog == NULL)
166+
return false;
167+
168+
if (pg_strcasecmp(catalog, OBJECT_STORE_CATALOG_NAME) == 0)
169+
return true;
170+
171+
bool missingOK = true;
172+
ForeignServer *server = GetForeignServerByName(catalog, missingOK);
173+
174+
if (server == NULL)
175+
return false;
176+
177+
ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);
178+
179+
if (strcmp(fdw->fdwname, ICEBERG_CATALOG_FDW_NAME) != 0)
180+
return false;
181+
182+
return pg_strcasecmp(server->servertype, OBJECT_STORE_CATALOG_NAME) == 0;
153183
}

pg_lake_iceberg/include/pg_lake/object_store_catalog/object_store_catalog.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,28 @@
22

33
#include "postgres.h"
44

5-
#pragma once
6-
75
/* crunchy_iceberg.enable_object_store_catalog setting */
86
extern PGDLLEXPORT bool EnableObjectStoreCatalog;
97

108
extern PGDLLEXPORT char *ObjectStoreCatalogLocationPrefix;
119
extern PGDLLEXPORT char *ExternalObjectStorePrefix;
1210
extern PGDLLEXPORT char *InternalObjectStorePrefix;
1311

12+
/*
13+
* Resolved object store catalog options. For the built-in 'object_store'
14+
* catalog the fields come from GUC settings. For user-created servers
15+
* (CREATE SERVER ... TYPE 'object_store') the server options override the
16+
* GUC defaults.
17+
*/
18+
typedef struct ObjectStoreCatalogOptions
19+
{
20+
char *catalog; /* server name or "object_store" */
21+
char *locationPrefix; /* object store catalog location prefix */
22+
bool readOnly; /* server-level read_only default */
23+
} ObjectStoreCatalogOptions;
24+
25+
extern PGDLLEXPORT ObjectStoreCatalogOptions * GetObjectStoreCatalogOptionsFromCatalog(const char *catalog);
26+
1427
extern PGDLLEXPORT void InitObjectStoreCatalog(void);
1528
extern PGDLLEXPORT void ExportIcebergCatalogIfChanged(void);
1629
extern PGDLLEXPORT const char *GetObjectStoreDefaultLocationPrefix(void);

pg_lake_iceberg/pg_lake_iceberg--3.2--3.3.sql

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,29 @@ CREATE OR REPLACE VIEW pg_catalog.iceberg_tables AS
1818
/*
1919
* iceberg_catalog foreign data wrapper: allows defining named catalog
2020
* configurations via CREATE SERVER so that users are not limited to a
21-
* single global REST catalog configured through GUC settings.
21+
* single global catalog configured through GUC settings.
2222
*
23-
* Server options (non-secret): rest_endpoint, rest_auth_type,
24-
* oauth_endpoint, scope, enable_vended_credentials, location_prefix,
25-
* catalog_name.
26-
* User mapping options (credentials): client_id, client_secret, scope.
23+
* Supported server types:
24+
* TYPE 'rest' -- REST catalog (e.g. Polaris, Gravitino)
25+
* TYPE 'object_store' -- Object store catalog (catalog.json in S3)
26+
*
27+
* Server options (all optional):
28+
* rest_endpoint, rest_auth_type, oauth_endpoint, scope,
29+
* enable_vended_credentials, location_prefix, catalog_name,
30+
* read_only (boolean).
31+
*
32+
* User mapping options (credentials, TYPE 'rest' only):
33+
* client_id, client_secret, scope.
2734
*
2835
* scope is accepted in both server and user mapping; user mapping wins.
36+
* read_only on a server propagates to all tables unless overridden.
2937
*
30-
* Credential resolution order:
38+
* Credential resolution order (TYPE 'rest'):
3139
* 1. CREATE USER MAPPING for the current user
3240
* 2. $PGDATA/catalogs.conf (platform-provided)
3341
* 3. GUC variables (backward compatibility)
3442
*
35-
* User-defined catalog example:
43+
* REST catalog example:
3644
* CREATE SERVER my_polaris TYPE 'rest'
3745
* FOREIGN DATA WRAPPER iceberg_catalog
3846
* OPTIONS (rest_endpoint 'https://polaris.example.com');
@@ -42,14 +50,19 @@ CREATE OR REPLACE VIEW pg_catalog.iceberg_tables AS
4250
*
4351
* CREATE TABLE t (a int) USING iceberg WITH (catalog = 'my_polaris');
4452
*
45-
* Platform-provided catalog example:
46-
* CREATE SERVER horizon TYPE 'rest'
53+
* Object store catalog example (shared writer/reader):
54+
* -- Writer instance:
55+
* CREATE SERVER shared_catalog TYPE 'object_store'
4756
* FOREIGN DATA WRAPPER iceberg_catalog
48-
* OPTIONS (rest_endpoint 'https://horizon.example.com');
57+
* OPTIONS (location_prefix 's3://bucket/shared');
4958
*
50-
* -- Credentials in $PGDATA/catalogs.conf:
51-
* -- horizon.client_id = 'platform_id'
52-
* -- horizon.client_secret = 'platform_secret'
59+
* CREATE TABLE t (a int) USING iceberg
60+
* WITH (catalog = 'shared_catalog');
61+
*
62+
* -- Reader instance (same S3 path, read-only):
63+
* CREATE SERVER shared_catalog TYPE 'object_store'
64+
* FOREIGN DATA WRAPPER iceberg_catalog
65+
* OPTIONS (location_prefix 's3://bucket/shared', read_only 'true');
5366
*/
5467
CREATE FUNCTION lake_iceberg.iceberg_catalog_validator(text[], oid)
5568
RETURNS void

pg_lake_iceberg/src/init.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -386,14 +386,15 @@ IcebergDefaultCatalogCheckHook(char **newvalue, void **extra, GucSource source)
386386

387387
/*
388388
* When catalog access is available, also accept user-created
389-
* iceberg_catalog foreign servers with TYPE 'rest'.
389+
* iceberg_catalog foreign servers with TYPE 'rest' or 'object_store'.
390390
*/
391-
if (IsTransactionState() && IsRestCatalog(newCatalog))
391+
if (IsTransactionState() &&
392+
(IsRestCatalog(newCatalog) || IsObjectStoreCatalog(newCatalog)))
392393
return true;
393394

394395
GUC_check_errdetail("pg_lake_iceberg: allowed iceberg catalog options are '" POSTGRES_CATALOG_NAME "', "
395396
"'" REST_CATALOG_NAME "', '" OBJECT_STORE_CATALOG_NAME
396-
"', or the name of a user-created iceberg_catalog server with TYPE 'rest'");
397+
"', or the name of a user-created iceberg_catalog server");
397398

398399
return false;
399400
}

pg_lake_iceberg/src/object_store_catalog/object_store_catalog.c

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "miscadmin.h"
44

55
#include "commands/dbcommands.h"
6+
#include "commands/defrem.h"
67
#include "foreign/foreign.h"
78
#include "utils/inval.h"
89
#include "utils/snapmgr.h"
@@ -21,6 +22,7 @@
2122
#include "pg_lake/extensions/pg_lake_iceberg.h"
2223
#include "pg_lake/storage/local_storage.h"
2324
#include "pg_lake/util/string_utils.h"
25+
#include "pg_lake/util/catalog_type.h"
2426

2527
char *ObjectStoreCatalogLocationPrefix = NULL;
2628
char *ExternalObjectStorePrefix = "fromsf";
@@ -321,6 +323,11 @@ PushMetadataLocationToObjectStoreCatalog(void)
321323

322324
StringInfo fetchObjectStoreMetadata = makeStringInfo();
323325

326+
/*
327+
* Match tables whose catalog option is either the built-in
328+
* 'object_store' or any user-created iceberg_catalog server with
329+
* TYPE 'object_store'.
330+
*/
324331
appendStringInfo(fetchObjectStoreMetadata,
325332
" SELECT "
326333
" c.metadata_location,"
@@ -331,7 +338,17 @@ PushMetadataLocationToObjectStoreCatalog(void)
331338
" WHERE EXISTS ("
332339
" SELECT 1"
333340
" FROM unnest(f.ftoptions) opt"
334-
" WHERE LOWER(opt) = LOWER('catalog=" OBJECT_STORE_CATALOG_NAME "')"
341+
" WHERE opt ~* '^catalog='"
342+
" AND ("
343+
" LOWER(opt) = LOWER('catalog=" OBJECT_STORE_CATALOG_NAME "')"
344+
" OR LOWER(SUBSTRING(opt FROM 9)) IN ("
345+
" SELECT LOWER(srvname)"
346+
" FROM pg_foreign_server s"
347+
" JOIN pg_foreign_data_wrapper w ON s.srvfdw = w.oid"
348+
" WHERE w.fdwname = '" ICEBERG_CATALOG_FDW_NAME "'"
349+
" AND LOWER(s.srvtype) = LOWER('" OBJECT_STORE_CATALOG_NAME "')"
350+
" )"
351+
" )"
335352
" )");
336353

337354
/*
@@ -569,3 +586,47 @@ GetObjectStoreDefaultLocationPrefix(void)
569586

570587
return StripTrailingSlash(ObjectStoreCatalogLocationPrefix, inPlace);
571588
}
589+
590+
591+
/*
592+
* GetObjectStoreCatalogOptionsFromCatalog returns resolved object store
593+
* catalog options. For the built-in 'object_store' literal the GUCs are
594+
* used directly. For user-created servers the server options override
595+
* the GUC defaults.
596+
*/
597+
ObjectStoreCatalogOptions *
598+
GetObjectStoreCatalogOptionsFromCatalog(const char *catalog)
599+
{
600+
ObjectStoreCatalogOptions *opts = palloc0(sizeof(ObjectStoreCatalogOptions));
601+
602+
if (pg_strcasecmp(catalog, OBJECT_STORE_CATALOG_NAME) == 0)
603+
opts->catalog = pstrdup(OBJECT_STORE_CATALOG_NAME);
604+
else
605+
opts->catalog = pstrdup(catalog);
606+
607+
/* GUC defaults */
608+
opts->locationPrefix = (char *) GetObjectStoreDefaultLocationPrefix();
609+
opts->readOnly = false;
610+
611+
if (pg_strcasecmp(catalog, OBJECT_STORE_CATALOG_NAME) != 0)
612+
{
613+
ForeignServer *server = GetForeignServerByName(catalog, false);
614+
ListCell *lc;
615+
616+
foreach(lc, server->options)
617+
{
618+
DefElem *def = (DefElem *) lfirst(lc);
619+
620+
if (pg_strcasecmp(def->defname, "location_prefix") == 0)
621+
{
622+
bool inPlace = false;
623+
624+
opts->locationPrefix = StripTrailingSlash(defGetString(def), inPlace);
625+
}
626+
else if (pg_strcasecmp(def->defname, "read_only") == 0)
627+
opts->readOnly = defGetBoolean(def);
628+
}
629+
}
630+
631+
return opts;
632+
}

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ static const char *iceberg_catalog_server_options[] = {
118118
"enable_vended_credentials",
119119
"location_prefix",
120120
"catalog_name",
121+
"read_only",
121122
NULL
122123
};
123124

@@ -146,7 +147,7 @@ is_valid_option_in_list(const char *keyword, const char *const *options)
146147
* iceberg_catalog_validator validates options for the iceberg_catalog FDW.
147148
*
148149
* Server options: rest_endpoint, scope, rest_auth_type, oauth_endpoint,
149-
* enable_vended_credentials, location_prefix, catalog_name.
150+
* enable_vended_credentials, location_prefix, catalog_name, read_only.
150151
* User mapping options: client_id, client_secret, scope.
151152
*
152153
* scope is accepted in both places; user mapping scope takes priority.
@@ -199,7 +200,7 @@ iceberg_catalog_validator(PG_FUNCTION_ARGS)
199200
errmsg("invalid option \"%s\" for iceberg_catalog server", def->defname),
200201
errhint("Valid options are: rest_endpoint, rest_auth_type, "
201202
"oauth_endpoint, scope, enable_vended_credentials, "
202-
"location_prefix, catalog_name.")));
203+
"location_prefix, catalog_name, read_only.")));
203204
}
204205

205206
if (pg_strcasecmp(def->defname, "rest_auth_type") == 0)
@@ -214,7 +215,8 @@ iceberg_catalog_validator(PG_FUNCTION_ARGS)
214215
errmsg("invalid rest_auth_type option: \"%s\"", authType),
215216
errhint("Valid values are \"oauth2\" and \"horizon\".")));
216217
}
217-
else if (pg_strcasecmp(def->defname, "enable_vended_credentials") == 0)
218+
else if (pg_strcasecmp(def->defname, "enable_vended_credentials") == 0 ||
219+
pg_strcasecmp(def->defname, "read_only") == 0)
218220
{
219221
(void) defGetBoolean(def);
220222
}
@@ -349,8 +351,8 @@ RedactRestCatalogUserMappingSecrets(ProcessUtilityParams * processUtilityParams,
349351
* ValidateIcebergCatalogServerDDL validates DDL on iceberg_catalog servers:
350352
*
351353
* - CREATE SERVER: rejects reserved names ('postgres', 'object_store',
352-
* 'rest'), rejects TYPE 'postgres'/'object_store', and requires
353-
* TYPE 'rest'.
354+
* 'rest'), rejects TYPE 'postgres', and requires TYPE 'rest' or
355+
* 'object_store'.
354356
* - ALTER SERVER RENAME TO: rejects renaming to a reserved name.
355357
*
356358
* ALTER/DROP/OWNER on reserved names will fail naturally because no
@@ -381,21 +383,20 @@ ValidateIcebergCatalogServerDDL(ProcessUtilityParams * processUtilityParams,
381383
errhint("Choose a different server name.")));
382384

383385
if (stmt->servertype != NULL &&
384-
(pg_strcasecmp(stmt->servertype, POSTGRES_CATALOG_NAME) == 0 ||
385-
pg_strcasecmp(stmt->servertype, OBJECT_STORE_CATALOG_NAME) == 0))
386+
pg_strcasecmp(stmt->servertype, POSTGRES_CATALOG_NAME) == 0)
386387
ereport(ERROR,
387388
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
388389
errmsg("cannot create iceberg_catalog server with TYPE '%s'",
389390
stmt->servertype),
390-
errhint("Use the built-in \"%s\" or \"%s\" catalogs, "
391-
"or create a server of type 'rest'.",
392-
POSTGRES_CATALOG_NAME, OBJECT_STORE_CATALOG_NAME)));
391+
errhint("Use the built-in \"%s\" catalog.",
392+
POSTGRES_CATALOG_NAME)));
393393

394394
if (stmt->servertype == NULL ||
395-
pg_strcasecmp(stmt->servertype, REST_CATALOG_NAME) != 0)
395+
(pg_strcasecmp(stmt->servertype, REST_CATALOG_NAME) != 0 &&
396+
pg_strcasecmp(stmt->servertype, OBJECT_STORE_CATALOG_NAME) != 0))
396397
ereport(ERROR,
397398
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
398-
errmsg("iceberg_catalog server requires TYPE 'rest'")));
399+
errmsg("iceberg_catalog server requires TYPE 'rest' or 'object_store'")));
399400
}
400401
else if (IsA(parsetree, RenameStmt))
401402
{

pg_lake_table/src/ddl/create_table.c

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -865,12 +865,23 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params)
865865

866866
if (hasObjectStoreCatalogOption)
867867
{
868-
const char *objectStoreCatalogLocationPrefix = GetObjectStoreDefaultLocationPrefix();
868+
char *catalogOptionValue =
869+
GetStringOption(createStmt->options, "catalog", false);
870+
ObjectStoreCatalogOptions *osOpts =
871+
GetObjectStoreCatalogOptionsFromCatalog(catalogOptionValue);
872+
873+
/*
874+
* Named servers may carry their own location_prefix; fall back to
875+
* the GUC-based prefix for the built-in 'object_store' catalog.
876+
*/
877+
if (osOpts->locationPrefix != NULL)
878+
defaultLocationPrefix = (char *) osOpts->locationPrefix;
869879

870-
if (objectStoreCatalogLocationPrefix == NULL)
880+
if (defaultLocationPrefix == NULL)
871881
{
872882
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
873-
errmsg(OBJECT_STORE_CATALOG_NAME " catalog iceberg tables require "
883+
errmsg("object_store catalog iceberg tables require "
884+
"a location_prefix server option or "
874885
"pg_lake_iceberg.object_store_catalog_location_prefix "
875886
"to be set")));
876887
}

0 commit comments

Comments
 (0)