Skip to content

Commit 6c5e4bd

Browse files
committed
Address review
- Add helper functions: - IsRestCatalogOwnedByExtension ('rest' name) - IsCatalogOwnedByExtension ('postgres', 'object_store', 'rest') - Rename IsServerBasedRestCatalog to IsRestCatalogOwnedByUsers - Use servername per server token cache - Add tests where we modify tables from different catalogs(rest_endpoints) in the same transaction. - Keep some comments I accidentally removed. Signed-off-by: sfc-gh-npuka <naisila.puka@snowflake.com>
1 parent 79ed6c6 commit 6c5e4bd

7 files changed

Lines changed: 190 additions & 31 deletions

File tree

pg_lake_engine/include/pg_lake/util/catalog_type.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,6 @@ extern PGDLLEXPORT IcebergCatalogType GetIcebergCatalogType(Oid relationId);
6161
extern PGDLLEXPORT bool HasRestCatalogTableOption(List *options);
6262
extern PGDLLEXPORT bool HasObjectStoreCatalogTableOption(List *options);
6363
extern PGDLLEXPORT bool HasReadOnlyOption(List *options);
64-
extern PGDLLEXPORT bool IsServerBasedRestCatalog(List *options);
64+
extern PGDLLEXPORT bool IsCatalogOwnedByExtension(const char *catalog);
65+
extern PGDLLEXPORT bool IsRestCatalogOwnedByExtension(const char *catalog);
66+
extern PGDLLEXPORT bool IsRestCatalogOwnedByUsers(List *options);

pg_lake_engine/src/utils/catalog_type.c

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ HasRestCatalogTableOption(List *options)
8080
if (catalog == NULL)
8181
return false;
8282

83-
if (pg_strncasecmp(catalog, REST_CATALOG_NAME, strlen(catalog)) == 0)
83+
if (IsRestCatalogOwnedByExtension(catalog))
8484
return true;
8585

86-
return IsServerBasedRestCatalog(options);
86+
return IsRestCatalogOwnedByUsers(options);
8787
}
8888

8989

@@ -114,23 +114,44 @@ HasReadOnlyOption(List *options)
114114

115115

116116
/*
117-
* IsServerBasedRestCatalog returns true if the catalog option refers to a
118-
* ForeignServer created with the iceberg_catalog FDW whose TYPE is 'rest'.
119-
* Returns false if the catalog value is a known literal ('rest',
117+
* IsRestCatalogOwnedByExtension returns true if the catalog name matches
118+
* the extension-owned 'rest' catalog literal.
119+
*/
120+
bool
121+
IsRestCatalogOwnedByExtension(const char *catalog)
122+
{
123+
return pg_strncasecmp(catalog, REST_CATALOG_NAME, strlen(REST_CATALOG_NAME)) == 0;
124+
}
125+
126+
127+
/*
128+
* IsCatalogOwnedByExtension returns true if the catalog name is one of the
129+
* extension-owned literals: 'rest', 'object_store', or 'postgres'.
130+
*/
131+
bool
132+
IsCatalogOwnedByExtension(const char *catalog)
133+
{
134+
return IsRestCatalogOwnedByExtension(catalog) ||
135+
pg_strncasecmp(catalog, OBJECT_STORE_CATALOG_NAME, strlen(OBJECT_STORE_CATALOG_NAME)) == 0 ||
136+
pg_strncasecmp(catalog, POSTGRES_CATALOG_NAME, strlen(POSTGRES_CATALOG_NAME)) == 0;
137+
}
138+
139+
140+
/*
141+
* IsRestCatalogOwnedByUsers returns true if the catalog option refers to a
142+
* ForeignServer created by the user with the iceberg_catalog FDW whose TYPE is 'rest'.
143+
* Returns false if the catalog is owned by the extension ('rest',
120144
* 'object_store', 'postgres') or if no matching server is found.
121145
*/
122146
bool
123-
IsServerBasedRestCatalog(List *options)
147+
IsRestCatalogOwnedByUsers(List *options)
124148
{
125149
char *catalog = GetStringOption(options, "catalog", false);
126150

127151
if (catalog == NULL)
128152
return false;
129153

130-
/* Skip known literal catalog names */
131-
if (pg_strncasecmp(catalog, REST_CATALOG_NAME, strlen(REST_CATALOG_NAME)) == 0 ||
132-
pg_strncasecmp(catalog, OBJECT_STORE_CATALOG_NAME, strlen(OBJECT_STORE_CATALOG_NAME)) == 0 ||
133-
pg_strncasecmp(catalog, POSTGRES_CATALOG_NAME, strlen(POSTGRES_CATALOG_NAME)) == 0)
154+
if (IsCatalogOwnedByExtension(catalog))
134155
return false;
135156

136157
/* Try to look up a server with this name */

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ int RestCatalogAuthType = REST_CATALOG_AUTH_TYPE_DEFAULT;
6060
bool RestCatalogEnableVendedCredentials = true;
6161

6262
/*
63-
* Per-server token cache. Keyed by server name (for server-based catalogs)
64-
* or "GUC" (for GUC-based backward-compatible catalog='rest').
63+
* Per-server token cache. Keyed by server name.
6564
*/
6665
#define TOKEN_CACHE_KEY_LEN NAMEDATALEN
6766

@@ -262,9 +261,7 @@ ProtectExtensionCatalogServersHandler(ProcessUtilityParams *processUtilityParams
262261
if (!IsIcebergCatalogServer(serverName))
263262
continue;
264263

265-
if (pg_strcasecmp(serverName, POSTGRES_CATALOG_NAME) == 0 ||
266-
pg_strcasecmp(serverName, OBJECT_STORE_CATALOG_NAME) == 0 ||
267-
pg_strcasecmp(serverName, REST_CATALOG_NAME) == 0)
264+
if (IsCatalogOwnedByExtension(serverName))
268265
ereport(ERROR,
269266
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
270267
errmsg("cannot drop the extension-owned \"%s\" catalog server",
@@ -283,9 +280,7 @@ ProtectExtensionCatalogServersHandler(ProcessUtilityParams *processUtilityParams
283280
if (!IsIcebergCatalogServer(serverName))
284281
return false;
285282

286-
if (pg_strcasecmp(serverName, POSTGRES_CATALOG_NAME) == 0 ||
287-
pg_strcasecmp(serverName, OBJECT_STORE_CATALOG_NAME) == 0 ||
288-
pg_strcasecmp(serverName, REST_CATALOG_NAME) == 0)
283+
if (IsCatalogOwnedByExtension(serverName))
289284
ereport(ERROR,
290285
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
291286
errmsg("cannot rename the extension-owned \"%s\" catalog server",
@@ -306,7 +301,7 @@ GetRestCatalogConnectionFromGUCs(void)
306301
{
307302
RestCatalogConnectionInfo *conn = palloc0(sizeof(RestCatalogConnectionInfo));
308303

309-
conn->serverName = NULL;
304+
conn->serverName = REST_CATALOG_NAME;
310305
conn->host = RestCatalogHost;
311306
conn->oauthHostPath = RestCatalogOauthHostPath;
312307
conn->clientId = RestCatalogClientId;
@@ -404,7 +399,7 @@ GetRestCatalogConnectionForRelation(Oid relationId)
404399
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
405400
errmsg("catalog option is not set for relation %u", relationId)));
406401

407-
if (pg_strncasecmp(catalog, REST_CATALOG_NAME, strlen(catalog)) == 0)
402+
if (IsRestCatalogOwnedByExtension(catalog))
408403
return GetRestCatalogConnectionFromGUCs();
409404

410405
return GetRestCatalogConnectionFromServer(catalog);
@@ -874,15 +869,13 @@ ReportHTTPError(HttpResult httpResult, int level)
874869

875870

876871
/*
877-
* Build a cache key for the per-server token cache. Uses server name for
878-
* server-based catalogs, or "GUC" for GUC-based backward-compatible mode.
872+
* Build a cache key for the per-server token cache.
879873
*/
880874
static void
881875
BuildTokenCacheKey(char *key, const RestCatalogConnectionInfo *conn)
882876
{
883-
strlcpy(key,
884-
conn->serverName ? conn->serverName : "GUC",
885-
TOKEN_CACHE_KEY_LEN);
877+
Assert(conn->serverName != NULL);
878+
strlcpy(key, conn->serverName, TOKEN_CACHE_KEY_LEN);
886879
}
887880

888881

pg_lake_table/src/ddl/create_table.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,7 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params)
722722
char *catalogOptionValue = GetStringOption(createStmt->options, "catalog", false);
723723
RestCatalogConnectionInfo *conn;
724724

725-
if (pg_strncasecmp(catalogOptionValue, REST_CATALOG_NAME, strlen(catalogOptionValue)) == 0)
725+
if (IsRestCatalogOwnedByExtension(catalogOptionValue))
726726
conn = GetRestCatalogConnectionFromGUCs();
727727
else
728728
conn = GetRestCatalogConnectionFromServer(catalogOptionValue);
@@ -938,7 +938,7 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params)
938938
char *catalogOptionValue = GetStringOption(createStmt->options, "catalog", false);
939939
RestCatalogConnectionInfo *conn;
940940

941-
if (pg_strncasecmp(catalogOptionValue, REST_CATALOG_NAME, strlen(catalogOptionValue)) == 0)
941+
if (IsRestCatalogOwnedByExtension(catalogOptionValue))
942942
conn = GetRestCatalogConnectionFromGUCs();
943943
else
944944
conn = GetRestCatalogConnectionFromServer(catalogOptionValue);

pg_lake_table/src/fdw/option.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ pg_lake_iceberg_validator(PG_FUNCTION_ARGS)
773773
* Check if the catalog value refers to an iceberg_catalog
774774
* server. If so, treat it as a REST catalog.
775775
*/
776-
if (IsServerBasedRestCatalog(options_list))
776+
if (IsRestCatalogOwnedByUsers(options_list))
777777
icebergCatalogType = REST_CATALOG_READ_ONLY;
778778
else
779779
ereport(ERROR,

pg_lake_table/src/transaction/track_iceberg_metadata_changes.c

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,17 +329,31 @@ PostAllRestCatalogRequests(void)
329329
{
330330
if (!requestPerTable->isValid)
331331
{
332+
/*
333+
* Might only happen if an OOM happened during adding this request
334+
* to the hash table.
335+
*/
332336
elog(WARNING, "Skipping invalid REST catalog request for relation %u",
333337
requestPerTable->relationId);
334338
continue;
335339
}
336340

337341
if (requestPerTable->createTableRequest != NULL &&
338342
requestPerTable->dropTableRequest != NULL)
343+
{
344+
/*
345+
* table is created and dropped in the same transaction, nothing
346+
* post to do for this table to the REST catalog.
347+
*/
339348
continue;
340-
341-
if (requestPerTable->tableModifyRequests == NIL)
349+
}
350+
else if (requestPerTable->tableModifyRequests == NIL)
351+
{
352+
/*
353+
* no modifications to send for this table
354+
*/
342355
continue;
356+
}
343357

344358
tablesWithModifications = lappend(tablesWithModifications, requestPerTable);
345359
}

pg_lake_table/tests/pytests/test_modify_iceberg_rest_table.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,3 +565,132 @@ def get_rest_table_metadata_location(encoded_namespace, encoded_table_name, pg_c
565565
status, json_str, headers = res[0]
566566
metadata = json.loads(json_str)
567567
return metadata["metadata"]["location"]
568+
569+
570+
def test_multi_table_different_rest_catalog_hosts_in_single_transaction(
571+
installcheck,
572+
superuser_conn,
573+
pg_conn,
574+
s3,
575+
extension,
576+
with_default_location,
577+
polaris_session,
578+
create_http_helper_functions,
579+
):
580+
"""
581+
Tables from two REST catalog servers with different hosts are modified
582+
in the same transaction. PostAllRestCatalogRequests groups modifications
583+
by conn->host, so using 'localhost' vs '127.0.0.1' (same Polaris, different
584+
host strings) produces two separate batch commit requests.
585+
"""
586+
if installcheck:
587+
return
588+
589+
server_a = "multi_host_catalog_a"
590+
server_b = "multi_host_catalog_b"
591+
table_a = "multi_host_tx_a"
592+
table_b = "multi_host_tx_b"
593+
ns = TABLE_NAMESPACE + "_multi_host"
594+
595+
_create_polaris_catalog_server(superuser_conn, server_a, "localhost")
596+
_create_polaris_catalog_server(superuser_conn, server_b, "127.0.0.1")
597+
superuser_conn.commit()
598+
599+
run_command(f"CREATE SCHEMA IF NOT EXISTS {ns}", pg_conn)
600+
pg_conn.commit()
601+
602+
run_command(
603+
f"CREATE TABLE {ns}.{table_a} (id bigint, value text) USING iceberg WITH (catalog='{server_a}')",
604+
pg_conn,
605+
)
606+
pg_conn.commit()
607+
608+
run_command(
609+
f"CREATE TABLE {ns}.{table_b} (id bigint, value text) USING iceberg WITH (catalog='{server_b}')",
610+
pg_conn,
611+
)
612+
pg_conn.commit()
613+
614+
# Insert into both tables (different hosts) within a single transaction
615+
run_command(
616+
f"INSERT INTO {ns}.{table_a} SELECT i, 'a' FROM generate_series(1, 50) i",
617+
pg_conn,
618+
)
619+
run_command(
620+
f"INSERT INTO {ns}.{table_b} SELECT i, 'b' FROM generate_series(1, 30) i",
621+
pg_conn,
622+
)
623+
pg_conn.commit()
624+
625+
results_a = run_query(f"SELECT count(*) FROM {ns}.{table_a}", pg_conn)
626+
assert results_a[0][0] == 50
627+
628+
results_b = run_query(f"SELECT count(*) FROM {ns}.{table_b}", pg_conn)
629+
assert results_b[0][0] == 30
630+
631+
# Mixed DML across different hosts in a single transaction
632+
run_command(
633+
f"INSERT INTO {ns}.{table_a} SELECT i, 'a2' FROM generate_series(51, 70) i",
634+
pg_conn,
635+
)
636+
run_command(
637+
f"DELETE FROM {ns}.{table_b} WHERE id <= 10",
638+
pg_conn,
639+
)
640+
pg_conn.commit()
641+
642+
results_a = run_query(f"SELECT count(*) FROM {ns}.{table_a}", pg_conn)
643+
assert results_a[0][0] == 70
644+
645+
results_b = run_query(f"SELECT count(*) FROM {ns}.{table_b}", pg_conn)
646+
assert results_b[0][0] == 20
647+
648+
# UPDATE on both hosts in a single transaction
649+
run_command(
650+
f"UPDATE {ns}.{table_a} SET value = 'updated_a' WHERE id <= 5",
651+
pg_conn,
652+
)
653+
run_command(
654+
f"UPDATE {ns}.{table_b} SET value = 'updated_b' WHERE id > 20",
655+
pg_conn,
656+
)
657+
pg_conn.commit()
658+
659+
results_a = run_query(
660+
f"SELECT count(*) FROM {ns}.{table_a} WHERE value = 'updated_a'", pg_conn
661+
)
662+
assert results_a[0][0] == 5
663+
664+
results_b = run_query(
665+
f"SELECT count(*) FROM {ns}.{table_b} WHERE value = 'updated_b'", pg_conn
666+
)
667+
assert results_b[0][0] == 10
668+
669+
# Cleanup
670+
pg_conn.rollback()
671+
run_command(f"DROP SCHEMA {ns} CASCADE", pg_conn)
672+
pg_conn.commit()
673+
run_command(f"DROP SERVER {server_a}", superuser_conn)
674+
run_command(f"DROP SERVER {server_b}", superuser_conn)
675+
superuser_conn.commit()
676+
677+
678+
def _create_polaris_catalog_server(conn, server_name, hostname):
679+
"""Create an iceberg_catalog server pointing to the Polaris instance via the given hostname."""
680+
creds = json.loads(Path(server_params.POLARIS_PRINCIPAL_CREDS_FILE).read_text())
681+
client_id = creds["credentials"]["clientId"]
682+
client_secret = creds["credentials"]["clientSecret"]
683+
endpoint = f"http://{hostname}:{server_params.POLARIS_PORT}"
684+
685+
run_command(
686+
f"""
687+
CREATE SERVER {server_name} TYPE 'rest'
688+
FOREIGN DATA WRAPPER iceberg_catalog
689+
OPTIONS (
690+
rest_endpoint '{endpoint}',
691+
client_id '{client_id}',
692+
client_secret '{client_secret}'
693+
)
694+
""",
695+
conn,
696+
)

0 commit comments

Comments
 (0)