Skip to content

Commit 67bd64c

Browse files
More cleanup from Onder's latest review
Fail fast at statement time on cross-catalog DML. Rename ValidateXactRestCatalog to BindRelationToXactRestCatalog and call it from both FDW write paths (postgresBeginForeignModify and AddQueryResultToTable). The function now binds the transaction to the relation's REST catalog on the first DML and rejects any subsequent statement targeting a different catalog, so the second INSERT errors out before any Parquet is written. The pre-commit hook is kept as a belt-and-suspenders fallback for DDL paths that reach the tracker without going through the new guard. The regression test asserts the second INSERT (not COMMIT) raises. Move ValidateIcebergCatalogServerDDL registration from pg_lake_table to pg_lake_iceberg, where the handler is actually defined. Architecturally this puts ownership of catalog-server DDL validation in the extension that owns the catalog server abstraction. Fix latent dangling-pointer in GetValidCatalogOptionsHint. The static hint cache was allocated via initStringInfo in whatever short-lived context the validator happened to be running in (typically MessageContext), so the second invalid-option failure in the same backend returned freed memory to errhint. Allocate in TopMemoryContext so the cache survives transaction boundaries. The strengthened test_reject_unknown_server_option now issues two failing CREATE SERVER statements on the same connection and asserts the full option list appears in the hint on both attempts; the previous version would not have caught this bug. Also covers earlier review-driven hardening on the same branch: token cache invalidation on ALTER SERVER credentials, ALTER SERVER rest_endpoint blocking for all dependent REST iceberg tables (writable and read-only), and DROP OWNED BY / concurrent DROP SERVER dependency tests. Co-authored-by: Cursor <cursoragent@cursor.com> Signed-off-by: sfc-gh-npuka <naisila.puka@snowflake.com>
1 parent 59c9867 commit 67bd64c

9 files changed

Lines changed: 373 additions & 62 deletions

File tree

pg_lake_iceberg/src/init.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "pg_lake/avro/avro_reader.h"
2828
#include "pg_lake/avro/avro_writer.h"
2929
#include "pg_lake/copy/copy_format.h"
30+
#include "pg_lake/ddl/utility_hook.h"
3031
#include "pg_lake/iceberg/api.h"
3132
#include "pg_lake/pgduck/numeric.h"
3233
#include "pg_lake/iceberg/catalog.h"
@@ -332,6 +333,8 @@ _PG_init(void)
332333
NULL, NULL, NULL);
333334

334335
AvroInit();
336+
337+
RegisterUtilityStatementHandler(ValidateIcebergCatalogServerDDL, NULL);
335338
}
336339

337340

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ FindCatalogOptionDesc(const char *name)
173173

174174
/*
175175
* Build the "Valid options are: ?" hint string. Cached after first call.
176+
*
177+
* The cache must outlive any individual transaction: this helper is only
178+
* called on validator error paths, and ereport(ERROR) immediately aborts
179+
* the current transaction, freeing whatever short-lived context the
180+
* validator was running under. Allocate the buffer in TopMemoryContext
181+
* so the static pointer remains valid for the lifetime of the backend.
176182
*/
177183
static const char *
178184
GetValidCatalogOptionsHint(void)
@@ -181,8 +187,10 @@ GetValidCatalogOptionsHint(void)
181187

182188
if (hint == NULL)
183189
{
190+
MemoryContext oldcxt;
184191
StringInfoData buf;
185192

193+
oldcxt = MemoryContextSwitchTo(TopMemoryContext);
186194
initStringInfo(&buf);
187195
appendStringInfoString(&buf, "Valid options are: ");
188196
for (int i = 0; i < NUM_CATALOG_OPTIONS; i++)
@@ -193,6 +201,7 @@ GetValidCatalogOptionsHint(void)
193201
}
194202
appendStringInfoChar(&buf, '.');
195203
hint = buf.data;
204+
MemoryContextSwitchTo(oldcxt);
196205
}
197206

198207
return hint;
@@ -334,13 +343,15 @@ iceberg_catalog_validator(PG_FUNCTION_ARGS)
334343

335344

336345
/*
337-
* ServerHasDependentWritableTable returns true if the given server
338-
* has at least one dependent writable iceberg table recorded in
339-
* pg_depend. Used to block ALTER SERVER changes that would silently
340-
* break existing tables.
346+
* ServerHasDependentRestIcebergTable returns true if the given server
347+
* has at least one dependent REST-backed iceberg table (read-only or
348+
* writable) recorded in pg_depend. Used to block ALTER SERVER changes
349+
* that would silently break existing tables, since both writable and
350+
* read-only REST tables make REST API calls at runtime against the
351+
* server's rest_endpoint.
341352
*/
342353
static bool
343-
ServerHasDependentWritableTable(Oid serverOid)
354+
ServerHasDependentRestIcebergTable(Oid serverOid)
344355
{
345356
Relation depRel;
346357
ScanKeyData key[2];
@@ -369,7 +380,9 @@ ServerHasDependentWritableTable(Oid serverOid)
369380
if (depForm->classid != RelationRelationId)
370381
continue;
371382

372-
if (GetIcebergCatalogType(depForm->objid) == REST_CATALOG_READ_WRITE)
383+
IcebergCatalogType type = GetIcebergCatalogType(depForm->objid);
384+
385+
if (type == REST_CATALOG_READ_WRITE || type == REST_CATALOG_READ_ONLY)
373386
{
374387
found = true;
375388
break;
@@ -488,9 +501,9 @@ ValidateIcebergCatalogServerDDL(ProcessUtilityParams * processUtilityParams,
488501
return false;
489502

490503
/*
491-
* Changing rest_endpoint on a server with dependent writable tables
492-
* would silently point them at a different REST catalog, breaking the
493-
* metadata chain.
504+
* Changing rest_endpoint on a server with dependent iceberg tables
505+
* (writable or read-only) would silently point them at a different
506+
* REST catalog, breaking the metadata chain.
494507
*/
495508
ListCell *lc;
496509

@@ -499,12 +512,12 @@ ValidateIcebergCatalogServerDDL(ProcessUtilityParams * processUtilityParams,
499512
DefElem *def = (DefElem *) lfirst(lc);
500513

501514
if (pg_strcasecmp(def->defname, "rest_endpoint") == 0 &&
502-
ServerHasDependentWritableTable(server->serverid))
515+
ServerHasDependentRestIcebergTable(server->serverid))
503516
{
504517
ereport(ERROR,
505518
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
506519
errmsg("cannot change \"rest_endpoint\" on server \"%s\" "
507-
"because it has dependent writable iceberg tables",
520+
"because it has dependent iceberg tables",
508521
stmt->servername),
509522
errhint("Drop the dependent tables first, or create a "
510523
"new server with the desired endpoint.")));
@@ -524,14 +537,16 @@ ValidateIcebergCatalogServerDDL(ProcessUtilityParams * processUtilityParams,
524537
static void
525538
ApplyGUCDefaults(RestCatalogOptions * opts)
526539
{
540+
char *defaultLocationPrefix = GetIcebergDefaultLocationPrefix();
541+
527542
opts->host = RestCatalogHost ? pstrdup(RestCatalogHost) : NULL;
528543
opts->oauthHostPath = RestCatalogOauthHostPath ? pstrdup(RestCatalogOauthHostPath) : NULL;
529544
opts->clientId = RestCatalogClientId ? pstrdup(RestCatalogClientId) : NULL;
530545
opts->clientSecret = RestCatalogClientSecret ? pstrdup(RestCatalogClientSecret) : NULL;
531546
opts->scope = RestCatalogScope ? pstrdup(RestCatalogScope) : NULL;
532547
opts->authType = RestCatalogAuthType;
533548
opts->enableVendedCredentials = RestCatalogEnableVendedCredentials;
534-
opts->locationPrefix = GetIcebergDefaultLocationPrefix();
549+
opts->locationPrefix = defaultLocationPrefix ? pstrdup(defaultLocationPrefix) : NULL;
535550
}
536551

537552

pg_lake_table/include/pg_lake/transaction/track_iceberg_metadata_changes.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,4 @@ extern PGDLLEXPORT void ResetRestCatalogRequests(void);
6161
extern PGDLLEXPORT HTAB *GetTrackedIcebergMetadataOperations(void);
6262
extern PGDLLEXPORT bool HasAnyTrackedIcebergMetadataChanges(void);
6363
extern PGDLLEXPORT bool IsIcebergTableCreatedInCurrentTransaction(Oid relation);
64-
extern PGDLLEXPORT void ValidateXactRestCatalog(Oid relationId);
64+
extern PGDLLEXPORT void BindRelationToXactRestCatalog(Oid relationId);

pg_lake_table/src/fdw/pg_lake_table.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2206,7 +2206,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
22062206
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
22072207
return;
22082208

2209-
ValidateXactRestCatalog(RelationGetRelid(resultRelInfo->ri_RelationDesc));
2209+
BindRelationToXactRestCatalog(RelationGetRelid(resultRelInfo->ri_RelationDesc));
22102210

22112211
/* Construct an execution state. */
22122212
fmstate = create_foreign_modify(resultRelInfo->ri_RelationDesc,

pg_lake_table/src/fdw/writable_table.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1095,7 +1095,7 @@ AddQueryResultToTable(Oid relationId, char *readQuery, TupleDesc queryTupleDesc,
10951095
{
10961096
Assert(queryTupleDesc != NULL && queryTupleDesc->natts > 0);
10971097

1098-
ValidateXactRestCatalog(relationId);
1098+
BindRelationToXactRestCatalog(relationId);
10991099

11001100
int64 rowsProcessed = 0;
11011101
ForeignTable *foreignTable = GetForeignTable(relationId);

pg_lake_table/src/init.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
#include "pg_lake/planner/insert_select.h"
4343
#include "pg_lake/planner/query_pushdown.h"
4444
#include "pg_lake/util/s3_file_utils.h"
45-
#include "pg_lake/rest_catalog/rest_catalog.h"
4645
#include "pg_lake/test/hide_lake_objects.h"
4746
#include "pg_lake/transaction/transaction_hooks.h"
4847
#include "pg_lake/transaction/track_iceberg_metadata_changes.h"
@@ -383,7 +382,6 @@ _PG_init(void)
383382

384383
MarkGUCPrefixReserved(PG_LAKE_TABLE);
385384

386-
RegisterUtilityStatementHandler(ValidateIcebergCatalogServerDDL, NULL);
387385
RegisterUtilityStatementHandler(ProcessVacuumPgLakeTable, NULL);
388386
RegisterUtilityStatementHandler(ProcessCreatePgLakeTable, NULL);
389387
RegisterUtilityStatementHandler(ProcessCreateAsSelectPgLakeTable, NULL);

pg_lake_table/src/transaction/track_iceberg_metadata_changes.c

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -609,40 +609,68 @@ InitRestCatalogRequestsHashIfNeeded(void)
609609

610610

611611
/*
612-
* ValidateXactRestCatalog is a fail-fast guard that prevents cross-catalog
613-
* DML within a single transaction. It resolves the relation's catalog
614-
* identifier and, if a different catalog was already locked in for this
615-
* transaction, errors out immediately — before any Parquet data is written
616-
* to S3.
612+
* BindRelationToXactRestCatalog binds the current transaction to the REST
613+
* catalog associated with `relationId`, failing fast if a *different* REST
614+
* catalog was already locked in for this transaction.
617615
*
618-
* No-ops for relations that are not REST-backed writable iceberg tables,
619-
* or when no catalog has been locked in yet (first DML in the xact).
616+
* Semantics:
617+
* - For relations that are not REST-backed writable iceberg tables: no-op.
618+
* - For the first REST-backed write of the transaction: pre-resolves the
619+
* full catalog options and stashes them in TopTransactionContext, so
620+
* subsequent calls within the same transaction can be checked without
621+
* touching pg_foreign_server again at XACT_EVENT_COMMIT.
622+
* - For any subsequent REST-backed write whose catalog differs from the
623+
* locked-in one: raises ERRCODE_FEATURE_NOT_SUPPORTED *before* any
624+
* Parquet data is written to S3.
625+
*
626+
* Called at the top of every DML entry point that can mutate REST-backed
627+
* iceberg tables: postgresBeginForeignModify() for row-by-row DML, and
628+
* AddQueryResultToTable() for the INSERT...SELECT and COPY..FROM pushdown
629+
* paths. DDL paths (CREATE TABLE / DROP TABLE) reach the same protection
630+
* indirectly via RecordRestCatalogRequestInTx(), which is invoked
631+
* synchronously at statement time from the utility hook.
620632
*/
621633
void
622-
ValidateXactRestCatalog(Oid relationId)
634+
BindRelationToXactRestCatalog(Oid relationId)
623635
{
624636
if (!IsPgLakeIcebergForeignTableById(relationId) ||
625637
GetIcebergCatalogType(relationId) != REST_CATALOG_READ_WRITE)
626638
return;
627639

628-
if (PgLakeXactRestCatalog == NULL ||
629-
PgLakeXactRestCatalog->catalogOpts == NULL)
630-
return;
640+
/*
641+
* Resolve the relation's catalog options up front. We need the full
642+
* resolved struct (host, credentials, ...), not just the user-facing
643+
* identifier, because XACT_EVENT_PRE_COMMIT reuses these fields when
644+
* issuing the REST API requests and is not allowed to do syscache lookups
645+
* by then.
646+
*/
647+
RestCatalogOptions *resolvedOpts = GetRestCatalogOptionsForRelation(relationId);
631648

632-
char *catalog = GetStringOption(GetForeignTable(relationId)->options,
633-
"catalog", false);
649+
InitRestCatalogRequestsHashIfNeeded();
634650

635-
if (catalog == NULL)
651+
if (PgLakeXactRestCatalog->catalogOpts == NULL)
652+
{
653+
PgLakeXactRestCatalog->catalogOpts =
654+
CopyRestCatalogOptions(TopTransactionContext, resolvedOpts);
636655
return;
656+
}
637657

638-
if (pg_strcasecmp(PgLakeXactRestCatalog->catalogOpts->catalog, catalog) != 0)
658+
/*
659+
* Both sides of the comparison are the canonical catalog name (lowercase
660+
* "rest" for the built-in catalog, server name as stored in
661+
* pg_foreign_server for user-defined ones). pg_strcasecmp matches the
662+
* casing rules PostgreSQL applies to identifier resolution.
663+
*/
664+
if (pg_strcasecmp(PgLakeXactRestCatalog->catalogOpts->catalog,
665+
resolvedOpts->catalog) != 0)
639666
ereport(ERROR,
640667
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
641668
errmsg("cannot modify tables from different REST catalogs "
642669
"in the same transaction"),
643670
errdetail("This transaction already targets catalog \"%s\", "
644671
"but the current statement targets \"%s\".",
645-
PgLakeXactRestCatalog->catalogOpts->catalog, catalog)));
672+
PgLakeXactRestCatalog->catalogOpts->catalog,
673+
resolvedOpts->catalog)));
646674
}
647675

648676

@@ -668,17 +696,26 @@ RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationT
668696
/* Resolve the options for this relation's REST catalog */
669697
RestCatalogOptions *resolvedOpts = GetRestCatalogOptionsForRelation(relationId);
670698

699+
/*
700+
* DDL paths (CREATE TABLE / DROP TABLE) call us directly from the
701+
* utility hook and may be the very first thing to touch a REST
702+
* catalog in this transaction, so this branch is still genuinely
703+
* reached. DML paths reach us only from XACT_EVENT_PRE_COMMIT, by
704+
* which time BindRelationToXactRestCatalog() has already populated
705+
* catalogOpts.
706+
*/
671707
if (PgLakeXactRestCatalog->catalogOpts == NULL)
672708
{
673709
PgLakeXactRestCatalog->catalogOpts =
674710
CopyRestCatalogOptions(TopTransactionContext, resolvedOpts);
675711
}
676712

677713
/*
678-
* Belt-and-suspenders check. All DML and DDL entry points already
679-
* call ValidateXactRestCatalog() at statement time, so in practice we
680-
* should never reach here with a mismatched catalog. Kept as a last
681-
* line of defense for any future code path that forgets to do so.
714+
* Belt-and-suspenders check. All DML and DDL entry points either
715+
* bind through BindRelationToXactRestCatalog() at statement time or
716+
* populate catalogOpts via the branch above, so in practice we never
717+
* reach here with a mismatched catalog. Kept as a last line of
718+
* defense for any future code path that forgets to do so.
682719
*/
683720
else if (pg_strcasecmp(PgLakeXactRestCatalog->catalogOpts->catalog, resolvedOpts->catalog) != 0)
684721
ereport(ERROR,

pg_lake_table/tests/pytests/test_iceberg_catalog_server.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,43 @@ def test_create_server_horizon_auth(superuser_conn, extension):
111111

112112

113113
def test_reject_unknown_server_option(superuser_conn, extension):
114-
"""Unknown options should be rejected by the validator."""
115-
err = run_command(
116-
"""
117-
CREATE SERVER test_bad_opt TYPE 'rest'
118-
FOREIGN DATA WRAPPER iceberg_catalog
119-
OPTIONS (rest_endpoint 'http://localhost:8181', bogus_option 'x')
120-
""",
121-
superuser_conn,
122-
raise_error=False,
123-
)
124-
assert "invalid option" in str(err)
125-
assert "bogus_option" in str(err)
126-
superuser_conn.rollback()
114+
"""
115+
Unknown options should be rejected by the validator.
116+
117+
Issued twice on the same connection because the validator caches the
118+
hint string in a static; the second call must hit the cached path and
119+
must still produce a well-formed hint (regression guard against the
120+
hint being palloc'd in a per-statement memory context).
121+
"""
122+
EXPECTED_OPTIONS = [
123+
"rest_endpoint",
124+
"rest_auth_type",
125+
"oauth_endpoint",
126+
"scope",
127+
"enable_vended_credentials",
128+
"location_prefix",
129+
"catalog_name",
130+
"client_id",
131+
"client_secret",
132+
]
133+
134+
for typo in ("bogus_option", "another_typo"):
135+
err = run_command(
136+
f"""
137+
CREATE SERVER test_bad_opt_{typo} TYPE 'rest'
138+
FOREIGN DATA WRAPPER iceberg_catalog
139+
OPTIONS (rest_endpoint 'http://localhost:8181', {typo} 'x')
140+
""",
141+
superuser_conn,
142+
raise_error=False,
143+
)
144+
msg = str(err)
145+
assert "invalid option" in msg
146+
assert typo in msg
147+
assert "Valid options are:" in msg
148+
for opt in EXPECTED_OPTIONS:
149+
assert opt in msg, f"hint missing {opt!r} on attempt {typo!r}: {msg}"
150+
superuser_conn.rollback()
127151

128152

129153
def test_reject_invalid_auth_type(superuser_conn, extension):

0 commit comments

Comments
 (0)