Skip to content

Commit f89e58e

Browse files
Support external write to rest catalog
Signed-off-by: Aykut Bozkurt <aykut.bozkurt@snowflake.com>
1 parent c7ae556 commit f89e58e

19 files changed

Lines changed: 823 additions & 111 deletions

pg_lake_iceberg/include/pg_lake/iceberg/api/snapshot.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ extern PGDLLEXPORT bool IsCurrentSnapshot(IcebergTableMetadata * metadata, Icebe
5050
/* read api */
5151
extern PGDLLEXPORT List *FetchSnapshotsFromTableMetadata(IcebergTableMetadata * metadata, SnapshotPredicateFn snapshotPredicateFn);
5252
extern PGDLLEXPORT IcebergSnapshot * GetCurrentSnapshot(IcebergTableMetadata * metadata, bool missingOk);
53-
extern PGDLLEXPORT IcebergSnapshot * GetIcebergSnapshotViaId(IcebergTableMetadata * metadata, uint64_t snapshotId);
53+
extern PGDLLEXPORT IcebergSnapshot * GetIcebergSnapshotViaId(IcebergTableMetadata * metadata, uint64_t snapshotId, bool missingOk);
5454

5555

5656
/* write api */

pg_lake_iceberg/include/pg_lake/iceberg/catalog.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,11 @@ extern PGDLLEXPORT char *IcebergDefaultCatalog;
5050
*/
5151
#define ICEBERG_INTERNAL_CATALOG_TABLE_NAME "tables_internal"
5252
#define ICEBERG_EXTERNAL_CATALOG_TABLE_NAME "tables_external"
53+
#define REST_CATALOG_SYNC_TABLE_NAME "rest_catalog_sync"
5354

5455
#define ICEBERG_INTERNAL_CATALOG_TABLE_QUALIFIED PG_LAKE_ICEBERG_SCHEMA "." ICEBERG_INTERNAL_CATALOG_TABLE_NAME
5556
#define ICEBERG_EXTERNAL_CATALOG_TABLE_QUALIFIED PG_LAKE_ICEBERG_SCHEMA "." ICEBERG_EXTERNAL_CATALOG_TABLE_NAME
57+
#define REST_CATALOG_SYNC_TABLE_QUALIFIED PG_LAKE_ICEBERG_SCHEMA "." REST_CATALOG_SYNC_TABLE_NAME
5658

5759
extern PGDLLEXPORT void InsertInternalIcebergCatalogTable(Oid relationId, const char *metadataLocation, bool hasCustomLocation);
5860
extern PGDLLEXPORT void InsertExternalIcebergCatalogTable(const char *catalogName, const char *tableNamespace,
@@ -76,3 +78,7 @@ extern PGDLLEXPORT bool RelationExistsInTheIcebergCatalog(Oid relationId);
7678
extern PGDLLEXPORT bool HasCustomLocation(Oid relationId);
7779
extern PGDLLEXPORT bool IsWritableIcebergTable(Oid relationId);
7880
extern PGDLLEXPORT bool IsReadOnlyIcebergTable(Oid relationId);
81+
82+
/* REST Catalog Sync Table Functions */
83+
extern PGDLLEXPORT void UpdateLastSyncedSnapshotId(Oid relationId, int64 snapshotId);
84+
extern PGDLLEXPORT int64_t GetLastSyncedSnapshotId(Oid relationId);

pg_lake_iceberg/include/pg_lake/iceberg/data_file_stats.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
#include "pg_lake/iceberg/api.h"
2424

2525
extern PGDLLEXPORT void SetIcebergDataFileStats(const DataFileStats * dataFileStats,
26-
int64_t *recordCount,
27-
int64_t *fileSizeInBytes,
26+
int64_t * recordCount,
27+
int64_t * fileSizeInBytes,
2828
ColumnBound * *lowerBounds,
2929
size_t *nLowerBounds,
3030
ColumnBound * *upperBounds,

pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ typedef struct RestCatalogRequest
6565
* holds the full request body.
6666
*/
6767
char *body;
68+
69+
/*
70+
* For ADD_SNAPSHOT operations, this holds the new snapshot ID being
71+
* added. Set to -1 for other operation types. Used to track what snapshot
72+
* was successfully pushed to the REST catalog for external change
73+
* detection.
74+
*/
75+
int64 newSnapshotId;
6876
} RestCatalogRequest;
6977

7078

pg_lake_iceberg/pg_lake_iceberg--3.0--3.1.sql

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,13 @@ AS 'MODULE_PATHNAME', $function$force_push_object_store_catalog$function$;
2929
-- any role who can write into a table should be able to trigger this
3030
REVOKE ALL ON FUNCTION lake_iceberg.force_push_object_store_catalog() FROM public;
3131
GRANT EXECUTE ON FUNCTION lake_iceberg.force_push_object_store_catalog() TO lake_read_write;
32+
33+
-- Create the rest_catalog_sync table for upgrades from 3.0 (plain CREATE TABLE: errors out if the table already exists)
34+
CREATE TABLE lake_iceberg.rest_catalog_sync (
35+
table_name regclass NOT NULL,
36+
last_synced_snapshot_id BIGINT,
37+
PRIMARY KEY (table_name)
38+
);
39+
40+
REVOKE ALL ON TABLE lake_iceberg.rest_catalog_sync FROM public;
41+
GRANT SELECT, INSERT, UPDATE ON TABLE lake_iceberg.rest_catalog_sync TO lake_read_write;

pg_lake_iceberg/src/iceberg/api/snapshot.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ GetCurrentSnapshot(IcebergTableMetadata * metadata, bool missingOk)
114114
* an error is thrown.
115115
*/
116116
IcebergSnapshot *
117-
GetIcebergSnapshotViaId(IcebergTableMetadata * metadata, uint64_t snapshotId)
117+
GetIcebergSnapshotViaId(IcebergTableMetadata * metadata, uint64_t snapshotId, bool missingOk)
118118
{
119119
int snapshotIndex = 0;
120120

@@ -128,6 +128,9 @@ GetIcebergSnapshotViaId(IcebergTableMetadata * metadata, uint64_t snapshotId)
128128
}
129129
}
130130

131+
if (missingOk)
132+
return NULL;
133+
131134
ereport(ERROR,
132135
(errcode(ERRCODE_INTERNAL_ERROR),
133136
errmsg("snapshot with id %" PRIu64 " not found", snapshotId)));

pg_lake_iceberg/src/iceberg/catalog.c

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,3 +790,89 @@ IsReadOnlyIcebergTable(Oid relationId)
790790
{
791791
return IsIcebergTable(relationId) && !IsWritableIcebergTable(relationId);
792792
}
793+
794+
795+
/*
796+
* UpdateLastSyncedSnapshotId upserts the last synced snapshot ID for a given relation into the REST catalog sync table.
797+
* This tracks what was last successfully pushed to the REST catalog.
798+
*/
799+
void
800+
UpdateLastSyncedSnapshotId(Oid relationId, int64 snapshotId)
801+
{
802+
/* switch to schema owner */
803+
Oid savedUserId = InvalidOid;
804+
int savedSecurityContext = 0;
805+
806+
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
807+
SetUserIdAndSecContext(ExtensionOwnerId(PgLakeIceberg), SECURITY_LOCAL_USERID_CHANGE);
808+
809+
StringInfo query = makeStringInfo();
810+
811+
appendStringInfo(query,
812+
"INSERT INTO %s (table_name, last_synced_snapshot_id) "
813+
"VALUES ($1, $2) "
814+
"ON CONFLICT (table_name) "
815+
"DO UPDATE SET last_synced_snapshot_id = EXCLUDED.last_synced_snapshot_id",
816+
REST_CATALOG_SYNC_TABLE_QUALIFIED);
817+
818+
DECLARE_SPI_ARGS(2);
819+
SPI_ARG_VALUE(1, OIDOID, relationId, false);
820+
SPI_ARG_VALUE(2, INT8OID, snapshotId, false);
821+
822+
SPI_START();
823+
824+
bool readOnly = false;
825+
826+
SPI_EXECUTE(query->data, readOnly);
827+
828+
SPI_END();
829+
830+
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
831+
}
832+
833+
834+
/*
835+
* GetLastSyncedSnapshotId returns the last synced snapshot id for a given
836+
* relation from the REST catalog sync table. Returns NULL if not found.
837+
*/
838+
int64_t
839+
GetLastSyncedSnapshotId(Oid relationId)
840+
{
841+
/* switch to schema owner */
842+
Oid savedUserId = InvalidOid;
843+
int savedSecurityContext = 0;
844+
845+
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
846+
SetUserIdAndSecContext(ExtensionOwnerId(PgLakeIceberg), SECURITY_LOCAL_USERID_CHANGE);
847+
848+
StringInfo query = makeStringInfo();
849+
850+
appendStringInfo(query,
851+
"SELECT last_synced_snapshot_id FROM %s "
852+
"WHERE table_name OPERATOR(pg_catalog.=) $1",
853+
REST_CATALOG_SYNC_TABLE_QUALIFIED);
854+
855+
DECLARE_SPI_ARGS(1);
856+
SPI_ARG_VALUE(1, OIDOID, relationId, false);
857+
858+
SPI_START();
859+
860+
bool readOnly = true;
861+
862+
SPI_EXECUTE(query->data, readOnly);
863+
864+
int64_t lastSyncedSnapshotId = -1;
865+
866+
if (SPI_processed > 0)
867+
{
868+
bool isNull = false;
869+
870+
lastSyncedSnapshotId = GET_SPI_VALUE(INT8OID, 0, 1, &isNull);
871+
}
872+
873+
SPI_END();
874+
875+
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
876+
877+
return lastSyncedSnapshotId;
878+
}

pg_lake_iceberg/src/iceberg/data_file_stats.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ static ColumnBound * CreateColumnBoundForLeafField(LeafField * leafField, char *
3838
*/
3939
void
4040
SetIcebergDataFileStats(const DataFileStats * dataFileStats,
41-
int64_t *recordCount,
42-
int64_t *fileSizeInBytes,
41+
int64_t * recordCount,
42+
int64_t * fileSizeInBytes,
4343
ColumnBound * *lowerBounds,
4444
size_t *nLowerBounds,
4545
ColumnBound * *upperBounds,

pg_lake_iceberg/src/iceberg/operations/find_referenced_files.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ find_all_referenced_files_via_snapshot_ids(PG_FUNCTION_ARGS)
118118
foreach(snapshotIdCell, snapshotIdList)
119119
{
120120
int64 *snapshotId = (int64 *) lfirst(snapshotIdCell);
121-
IcebergSnapshot *snapshot = GetIcebergSnapshotViaId(metadata, *snapshotId);
121+
IcebergSnapshot *snapshot = GetIcebergSnapshotViaId(metadata, *snapshotId, false);
122122

123123
IcebergSnapshotAddAllReferencedFiles(snapshot, fileHash);
124124
}
@@ -249,7 +249,7 @@ GetIcebergSnapshotsViaSnapshotIdList(IcebergTableMetadata * metadata, List *snap
249249
foreach(snapshotIdCell, snapshotIdList)
250250
{
251251
int64 *snapshotId = (int64 *) lfirst(snapshotIdCell);
252-
IcebergSnapshot *snapshot = GetIcebergSnapshotViaId(metadata, *snapshotId);
252+
IcebergSnapshot *snapshot = GetIcebergSnapshotViaId(metadata, *snapshotId, false);
253253

254254
snapshots[snapshotIndex] = *snapshot;
255255
snapshotIndex++;

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,7 @@ GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId)
889889
request->relationId = relationId;
890890
request->operationType = REST_CATALOG_ADD_SNAPSHOT;
891891
request->body = body->data;
892+
request->newSnapshotId = newSnapshot->snapshot_id;
892893

893894
return request;
894895
}

0 commit comments

Comments
 (0)