Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 171 additions & 23 deletions pg_lake_iceberg/src/iceberg/external_metadata_modification.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
#include "catalog/pg_type.h"
#include "catalog/namespace.h"
#include "commands/dbcommands.h"
#include "commands/extension.h"
#include "commands/trigger.h"
#include "executor/spi.h"
#include "nodes/makefuncs.h"
#include "parser/parse_func.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/rel.h"
Expand All @@ -33,9 +36,14 @@

#include "pg_lake/iceberg/catalog.h"
#include "pg_lake/extensions/pg_lake_iceberg.h"
#include "pg_lake/extensions/pg_lake_table.h"
#include "pg_extension_base/spi_helpers.h"

PG_FUNCTION_INFO_V1(external_catalog_modification);

static void HandleInternalCatalogUpdate(char *namespaceName, char *tableName,
char *metadataLocation, char *prevMetadataLocation);


/*
* external_catalog_modification is an INSTEAD OF trigger that currently
Expand Down Expand Up @@ -106,38 +114,178 @@ external_catalog_modification(PG_FUNCTION_ARGS)
prevMetadataLocationIsNull ? NULL : TextDatumGetCString(prevMetadataLocationDatum);

char *databaseName = get_database_name(MyDatabaseId);
bool isInternalCatalog = (strcmp(catalogName, databaseName) == 0);

if (strcmp(catalogName, databaseName) == 0)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("writes to the %s catalog are currently only supported via pg_lake_iceberg tables",
databaseName)));
}

/*
* Postgres only allows INSTEAD OF triggers on views. We are using this
* trigger to prevent external tools from modifying the iceberg catalog.
* But given that we use INSTEAD OF trigger on a view, we still need to
* handle the INSERT, UPDATE, DELETE operations on the base table.
*/
/* For UPDATE, check if catalog_name is being changed */
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
{
UpdateExternalCatalogMetadataLocation(catalogName, namespaceName, tableName, metadataLocation, prevMetadataLocation);
}
else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
{
InsertExternalIcebergCatalogTable(catalogName, namespaceName, tableName, metadataLocation);
Datum oldCatalogNameDatum = heap_getattr(trigdata->tg_trigtuple, 1,
trigdata->tg_relation->rd_att, &isnull);
char *oldCatalogName = TextDatumGetCString(oldCatalogNameDatum);
bool wasInternalCatalog = (strcmp(oldCatalogName, databaseName) == 0);

if (isInternalCatalog != wasInternalCatalog)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("modifying the internal catalog is currently only supported via pg_lake_iceberg tables")));
}
}
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))

if (isInternalCatalog)
{
DeleteExternalIcebergCatalogTable(catalogName, namespaceName, tableName);
/*
* For the current database catalog, only UPDATE is supported. This
* allows external Iceberg clients to write new metadata and then
* update the metadata_location, triggering a sync of the internal
* pg_lake catalog state.
*/
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
{
HandleInternalCatalogUpdate(namespaceName, tableName,
metadataLocation, prevMetadataLocation);
}
else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("modifying the internal catalog is currently only supported via pg_lake_iceberg tables")));
}
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("modifying the internal catalog is currently only supported via pg_lake_iceberg tables")));
}
else
{
pg_unreachable();
}
}
else
{
/* no other command is supported on view triggers */
pg_unreachable();
/*
* Postgres only allows INSTEAD OF triggers on views. We are using
* this trigger to prevent external tools from modifying the iceberg
* catalog. But given that we use INSTEAD OF trigger on a view, we
* still need to handle the INSERT, UPDATE, DELETE operations on the
* base table.
*/
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
{
UpdateExternalCatalogMetadataLocation(catalogName, namespaceName, tableName, metadataLocation, prevMetadataLocation);
}
else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
{
InsertExternalIcebergCatalogTable(catalogName, namespaceName, tableName, metadataLocation);
}
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
{
DeleteExternalIcebergCatalogTable(catalogName, namespaceName, tableName);
}
else
{
/* no other command is supported on view triggers */
pg_unreachable();
}
}

return PointerGetDatum(rettuple);
}


/*
 * HandleInternalCatalogUpdate handles UPDATE to the iceberg_tables view
 * for tables that belong to the current database catalog (i.e., internal
 * pg_lake iceberg tables).
 *
 * This allows external Iceberg clients (Spark, PyIceberg) to:
 * 1. Write new data/metadata files to object storage
 * 2. UPDATE iceberg_tables SET metadata_location = <new>, previous_metadata_location = <old>
 *
 * The function validates optimistic concurrency via previous_metadata_location,
 * updates the internal catalog, and triggers a sync of the pg_lake catalog
 * state (data files, schema, partition specs) from the new metadata.
 *
 * Arguments:
 *   namespaceName, tableName - schema name and relation name used to resolve
 *       the target relation
 *   metadataLocation - the new metadata location; must not be NULL
 *   prevMetadataLocation - the metadata location the caller expects to be
 *       current; must not be NULL and must match the stored value
 *
 * Raises an ERROR when either location is NULL, when the schema or table
 * cannot be resolved, or when the stored metadata location differs from
 * prevMetadataLocation (reported as a serialization failure so that clients
 * can retry).
 */
static void
HandleInternalCatalogUpdate(char *namespaceName, char *tableName,
							char *metadataLocation, char *prevMetadataLocation)
{
	if (metadataLocation == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("metadata_location cannot be NULL")));

	if (prevMetadataLocation == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
				 errmsg("previous_metadata_location is required for optimistic concurrency control")));

	/*
	 * Resolve namespace + table name to a relation OID. With missingOk =
	 * false, get_namespace_oid itself errors out on an unknown schema, so
	 * only the relation lookup needs an explicit validity check.
	 */
	bool		missingOk = false;
	Oid			namespaceOid = get_namespace_oid(namespaceName, missingOk);
	Oid			relationId = get_relname_relid(tableName, namespaceOid);

	if (!OidIsValid(relationId))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_TABLE),
				 errmsg("table \"%s.%s\" does not exist",
						namespaceName, tableName)));

	/*
	 * Lock the row and get the current metadata_location for optimistic
	 * concurrency validation.
	 */
	bool		forUpdate = true;
	char	   *currentMetadataLocation =
		GetIcebergMetadataLocation(relationId, forUpdate);

	if (currentMetadataLocation == NULL ||
		strcmp(currentMetadataLocation, prevMetadataLocation) != 0)
		ereport(ERROR,
				(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
				 errmsg("metadata_location has been modified concurrently"),
				 errdetail("Expected previous_metadata_location \"%s\" but found \"%s\".",
						   prevMetadataLocation,
						   currentMetadataLocation ? currentMetadataLocation : "(null)")));

	/* update the internal catalog with the new metadata location */
	UpdateInternalCatalogMetadataLocation(relationId, metadataLocation,
										  prevMetadataLocation);

	/*
	 * If pg_lake_table is installed, trigger a sync of the internal catalog
	 * (data files, schema, partition specs) from the new metadata.
	 */
	Oid			pgLakeTableExtOid = get_extension_oid(PG_LAKE_TABLE, true);

	if (OidIsValid(pgLakeTableExtOid))
	{
		Oid			savedUserId = InvalidOid;
		int			savedSecurityContext = 0;

		GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);

		/*
		 * Switch to the extension owner for the duration of the sync. The
		 * saved context is OR-ed in so that any flag already in effect
		 * (e.g. SECURITY_RESTRICTED_OPERATION) stays set while we run with
		 * the other user id, instead of being silently cleared.
		 */
		SetUserIdAndSecContext(ExtensionOwnerId(PgLakeIceberg),
							   savedSecurityContext | SECURITY_LOCAL_USERID_CHANGE);

		/*
		 * Use OidFunctionCall1 to call the sync function. This is safe
		 * because OidFunctionCall doesn't use SPI - it calls the function
		 * directly via the function manager. The sync function itself uses
		 * SPI internally, but that's fine since we're not in an SPI context
		 * here.
		 *
		 * NOTE(review): the lookup passes missing_ok = true, so when the
		 * function is absent the sync is skipped without any message even
		 * though the metadata location has already been updated. Presumably
		 * this covers a pg_lake_table version that predates the function;
		 * confirm that silently skipping is intended.
		 */
		Oid			argTypes[1] = {REGCLASSOID};
		Oid			syncFuncOid = LookupFuncName(
												 list_make2(makeString("lake_table"),
															makeString("sync_iceberg_metadata_from_external_write")),
												 1, argTypes, true);

		if (OidIsValid(syncFuncOid))
		{
			OidFunctionCall1(syncFuncOid, ObjectIdGetDatum(relationId));
		}

		/* restore the caller's identity and security context */
		SetUserIdAndSecContext(savedUserId, savedSecurityContext);
	}
}
8 changes: 8 additions & 0 deletions pg_lake_table/include/pg_lake/ddl/alter_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,11 @@ typedef void (*PgLakeAlterTableRenameColumnHookType) (Oid relationOid, RenameStm

extern PGDLLEXPORT PgLakeAlterTableHookType PgLakeAlterTableHook;
extern PGDLLEXPORT PgLakeAlterTableRenameColumnHookType PgLakeAlterTableRenameColumnHook;

/*
* When true, ProcessAlterTable and PostProcessRenameWritablePgLakeTable skip
* Iceberg DDL processing (field_id registration and metadata tracking).
* Used by sync_iceberg_metadata_from_external_write to apply schema changes
* (e.g. add/drop columns) without triggering a new metadata write.
*/
extern bool SkipIcebergDDLProcessing;
2 changes: 2 additions & 0 deletions pg_lake_table/include/pg_lake/fdw/data_files_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ bool PartitionFieldsCatalogExists(void);
/* functions to write to files catalog */
extern PGDLLEXPORT void ApplyDataFileCatalogChanges(Oid relationId, List *metadataOperations);
int64 GenerateDataFileId(void);
int64 AddDataFileToTable(Oid relationId, const char *path, int64 rowCount,
int64 fileSize, DataFileContent content, int64 rowIdStart);

/* when enabling row_ids, we need to explicit update first_row_id */
void UpdateDataFileFirstRowId(Oid relationId, int64 fileId, int64 firstRowId);
21 changes: 21 additions & 0 deletions pg_lake_table/include/pg_lake/sync/sync_external_metadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2025 Snowflake Inc.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "postgres.h"
#include "fmgr.h"
7 changes: 7 additions & 0 deletions pg_lake_table/pg_lake_table--3.3--3.4.sql
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
-- Upgrade script for pg_lake_table from 3.3 to 3.4

-- lake_table.sync_iceberg_metadata_from_external_write(regclass)
--
-- Sync entry point for external writes to Iceberg tables. The iceberg_tables
-- INSTEAD OF trigger calls it after an external client updates
-- metadata_location for a table in the current database catalog.
-- Declared STRICT, so a NULL argument returns NULL without entering the C code.
-- NOTE(review): no REVOKE is visible in this script, so PUBLIC would keep the
-- default EXECUTE privilege -- confirm that direct calls are acceptable.
CREATE FUNCTION lake_table.sync_iceberg_metadata_from_external_write(regclass)
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', 'sync_iceberg_metadata_from_external_write';
17 changes: 17 additions & 0 deletions pg_lake_table/src/ddl/alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ static bool DisallowedForWritableRestSetSchema(Node *arg, Oid relationId);

PgLakeAlterTableHookType PgLakeAlterTableHook = NULL;
PgLakeAlterTableRenameColumnHookType PgLakeAlterTableRenameColumnHook = NULL;
bool SkipIcebergDDLProcessing = false;

/*
* Below is the support matrix for ALTER TABLE commands, which pg_lake
Expand Down Expand Up @@ -305,6 +306,14 @@ ProcessAlterTable(ProcessUtilityParams * processUtilityParams, void *arg)
*/
PgLakeCommonParentProcessUtility(processUtilityParams);

/*
* When syncing from externally-written Iceberg metadata, we only need the
* ALTER TABLE to modify pg_attribute. Skip field_id registration and
* metadata tracking so we don't conflict with the external metadata.
*/
if (SkipIcebergDDLProcessing)
return true;

List *schemaDDLOperations = NIL;

if (tableType == PG_LAKE_ICEBERG_TABLE_TYPE && RequiresNewIcebergSchema(alterStmt))
Expand Down Expand Up @@ -683,6 +692,14 @@ PostProcessRenameWritablePgLakeTable(ProcessUtilityParams * params, void *arg)
!(renameStmt->renameType == OBJECT_TABLE ||
renameStmt->renameType == OBJECT_FOREIGN_TABLE))
{
/*
* When syncing from externally-written Iceberg metadata, the rename
* is already reflected in the new metadata file; skip our own DDL
* tracking and metadata write.
*/
if (SkipIcebergDDLProcessing)
return;

/* schema has changed, update the metadata */
IcebergDDLOperation *ddlOperation = palloc0(sizeof(IcebergDDLOperation));

Expand Down
4 changes: 1 addition & 3 deletions pg_lake_table/src/fdw/data_files_catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ PgLakeAddDataFileHookType PgLakeAddDataFileHook = NULL;
static void FillDataFileColumnStats(TableDataFile * dataFile, int64 fieldId, int rowIndex);
static void FillPartitionFieldFromCatalog(TableDataFile * dataFile, List *partitionTransforms,
int64 partitionFieldId, int rowIndex);
static int64 AddDataFileToTable(Oid relationId, const char *path, int64 rowCount,
int64 fileSize, DataFileContent content, int64 rowIdStart);
static void AddDeletionFileMapping(Oid relationId, const char *path,
const char *sourcePath);
static void AddNewRowIdMapping(Oid relationId, const char *path, List *rowIdRanges);
Expand Down Expand Up @@ -937,7 +935,7 @@ GetTotalDeletedRowCountFromCatalog(Oid relationId)
* For deletion files, deletedFrom indicates which file we are deleting from (can
* be NULL if deleting from multiple files/unknown).
*/
static int64
int64
AddDataFileToTable(Oid relationId, const char *path, int64 rowCount, int64 fileSize,
DataFileContent content, int64 rowIdStart)
{
Expand Down
Loading
Loading