Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pg_lake_engine/include/pg_lake/cleanup/deletion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ extern PGDLLEXPORT void InsertDeletionQueueRecord(char *path, Oid relationId, Ti
extern PGDLLEXPORT void InsertPrefixDeletionRecord(char *path, TimestampTz orphanedAt);
extern PGDLLEXPORT void InsertDeletionQueueRecordExtended(char *path, Oid relationId, TimestampTz orphanedAt,
bool isPrefix);
extern PGDLLEXPORT void DeleteDeletionQueueRecordsByPath(List *paths);
16 changes: 16 additions & 0 deletions pg_lake_engine/src/cleanup/deletion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,19 @@ InsertDeletionQueueRecordExtended(char *path, Oid relationId, TimestampTz orphan

SetUserIdAndSecContext(savedUserId, savedSecurityContext);
}


/*
 * DeleteDeletionQueueRecordsByPath drops the deletion queue rows that
 * belong to the given paths. The remote files themselves are not deleted.
 *
 * It exists to roll back deletion queue entries that were recorded too
 * early, i.e. ahead of a REST catalog commit that ended up failing.
 *
 * Passing NIL is a no-op; any other list is handed over as-is to
 * RemoveDeletionQueuePathsFromCatalog.
 */
void
DeleteDeletionQueueRecordsByPath(List *paths)
{
	if (paths != NIL)
		RemoveDeletionQueuePathsFromCatalog(paths);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "nodes/pg_list.h"

extern PGDLLEXPORT List *ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, int maxSnapshotAgeInSecs, bool isVerbose);
extern PGDLLEXPORT List *ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, int maxSnapshotAgeInSecs, bool isVerbose,
char **deletionQueueMetadataPath);
extern PGDLLEXPORT bool ShouldSkipMetadataChangeToIceberg(List *metadataOperationTypes);
extern PGDLLEXPORT List *GetMetadataOperationTypes(List *metadataOperations);
15 changes: 14 additions & 1 deletion pg_lake_iceberg/src/iceberg/metadata_operations.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,14 @@ static void DeleteInProgressManifests(Oid relationId, List *manifests);
*/
List *
ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms,
int maxSnapshotAgeInSecs, bool isVerbose)
int maxSnapshotAgeInSecs, bool isVerbose,
char **deletionQueueMetadataPath)
{
List *restCatalogRequests = NIL;

if (deletionQueueMetadataPath)
*deletionQueueMetadataPath = NULL;

Assert(metadataOperations != NIL);

#ifdef USE_ASSERT_CHECKING
Expand Down Expand Up @@ -344,8 +348,17 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT
if (writableRestCatalogTable)
{
if (metadataPath)
{
InsertDeletionQueueRecord(metadataPath, relationId, GetCurrentTransactionStartTimestamp());

/*
* Report the path we inserted so the caller can undo it if
* the REST catalog commit fails later.
*/
if (deletionQueueMetadataPath)
*deletionQueueMetadataPath = metadataPath;
}

/*
* We are done, writable rest catalog iceberg tables have their
* metadata updated in the catalog itself.
Expand Down
9 changes: 9 additions & 0 deletions pg_lake_table/include/pg_lake/util/injection_points.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,25 @@
#define INJECTION_POINT_COMPAT(name) \
INJECTION_POINT(name, NULL)

#define IS_INJECTION_POINT_ATTACHED_COMPAT(name) \
IS_INJECTION_POINT_ATTACHED(name)

#elif PG_VERSION_NUM >= 170000

#include "utils/injection_point.h"

#define INJECTION_POINT_COMPAT(name) \
INJECTION_POINT(name)

#define IS_INJECTION_POINT_ATTACHED_COMPAT(name) \
(false)

#else

#define INJECTION_POINT_COMPAT(name) \
((void) name)

#define IS_INJECTION_POINT_ATTACHED_COMPAT(name) \
(false)

#endif
113 changes: 111 additions & 2 deletions pg_lake_table/src/transaction/track_iceberg_metadata_changes.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "common/int.h"
#include "utils/memutils.h"

#include "pg_lake/cleanup/deletion_queue.h"
#include "pg_lake/cleanup/in_progress_files.h"
#include "pg_lake/data_file/data_files.h"
#include "pg_lake/fdw/data_files_catalog.h"
Expand Down Expand Up @@ -78,9 +79,18 @@ typedef struct RestCatalogRequestPerTable


List *tableModifyRequests;

/*
* For writable REST catalog tables, the metadata path that was
* optimistically inserted into the deletion queue during pre-commit.
* If the REST commit fails, this path needs to be removed from the
* deletion queue to undo the premature insertion.
*/
char *deletionQueueMetadataPath;
} RestCatalogRequestPerTable;

static void ApplyTrackedIcebergMetadataChanges(bool isVerbose);
static void UndoFailedRestCatalogDeletionQueueEntries(void);
static void RecordIcebergMetadataOperation(Oid relationId, TableMetadataOperationType operationType);
static void InitTableMetadataTrackerHashIfNeeded(void);
static void InitRestCatalogRequestsHashIfNeeded(void);
Expand Down Expand Up @@ -117,6 +127,15 @@ static HTAB *RestCatalogRequestsHash = NULL;
/* some pre-allocated memory so we don't palloc() ever in XACT_COMMIT */
static MemoryContext PgLakeXactCommitContext = NULL;

/*
* Paths that were optimistically inserted into the deletion queue during
* pre-commit, but whose REST catalog commit subsequently failed. These
* are saved in TopMemoryContext so they survive across transactions, and
* removed from the deletion_queue at the next pre-commit to undo the
* premature insertion.
*/
static List *FailedRestCatalogDeletionPaths = NIL;

/*
* TrackIcebergMetadataChangesInTx tracks metadata changes for a given relation
* within a transaction. It acquires the necessary locks before applying the changes
Expand Down Expand Up @@ -215,6 +234,33 @@ ResetRestCatalogRequests(void)
}


/*
* UndoFailedRestCatalogDeletionQueueEntries removes deletion queue entries
* that were optimistically inserted during pre-commit of a previous
* transaction, but whose REST catalog commit subsequently failed. Must be
* called in a context where SPI is available (e.g., during pre-commit with
* an active snapshot).
*/
static void
UndoFailedRestCatalogDeletionQueueEntries(void)
{
if (FailedRestCatalogDeletionPaths == NIL)
return;

DeleteDeletionQueueRecordsByPath(FailedRestCatalogDeletionPaths);

ListCell *lc = NULL;

foreach(lc, FailedRestCatalogDeletionPaths)
{
pfree(lfirst(lc));
}

list_free(FailedRestCatalogDeletionPaths);
FailedRestCatalogDeletionPaths = NIL;
}


/*
* PostAllRestCatalogRequests posts all the tracked REST catalog requests
* to the REST catalog at transaction commit time. This is called at post-commit
Expand Down Expand Up @@ -414,11 +460,47 @@ PostAllRestCatalogRequests(void)
appendStringInfoChar(batchRequestBody, '}'); /* close json body */

char *url = psprintf(REST_CATALOG_TRANSACTION_COMMIT, RestCatalogHost, catalogName);
HttpResult httpResult = SendRequestToRestCatalog(HTTP_POST, url, batchRequestBody->data, PostHeadersWithAuth());
HttpResult httpResult;

if (IS_INJECTION_POINT_ATTACHED_COMPAT("rest-catalog-batch-commit"))
{
memset(&httpResult, 0, sizeof(HttpResult));
httpResult.status = 503;
httpResult.errorMsg = "injected failure for testing";
}
else
{
httpResult = SendRequestToRestCatalog(HTTP_POST, url, batchRequestBody->data, PostHeadersWithAuth());
}

if (httpResult.status != 204)
{
ReportHTTPError(httpResult, WARNING);

/*
* REST catalog commit failed. Any metadata paths that were
* optimistically inserted into the deletion queue during
* pre-commit need to be undone, since the old metadata is
* still the active version. We save them in TopMemoryContext
* so they survive the current transaction, and delete them
* from the deletion_queue at the next pre-commit.
*/
MemoryContext undoContext = MemoryContextSwitchTo(TopMemoryContext);

hash_seq_init(&status, RestCatalogRequestsHash);

while ((requestPerTable = hash_seq_search(&status)) != NULL)
{
if (requestPerTable->isValid &&
requestPerTable->deletionQueueMetadataPath != NULL)
{
FailedRestCatalogDeletionPaths =
lappend(FailedRestCatalogDeletionPaths,
pstrdup(requestPerTable->deletionQueueMetadataPath));
}
}

MemoryContextSwitchTo(undoContext);
}
}

Expand Down Expand Up @@ -679,6 +761,14 @@ RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationT
void
ConsumeTrackedIcebergMetadataChanges(bool isVerbose)
{
/*
* Undo any deletion queue entries from previous failed REST catalog
* commits. Those entries were optimistically inserted during pre-commit
* but the REST commit failed, so the metadata files are still active
* and must not be deleted.
*
* This has to happen before the tracked changes are applied below:
* applying them can insert the same metadata path into the deletion
* queue again, which would hit the deletion queue's unique constraint
* if the stale entry were still present.
*/
UndoFailedRestCatalogDeletionQueueEntries();

/*
* Apply the metadata changes tracked for this transaction; isVerbose is
* passed through unchanged. Afterwards reset the tracking state
* (presumably so the same operations are not applied twice -- confirm
* against ResetTrackedIcebergMetadataOperation).
*/
ApplyTrackedIcebergMetadataChanges(isVerbose);
ResetTrackedIcebergMetadataOperation();
}
Expand Down Expand Up @@ -799,7 +889,9 @@ ApplyTrackedIcebergMetadataChanges(bool isVerbose)
if (metadataOperations != NIL)
{
int maxSnapshotAgeInSecs = GetEffectiveMaxSnapshotAgeInSecs(relationId);
List *restRequests = ApplyIcebergMetadataChanges(relationId, metadataOperations, allTransforms, maxSnapshotAgeInSecs, isVerbose);
char *deletionQueuePath = NULL;
List *restRequests = ApplyIcebergMetadataChanges(relationId, metadataOperations, allTransforms, maxSnapshotAgeInSecs, isVerbose,
&deletionQueuePath);
ListCell *requestCell = NULL;

foreach(requestCell, restRequests)
Expand All @@ -810,6 +902,23 @@ ApplyTrackedIcebergMetadataChanges(bool isVerbose)
request->body);
}

/*
* If a metadata path was inserted into the deletion queue,
* record it so PostAllRestCatalogRequests can undo it if the
* REST commit fails.
*/
if (deletionQueuePath != NULL)
{
bool isFound = false;
RestCatalogRequestPerTable *requestPerTable =
hash_search(RestCatalogRequestsHash,
&relationId, HASH_FIND, &isFound);

if (isFound && requestPerTable->isValid)
requestPerTable->deletionQueueMetadataPath =
MemoryContextStrdup(TopTransactionContext,
deletionQueuePath);
}
}
}

Expand Down
109 changes: 109 additions & 0 deletions pg_lake_table/tests/pytests/test_polaris_catalog_writable.py
Original file line number Diff line number Diff line change
Expand Up @@ -3954,6 +3954,115 @@ def test_rest_rename_col_same_name(
pg_conn.commit()


def test_rest_commit_failure_undo_deletion_queue(
    pg_conn,
    superuser_conn,
    s3,
    polaris_session,
    set_polaris_gucs,
    with_default_location,
    installcheck,
    create_http_helper_functions,
):
    """
    Regression test for stale deletion_queue rows left by a failed REST
    catalog commit.

    During pre-commit the previous metadata path is inserted into the
    deletion_queue before the REST catalog has accepted the change. If the
    REST commit then fails, that row is stale, and without an undo step the
    following INSERT would fail with:
        ERROR: duplicate key value violates unique constraint "deletion_queue_pkey"

    The scenario is:
      1. create a REST catalog table and commit one row normally,
      2. attach the "rest-catalog-batch-commit" injection point, so the
         batch commit is replaced by a simulated failure response,
      3. insert a second row (the REST commit fails with a WARNING),
      4. detach the injection point and insert a third row, which must
         succeed because the stale row is removed at the next pre-commit.

    The test is a no-op under installcheck and on PostgreSQL versions
    older than 18, where the injection point check is not available.
    """
    if installcheck or get_pg_version_num(pg_conn) < 180000:
        return

    SCHEMA = "test_rest_commit_failure_undo"
    TABLE_FQN = f"{SCHEMA}.t_rest"
    INJECTION_POINT = "rest-catalog-batch-commit"

    conn = open_pg_conn()

    def rollback_quietly():
        # Best-effort rollback; a failure here must not mask the real error.
        try:
            conn.rollback()
        except Exception:
            pass

    def fetch_values():
        # Column "a" of every row, in ascending order.
        result = run_query(f"SELECT a FROM {TABLE_FQN} ORDER BY a", conn)
        return [row[0] for row in result]

    try:
        run_command(
            f"SET pg_lake_iceberg.default_location_prefix TO 's3://{TEST_BUCKET}'",
            conn,
        )
        run_command("CREATE EXTENSION IF NOT EXISTS injection_points", conn)
        conn.commit()

        run_command(f"CREATE SCHEMA {SCHEMA}", conn)
        run_command(
            f"CREATE TABLE {TABLE_FQN} (a int) USING iceberg "
            f"WITH (catalog='rest', autovacuum_enabled=False)",
            conn,
        )
        run_command(f"INSERT INTO {TABLE_FQN} VALUES (1)", conn)
        conn.commit()

        assert fetch_values() == [1]

        # With the injection point attached, PostAllRestCatalogRequests does
        # not send the HTTP request and acts on a simulated non-204 response.
        run_command(
            f"SELECT injection_points_attach('{INJECTION_POINT}', 'notice')",
            conn,
        )
        conn.commit()

        # PostgreSQL commits this INSERT, but the REST batch commit "fails"
        # (WARNING only), leaving the old metadata path in deletion_queue.
        run_command(f"INSERT INTO {TABLE_FQN} VALUES (2)", conn)
        conn.commit()

        # From here on commits reach Polaris again.
        run_command(
            f"SELECT injection_points_detach('{INJECTION_POINT}')",
            conn,
        )
        conn.commit()

        # Must not fail with a duplicate key error on deletion_queue_pkey:
        # the stale row is removed at the beginning of pre-commit.
        run_command(f"INSERT INTO {TABLE_FQN} VALUES (3)", conn)
        conn.commit()

        assert fetch_values() == [1, 2, 3]

    finally:
        rollback_quietly()

        # Make sure the injection point never outlives the test.
        run_command(
            f"SELECT injection_points_detach('{INJECTION_POINT}')",
            conn,
            raise_error=False,
        )
        rollback_quietly()

        try:
            run_command(f"DROP SCHEMA {SCHEMA} CASCADE", conn)
            conn.commit()
        except Exception:
            rollback_quietly()

        conn.close()


def assert_metadata_on_pg_catalog_and_rest_matches(
namespace, table_name, superuser_conn
):
Expand Down
Loading