-
Notifications
You must be signed in to change notification settings - Fork 99
New Catalog Configuration via CREATE SERVER #242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f016e2c
d80b574
354c8e6
e7e5f58
573cc93
4fd8ce5
f2b393d
3b5228a
b3c0ef9
0171e02
32d64c7
c2d7f1a
6edcfe0
911135b
e83fd0b
a75268d
a8236b3
ad06a85
6374ee0
46daf15
ef097de
0e65cd7
87dbbc1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,12 +18,13 @@ | |
| #pragma once | ||
|
|
||
| #include "postgres.h" | ||
| #include "pg_lake/ddl/utility_hook.h" | ||
| #include "pg_lake/http/http_client.h" | ||
| #include "pg_lake/util/rel_utils.h" | ||
| #include "pg_lake/parquet/field.h" | ||
| #include "pg_lake/iceberg/api/snapshot.h" | ||
|
|
||
| #define REST_CATALOG_AUTH_TYPE_DEFAULT (0) | ||
| #define REST_CATALOG_AUTH_TYPE_OAUTH2 (0) | ||
| #define REST_CATALOG_AUTH_TYPE_HORIZON (1) | ||
|
|
||
| extern PGDLLEXPORT char *RestCatalogHost; | ||
|
|
@@ -34,6 +35,37 @@ extern char *RestCatalogScope; | |
| extern int RestCatalogAuthType; | ||
| extern bool RestCatalogEnableVendedCredentials; | ||
|
|
||
| /* | ||
| * Resolved REST catalog connection options. All REST catalogs -- | ||
| * built-in ('rest') and user-created (CREATE SERVER ... FOREIGN DATA | ||
| * WRAPPER iceberg_catalog) -- are backed by a real pg_foreign_server | ||
| * row; ApplyGUCDefaults populates the defaults, ApplyServerOptionOverrides | ||
| * layers on any per-server options. | ||
| * | ||
| * The canonical identity of a catalog is `serverOid` (the OID of the | ||
| * iceberg_catalog server row). Use it for in-memory equality, token | ||
| * cache keys, and syscache-driven invalidation. `catalog` stores the | ||
| * user-visible short name (e.g. 'rest', 'my_polaris') purely for error | ||
| * messages. | ||
| */ | ||
| typedef struct RestCatalogOptions | ||
| { | ||
| Oid serverOid; /* iceberg_catalog server OID; canonical | ||
| * identity, never InvalidOid for resolved | ||
| * opts */ | ||
| char *catalog; /* short user-facing name; used in error | ||
| * messages, never for equality */ | ||
| char *host; | ||
| char *oauthHostPath; | ||
| char *clientId; | ||
| char *clientSecret; | ||
| char *scope; | ||
| char *locationPrefix; | ||
| char *catalogName; /* REST API catalog prefix; defaults to dbname */ | ||
| int authType; | ||
| bool enableVendedCredentials; | ||
| } RestCatalogOptions; | ||
|
|
||
| #define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens" | ||
|
|
||
| #define REST_CATALOG_NAMESPACE_NAME "%s/api/catalog/v1/%s/namespaces/%s" | ||
|
|
@@ -77,24 +109,32 @@ typedef struct RestCatalogRequest | |
| #define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens" | ||
| #define GET_REST_CATALOG_METADATA_LOCATION "%s/api/catalog/v1/%s/namespaces/%s/tables/%s" | ||
|
|
||
| extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(const char *catalogName, const char *namespaceName); | ||
| /* Catalog options resolution */ | ||
| extern PGDLLEXPORT RestCatalogOptions * ResolveRestCatalogOptions(const char *catalog); | ||
| extern PGDLLEXPORT RestCatalogOptions * GetRestCatalogOptionsForRelation(Oid relationId); | ||
| extern PGDLLEXPORT RestCatalogOptions * CopyRestCatalogOptions(MemoryContext dst, const RestCatalogOptions * src); | ||
|
|
||
| extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(RestCatalogOptions * opts, const char *catalogName, const char *namespaceName); | ||
| extern PGDLLEXPORT void StartStageRestCatalogIcebergTableCreate(Oid relationId); | ||
| extern PGDLLEXPORT char *FinishStageRestCatalogIcebergTableCreateRestRequest(Oid relationId, DataFileSchema * dataFileSchema, List *partitionSpecs); | ||
| extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(const char *catalogName, const char *namespaceName); | ||
| extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(RestCatalogOptions * opts, const char *catalogName, const char *namespaceName); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isnt options enough as parameter? we might add namespace into the struct as well.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| extern PGDLLEXPORT char *GetRestCatalogName(Oid relationId); | ||
| extern PGDLLEXPORT char *GetRestCatalogNamespace(Oid relationId); | ||
| extern PGDLLEXPORT char *GetRestCatalogTableName(Oid relationId); | ||
| extern PGDLLEXPORT bool IsReadOnlyRestCatalogIcebergTable(Oid relationId); | ||
| extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(const char *restCatalogName, const char *namespaceName, | ||
| extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(RestCatalogOptions * opts, const char *restCatalogName, const char *namespaceName, | ||
| const char *relationName); | ||
| extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid relationId); | ||
| extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level); | ||
| extern PGDLLEXPORT List *PostHeadersWithAuth(void); | ||
| extern PGDLLEXPORT List *DeleteHeadersWithAuth(void); | ||
| extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(HttpMethod method, const char *url, const char *body, List *headers); | ||
| extern PGDLLEXPORT List *PostHeadersWithAuth(RestCatalogOptions * opts); | ||
| extern PGDLLEXPORT List *DeleteHeadersWithAuth(RestCatalogOptions * opts); | ||
| extern PGDLLEXPORT HttpResult SendRequestToRestCatalog(RestCatalogOptions * opts, HttpMethod method, const char *url, const char *body, List *headers); | ||
| extern PGDLLEXPORT RestCatalogRequest * GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId); | ||
| extern PGDLLEXPORT RestCatalogRequest * GetAddSchemaCatalogRequest(Oid relationId, DataFileSchema * dataFileSchema); | ||
| extern PGDLLEXPORT RestCatalogRequest * GetSetCurrentSchemaCatalogRequest(Oid relationId, int32_t schemaId); | ||
| extern PGDLLEXPORT RestCatalogRequest * GetAddPartitionCatalogRequest(Oid relationId, List *partitionSpec); | ||
| extern PGDLLEXPORT RestCatalogRequest * GetSetPartitionDefaultIdCatalogRequest(Oid relationId, int specId); | ||
| extern PGDLLEXPORT RestCatalogRequest * GetRemoveSnapshotCatalogRequest(List *removedSnapshotIds, Oid relationId); | ||
|
|
||
| /* ProcessUtility handler for iceberg_catalog server DDL validation */ | ||
| extern PGDLLEXPORT bool ValidateIcebergCatalogServerDDL(ProcessUtilityParams * processUtilityParams, void *arg); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the function is in pg_lake_iceberg, but we register in pg_lake_table, why is so? Can't we register in pg_lake_iceberg? I think the functionality will be equivalent, but this might confuse future readers.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sfc-gh-abozkurt could you also share your input here
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On second thought, I'll move the registration to |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,82 @@ | ||
| -- Upgrade script for pg_lake_iceberg from 3.3 to 3.4 | ||
|
|
||
| /* | ||
| * iceberg_catalog foreign data wrapper: allows defining named catalog | ||
| * configurations via CREATE SERVER so that users are not limited to a | ||
| * single global REST catalog configured through GUC settings. | ||
| * | ||
| * Example: | ||
| * CREATE SERVER my_polaris TYPE 'rest' | ||
| * FOREIGN DATA WRAPPER iceberg_catalog | ||
| * OPTIONS (rest_endpoint 'http://polaris:8181', | ||
| * rest_auth_type 'default', | ||
| * client_id '...', | ||
| * client_secret '...'); | ||
| * | ||
| * CREATE TABLE t (a int) USING iceberg WITH (catalog = 'my_polaris'); | ||
| */ | ||
| CREATE FUNCTION lake_iceberg.iceberg_catalog_validator(text[], oid) | ||
| RETURNS void | ||
| AS 'MODULE_PATHNAME' | ||
| LANGUAGE C STRICT; | ||
|
|
||
| CREATE FOREIGN DATA WRAPPER iceberg_catalog | ||
| NO HANDLER | ||
| VALIDATOR lake_iceberg.iceberg_catalog_validator; | ||
|
|
||
| GRANT USAGE ON FOREIGN DATA WRAPPER iceberg_catalog TO lake_write; | ||
|
|
||
| /* | ||
| * Built-in catalog servers. | ||
| * | ||
| * These three servers are pre-created as structural anchors for the | ||
| * pg_depend dependency edges that iceberg tables record against their | ||
| * catalog server. They are extension-owned and immutable: ALTER, DROP, | ||
| * RENAME, and OWNER changes on them are all blocked. Configuration | ||
| * for the built-in catalogs lives in GUCs, not in server options. | ||
| * | ||
| * Users keep typing the short names ('postgres', 'object_store', 'rest') | ||
| * as the catalog= option value on CREATE TABLE; ResolveCatalogServerName | ||
| * maps short -> long at server lookup time. The long names are prefixed | ||
| * so they cannot collide with names users may already have in their | ||
| * databases (e.g. a postgres_fdw server literally named 'postgres'). | ||
| * | ||
| * Pre-flight: error early with a clear hint if any of the long names is | ||
| * already in use. This prevents a confusing "server already exists" | ||
| * mid-upgrade. | ||
| */ | ||
| DO $do$ | ||
| DECLARE | ||
| conflicting text; | ||
| BEGIN | ||
| SELECT srvname INTO conflicting | ||
| FROM pg_foreign_server | ||
| WHERE srvname IN ('pg_lake_postgres_catalog', | ||
| 'pg_lake_object_store_catalog', | ||
| 'pg_lake_rest_catalog') | ||
| LIMIT 1; | ||
|
|
||
| IF conflicting IS NOT NULL THEN | ||
| RAISE EXCEPTION | ||
| 'pg_lake_iceberg upgrade conflicts with existing foreign server %', conflicting | ||
| USING HINT = 'Drop or rename the server and re-run ALTER EXTENSION pg_lake_iceberg UPDATE. ' | ||
| 'pg_lake_iceberg reserves the names pg_lake_postgres_catalog, ' | ||
| 'pg_lake_object_store_catalog, and pg_lake_rest_catalog for internal use.'; | ||
| END IF; | ||
| END $do$; | ||
|
|
||
| CREATE SERVER pg_lake_postgres_catalog | ||
| TYPE 'postgres' | ||
| FOREIGN DATA WRAPPER iceberg_catalog; | ||
|
|
||
| CREATE SERVER pg_lake_object_store_catalog | ||
| TYPE 'object_store' | ||
| FOREIGN DATA WRAPPER iceberg_catalog; | ||
|
|
||
| CREATE SERVER pg_lake_rest_catalog | ||
| TYPE 'rest' | ||
| FOREIGN DATA WRAPPER iceberg_catalog; | ||
|
|
||
| GRANT USAGE ON FOREIGN SERVER pg_lake_postgres_catalog TO lake_write; | ||
| GRANT USAGE ON FOREIGN SERVER pg_lake_object_store_catalog TO lake_write; | ||
| GRANT USAGE ON FOREIGN SERVER pg_lake_rest_catalog TO lake_write; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be strcmp? I think we should allow below. Shouldnt we do case sensitive comparison in general?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not allowing since options are case insensitive, i.e. if you do
CREATE TABLE a USING iceberg WITH (catalog = 'Postgres')it's automatically gonna resolve to "postgres" catalog.