feat(checkpoint): add idempotent commit support for Delta #6872

Closed
chenghuichen wants to merge 5 commits into Eventual-Inc:main from chenghuichen:delta-ck

Conversation

@chenghuichen
Contributor

@chenghuichen chenghuichen commented May 3, 2026

Changes Made

Extract shared checkpoint boilerplate (pending detection, query_id validation, file decode) into lightweight utility functions in _checkpoint_commit.py, then apply the same idempotent commit mechanism to write_deltalake — crash recovery via commit-info markers, skip-on-restart, incremental dedup. Iceberg's existing checkpoint flow is preserved, just calls the shared utilities instead of inlining the same bookkeeping.

Related Issues

@github-actions github-actions Bot added the feat label May 3, 2026
@greptile-apps
Contributor

greptile-apps Bot commented May 3, 2026

Greptile Summary

This PR extracts the connector-agnostic checkpoint-commit orchestration from the Iceberg write path into a shared commit_with_checkpoint helper, then wires write_deltalake into that same helper to give Delta Lake full crash-safe idempotent-commit semantics (file staging → recovery detection → catalog commit → mark-committed).

  • P1 — potential double-write on recovery: refresh_and_check_committed for Delta Lake calls history(limit=50), so if the table has received more than 50 commits between a crash and the recovery attempt the idempotency marker is invisible, commit_files runs again, and duplicate data is silently appended. The Iceberg path iterates snapshots without any limit — the same guarantee should apply here.

Confidence Score: 3/5

Not safe to merge without addressing the history-limit cap that can cause silent data duplication on recovery

One P1 correctness defect (history limit in the Delta Lake recovery check can produce duplicate commits) pulls the score below the P1 ceiling of 4; the rest are P2 style issues

daft/dataframe/dataframe.py — specifically the refresh_and_check_committed closure inside _write_deltalake_with_checkpoint

Important Files Changed

| Filename | Overview |
| --- | --- |
| daft/dataframe/dataframe.py | Adds _write_deltalake_with_checkpoint; the history(limit=50) cap in the recovery check can silently allow double-writes on busy tables; import json is inline rather than top-level |
| daft/dataframe/_checkpoint_commit.py | New shared orchestration module; logic is correct but from daft import from_pydict is deferred inside helper functions instead of at the top level |
| tests/io/delta_lake/test_deltalake_writes_checkpoint.py | Good coverage of crash-recovery scenarios; data verification relies solely on daft.read_deltalake rather than an external reader; redundant inline import inside crash_after_stage |
| tests/io/delta_lake/__init__.py | Empty package init; no issues |

Sequence Diagram

sequenceDiagram
    participant Caller
    participant commit_with_checkpoint
    participant CheckpointStore
    participant decode_files
    participant refresh_and_check_committed
    participant commit_files

    Caller->>commit_with_checkpoint: write_df, checkpoint, callbacks
    commit_with_checkpoint->>CheckpointStore: list_checkpoints()
    alt No pending entries (fresh run)
        commit_with_checkpoint->>commit_with_checkpoint: write_df.collect()
        commit_with_checkpoint->>CheckpointStore: list_checkpoints() [again]
    end
    alt Still no pending entries
        commit_with_checkpoint-->>Caller: _empty_write_result()
    end
    commit_with_checkpoint->>CheckpointStore: get_checkpointed_files()
    commit_with_checkpoint->>decode_files: file_metadata blobs
    decode_files-->>commit_with_checkpoint: connector file objects
    alt No file objects
        commit_with_checkpoint->>CheckpointStore: mark_committed()
        commit_with_checkpoint-->>Caller: _empty_write_result()
    end
    loop up to max_retries
        commit_with_checkpoint->>refresh_and_check_committed: store_path, query_id
        alt Already committed (recovery)
            refresh_and_check_committed-->>commit_with_checkpoint: True
            commit_with_checkpoint->>CheckpointStore: mark_committed()
            commit_with_checkpoint-->>Caller: _build_result_df()
        else Not yet committed
            refresh_and_check_committed-->>commit_with_checkpoint: False
            commit_with_checkpoint->>commit_files: files, store_path, query_id
            alt Commit succeeds
                commit_files-->>commit_with_checkpoint: OK
                commit_with_checkpoint->>CheckpointStore: mark_committed()
                commit_with_checkpoint-->>Caller: _build_result_df()
            else Retryable error
                commit_files-->>commit_with_checkpoint: error
                Note over commit_with_checkpoint: retry
            end
        end
    end
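The retry loop in the diagram can be sketched in plain Python. This is a minimal illustration, not the PR's actual `commit_with_checkpoint`: the function name, the `RetryableCommitError` class, the callback signatures, and the string return values are all assumptions made for the example. It covers only the part of the diagram that runs after the staged files have been decoded.

```python
from typing import Callable, Optional, Sequence


class RetryableCommitError(Exception):
    """Hypothetical stand-in for a connector's retryable commit conflict."""


def commit_with_checkpoint_sketch(
    pending_files: Sequence[str],
    is_committed: Callable[[], bool],
    commit_files: Callable[[Sequence[str]], None],
    mark_committed: Callable[[], None],
    max_retries: int = 3,
) -> str:
    """Return "empty", "recovered", or "committed" depending on the path taken."""
    if not pending_files:
        # Nothing was staged: mark the checkpoint done and return early.
        mark_committed()
        return "empty"

    last_error: Optional[Exception] = None
    for _ in range(max_retries):
        # Re-check the table on every attempt so a commit that landed
        # before a crash is detected instead of being repeated.
        if is_committed():
            mark_committed()
            return "recovered"
        try:
            commit_files(pending_files)
        except RetryableCommitError as exc:
            last_error = exc
            continue
        mark_committed()
        return "committed"
    raise RuntimeError("commit did not succeed within max_retries") from last_error
```

Passing the connector-specific steps in as callbacks is what would let Iceberg and Delta Lake share the loop; only `is_committed` and `commit_files` would differ per sink.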

Comments Outside Diff (1)

  1. daft/dataframe/dataframe.py, line 518-524 (link)

    P1 history(limit=50) can miss the idempotency marker, causing double-writes

    resolved_table.history(limit=50) only inspects the 50 most recent commits. If the Delta table receives more than 50 new commits between the crash and the recovery attempt, the daft.checkpoint-* marker commit falls outside the window, refresh_and_check_committed returns False, and commit_files runs again — committing the same files a second time and silently duplicating data.

    The Iceberg path has no such limit: it iterates table.metadata.snapshots exhaustively. Either remove the limit entirely (note: DeltaTable.history() without a limit reads the full log) or raise it substantially to match realistic recovery windows.
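    A small simulation of why the cap matters. It models the commit history as a newest-first list of dicts, which is an assumption about the data's shape rather than a call into the real Delta Lake API. `marker_in_history` is a hypothetical helper, and `daft.checkpoint-query` is used only as an example marker name.

```python
from typing import Dict, List, Optional


def marker_in_history(
    history: List[Dict[str, str]],
    marker_key: str,
    marker_value: str,
    limit: Optional[int] = None,
) -> bool:
    """Search a newest-first commit history, optionally capped at `limit` entries."""
    window = history if limit is None else history[:limit]
    return any(entry.get(marker_key) == marker_value for entry in window)


# 60 unrelated commits landed after the marker commit, so the marker
# sits at index 60 in the newest-first list.
history: List[Dict[str, str]] = [{"operation": "WRITE"} for _ in range(60)]
history.append({"operation": "WRITE", "daft.checkpoint-query": "q-1"})

capped = marker_in_history(history, "daft.checkpoint-query", "q-1", limit=50)
full = marker_in_history(history, "daft.checkpoint-query", "q-1")
```

    With the cap, the search misses the marker and recovery would proceed to commit again; without it, the marker is found.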

Reviews (1): Last reviewed commit: "extract shared commit orchestration"

@codspeed-hq

codspeed-hq Bot commented May 3, 2026

Merging this PR will not alter performance

✅ 40 untouched benchmarks
⏩ 10 skipped benchmarks [1]


Comparing chenghuichen:delta-ck (0bc746d) with main (044b833)


Footnotes

  1. 10 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, archive them to remove them from the performance reports.

@chenghuichen chenghuichen changed the title from "feat(checkpoint): add Delta Lake checkpoint support and extract shared commit orchestration" to "feat(checkpoint): add idempotent commit support for write_deltalake" May 4, 2026
@chenghuichen chenghuichen changed the title from "feat(checkpoint): add idempotent commit support for write_deltalake" to "feat(checkpoint): add idempotent commit support for Delta" May 4, 2026
@rohitkulshreshtha
Contributor

Hey @chenghuichen — apologies for sitting on this. I was unwell + OOO for a few days, and in the meantime we shaped a design change (PR #6905) that affects the foundations this PR is built on. Wanted to give you a heads-up before you spend more time on it.

Why the change

The original design conflated execution identity (which run staged a row) with commit identity (which logical commit a snapshot represents). They're independent concerns:

  • The user-visible guarantee is "one logical commit run = exactly one snapshot/commit", regardless of crashes, retries, or how many internal executions backed it.
  • That's a commit-identity concern, not an execution-identity one. Per-entry query_id stamping was solving a different problem from the one users actually have.

Decoupling them lets recovery be keyed on a single user-supplied token (the idempotence_key, typically an orchestrator run-ID). Recovery becomes "did this key already commit?" — one snapshot lookup, no per-entry bookkeeping. The exactly-once invariant is checkable directly: count snapshots tagged with the key, assert exactly one.
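The exactly-once check described above can be sketched as a count over snapshot metadata. This is illustrative only: snapshots are modeled as plain dicts, `count_commits_for_key` is a hypothetical helper, and the `daft.idempotence-key` marker name is the one introduced in #6905.

```python
from typing import Dict, Iterable


def count_commits_for_key(
    snapshot_summaries: Iterable[Dict[str, str]],
    idempotence_key: str,
    marker: str = "daft.idempotence-key",
) -> int:
    """Count snapshots whose summary carries the given idempotence key."""
    return sum(1 for summary in snapshot_summaries if summary.get(marker) == idempotence_key)


# Simulated snapshot summaries: two logical runs plus one untagged commit.
snapshots = [
    {"operation": "append", "daft.idempotence-key": "run-1"},
    {"operation": "append"},
    {"operation": "append", "daft.idempotence-key": "run-2"},
]

run_1_commits = count_commits_for_key(snapshots, "run-1")
assert run_1_commits == 1, "one logical run must map to exactly one snapshot"
```

A test can run the same write several times, including simulated crashes, and then make this single assertion per key.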

What changed in #6905

1. API shape — paired kwargs collapsed into a single strong type:

# Before (this PR):
df.write_iceberg(table, checkpoint=ckpt, idempotence_key="run-1")

# After:
df.write_iceberg(
    table,
    checkpoint=daft.IdempotentCommit(store=ckpt, idempotence_key="run-1"),
)

daft.IdempotentCommit is a frozen dataclass bundling store + idempotence_key. The key is per-call, not per-store.
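As a rough picture of what "frozen dataclass, key per call" implies, here is a stand-in written from the description above. It is not the real `daft.IdempotentCommit` definition: the class name `IdempotentCommitSketch` and the `Any` type for `store` are assumptions.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Any


@dataclass(frozen=True)
class IdempotentCommitSketch:
    """Illustrative stand-in; not the real daft.IdempotentCommit definition."""

    store: Any  # the checkpoint store (type left open in this sketch)
    idempotence_key: str  # per-call token, e.g. an orchestrator run ID


store = object()  # placeholder for a real checkpoint store
run_1 = IdempotentCommitSketch(store=store, idempotence_key="run-1")
run_2 = IdempotentCommitSketch(store=store, idempotence_key="run-2")

try:
    run_1.idempotence_key = "other"  # frozen: assignment is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False
```

The same store backs both runs while each call carries its own key, and freezing the instance prevents the key from changing between staging and commit.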

2. Snapshot marker: daft.checkpoint-store + daft.checkpoint-query collapse into a single daft.idempotence-key. The daft.idempotence- prefix is reserved — raises if a user passes a key with that prefix in snapshot_properties / custom_metadata, regardless of whether the checkpoint flow is in use.

3. Per-entry query_id removed. The Checkpoint struct no longer carries it. The single-query-id invariant your validate_query_id helper enforces is gone — recovery is keyed entirely on the user-supplied idempotence_key.

4. Check-first commit flow. Walk snapshot history for the marker before running the pipeline. Found → mark Checkpointed → Committed and bail (no pipeline run). Not found → run pipeline, commit from store, mark.
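The check-first flow in item 4 can be sketched as follows. The table is simulated as a list of commit dicts, `check_first_commit` and its parameters are hypothetical, and the checkpoint-store state transitions are left out to keep the example short.

```python
from typing import Callable, Dict, List


def check_first_commit(
    idempotence_key: str,
    table_commits: List[Dict[str, str]],
    run_pipeline: Callable[[], List[str]],
) -> bool:
    """Commit at most once per key. Returns True if the pipeline ran."""
    marker = "daft.idempotence-key"
    if any(commit.get(marker) == idempotence_key for commit in table_commits):
        # Marker found: a previous attempt already committed, so skip the pipeline.
        return False
    files = run_pipeline()
    # Stand-in for the catalog commit: record the files and stamp the key.
    table_commits.append({marker: idempotence_key, "files": ",".join(files)})
    return True


commits: List[Dict[str, str]] = []
first = check_first_commit("run-1", commits, lambda: ["part-0.parquet"])
second = check_first_commit("run-1", commits, lambda: ["part-0.parquet"])
```

The second call with the same key finds the marker and returns without running the pipeline, so the simulated table ends up with a single commit.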

What this means for #6872

  • _checkpoint_commit.py: drop validate_query_id. Reshape get_pending_or_execute to check-first semantics (or split into a snapshot-history-walk helper + a pipeline-execute helper). decode_file_metadata and empty_write_result survive as-is.
  • _write_deltalake_with_checkpoint: take checkpoint: IdempotentCommit | None. Walk Delta table.history() for daft.idempotence-key matching the call's key. Stamp that single key in custom_metadata instead of the two old markers. Always reserve the prefix.
  • Tests: use daft.IdempotentCommit(...) consistently, assert a single-commit invariant per logical run (exactly one new Delta commit, tagged with daft.idempotence-key).
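A minimal sketch of the reserved-prefix guard, assuming user metadata arrives as a string-keyed mapping. `reject_reserved_keys` is a hypothetical helper, and `ValueError` is an assumed exception type; the discussion only says that passing such a key raises.

```python
from typing import Mapping, Optional

RESERVED_PREFIX = "daft.idempotence-"


def reject_reserved_keys(user_properties: Optional[Mapping[str, str]]) -> None:
    """Raise if any user-supplied property key uses the reserved prefix."""
    offending = sorted(k for k in (user_properties or {}) if k.startswith(RESERVED_PREFIX))
    if offending:
        raise ValueError(f"keys with reserved prefix {RESERVED_PREFIX!r}: {offending}")


reject_reserved_keys({"owner": "etl-team"})  # passes silently
reject_reserved_keys(None)  # no properties at all is also fine
```

Running the guard on every write, whether or not `checkpoint=` is set, keeps a user from stamping a marker by hand that a later recovery check would mistake for a real commit.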

Happy to pair on this once #6905 merges. Sorry again for the delay.

@rohitkulshreshtha
Contributor

Quick update on this PR, @chenghuichen — change of plan from my last comment.

Rather than ask you to do the rework against PR #6905 yourself, we'll consolidate the work into a follow-up PR we drive. Two reasons:

  1. The conceptual gap between this PR's foundations and PR #6905's reshape is wider than I initially conveyed, and rework-via-PR-comments is slower than just doing it directly.
  2. We want to extend the same pattern to Lance in the same pass; consolidating saves a round of stacked PRs.

The architectural direction in this PR is what we're following. Specifically:

  • daft/dataframe/_checkpoint_commit.py as a sink-glue module.
  • decode_file_metadata(checkpoint, column_name) and empty_write_result(write_df), verbatim. Lifting these out of inline iceberg was the right call once there are ≥2 sinks; PR #6905 inlined them only because iceberg was the only sink at the time.
  • Delta plumbing (create_table_with_add_actions delegate, version forks, AddAction.stats interpretation, custom_metadata stamping).
  • test_incremental_writes_dedupe_committed_keys test pattern — useful enough that we'll add the iceberg analog too.

What changes (mostly mechanics, since #6905 reshaped the underlying API):

  • Drop validate_query_id — per-entry query_id no longer exists.
  • Single marker daft.idempotence-key instead of paired daft.checkpoint-store + daft.checkpoint-query.
  • IdempotentCommit dataclass at the call site (replaces paired kwargs).
  • Smaller fixes: bounded history() walk, retry loop on commit failure, post-commit update_incremental(), reserved-prefix guard tightened to fire regardless of checkpoint=.

Plan:

  • Consolidated PR going up shortly. You'll be Co-Authored-By: on the helper-extraction and Delta-plumbing commits.
  • This PR closes as superseded when the consolidated PR lands.

Apologies again for the calendar friction — your contribution is meaningfully shaping what we're shipping. Will tag you on the consolidated PR when it's up.

@chenghuichen
Contributor Author

@rohitkulshreshtha Hope you're feeling better!

The execution-identity vs commit-identity split makes a lot of sense — wish I'd caught that distinction during my original review.

2 participants