feat(checkpoint): add idempotent checkpointed writes for Lance append mode #6967

@everySympathy

Description

Is your feature request related to a problem?

Yes. This is a feature gap for Lance checkpoint support. Daft checkpointing can skip already-processed source keys on retry, and Iceberg checkpointed writes already have an idempotent final-commit path. Lance writes do not yet have the same checkpoint-aware final commit support.

Today, Lance writes go through daft_lance.LanceDataSink: each write task produces Lance FragmentMetadata via lance.fragment.write_fragments(...), and finalize(write_results) then commits the fragments found in the current run's write_results.

To support checkpoint recovery for Lance, we need to record the fragments produced by each checkpointed task and read them back from the checkpoint store during retry. In Phase 1, those fragments can be recovered from the staged write_results: each WriteResult.result contains the Lance FragmentMetadata needed for the final append commit. This lets a retry commit fragments that were already produced in a previous run without recomputing the corresponding inputs.
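
As a minimal sketch of that recovery step, the snippet below flattens the fragments out of a list of write results. The `WriteResult` dataclass here is a local stand-in that mirrors only the `result` field described above, `collect_fragments` is a hypothetical helper name, and plain strings stand in for Lance FragmentMetadata objects.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List


@dataclass
class WriteResult:
    """Local stand-in for Daft's WriteResult (assumption: only `result` matters here)."""

    result: Any
    bytes_written: int = 0
    rows_written: int = 0


def collect_fragments(write_results: Iterable[WriteResult]) -> List[Any]:
    """Flatten the per-task fragment lists into one list for a single append commit.

    Hypothetical helper: the name and shape are illustrative, not daft_lance API.
    """
    fragments: List[Any] = []
    for wr in write_results:
        if isinstance(wr.result, (list, tuple)):
            # A task normally reports a list of fragments.
            fragments.extend(wr.result)
        else:
            # Tolerate a task that reports a single fragment.
            fragments.append(wr.result)
    return fragments


staged = [WriteResult(result=["frag-0", "frag-1"]), WriteResult(result=["frag-2"])]
recovered = collect_fragments(staged)
```

Because the helper only looks at `wr.result`, it behaves the same whether the write results come from the current run or from a previous run's checkpoint.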

Most of the Lance-specific implementation is expected to live in daft_lance (repo: https://github.com/daft-engine/daft-lance), following the daft.io.lance to daft_lance migration (PR: #6957). Daft core may still need some checkpoint plumbing to stage the DataSink's existing write_results payload and let the Lance final-commit helper read it back.

Describe the solution you'd like

I'd like to add checkpoint-aware / idempotent commit support for write_lance(..., checkpoint=daft.IdempotentCommit(...)), initially for append-only writes through the new daft_lance integration.

Proposed Phase 1 scope:

  • Match the current Iceberg checkpointed-write scope: support append-only writes first. For Lance, this means mode="append" and the target Lance dataset must already exist. The open Delta checkpoint PR #6932 ("feat(checkpoint): idempotent write_deltalake + cross-sink helpers") follows the same append-only direction.
  • Support file/object-store Lance URIs first, such as local paths or s3://.... REST-backed Lance writes are out of scope for the first pass.
  • Store the existing DataSink write_results payload in the checkpoint store under a Lance checkpoint file format.
  • During recovery / final commit, decode checkpointed write_results and extract WriteResult.result as Lance FragmentMetadata.
  • Commit with a Lance Transaction carrying transaction_properties={"daft.idempotence-key": checkpoint.idempotence_key}.
  • Before committing, scan Lance transaction history for the same daft.idempotence-key.
    • If found, mark pending checkpoint entries committed and return without appending again.
    • If not found, append all checkpointed fragments and mark committed.
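
The check-then-commit steps above can be sketched with an in-memory model. `FakeLanceDataset`, `FakeTransaction`, and `idempotent_append` are hypothetical names used only for illustration; a real implementation would read the transaction history and commit through pylance instead, and this sketch makes no claim about which pylance calls that requires.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

IDEMPOTENCE_PROPERTY = "daft.idempotence-key"


@dataclass
class FakeTransaction:
    """Stand-in for one committed Lance transaction (assumption, not pylance API)."""

    fragments: List[str]
    transaction_properties: Dict[str, str]


@dataclass
class FakeLanceDataset:
    """In-memory stand-in for a Lance dataset's transaction history."""

    history: List[FakeTransaction] = field(default_factory=list)

    def commit_append(self, fragments: List[str], properties: Dict[str, str]) -> None:
        self.history.append(FakeTransaction(list(fragments), dict(properties)))


def already_committed(dataset: FakeLanceDataset, key: str) -> bool:
    """Scan the transaction history for a transaction carrying the same key."""
    return any(
        txn.transaction_properties.get(IDEMPOTENCE_PROPERTY) == key
        for txn in dataset.history
    )


def idempotent_append(
    dataset: FakeLanceDataset,
    fragments: List[str],
    idempotence_key: str,
    mark_committed: Callable[[], None],
) -> bool:
    """Append the fragments at most once per key. Returns True if an append happened."""
    appended = False
    if not already_committed(dataset, idempotence_key):
        dataset.commit_append(fragments, {IDEMPOTENCE_PROPERTY: idempotence_key})
        appended = True
    # In both branches the pending checkpoint entries are marked committed.
    mark_committed()
    return appended


dataset = FakeLanceDataset()
marks: List[str] = []
first = idempotent_append(dataset, ["frag-0", "frag-1"], "job-1", lambda: marks.append("x"))
retry = idempotent_append(dataset, ["frag-0", "frag-1"], "job-1", lambda: marks.append("x"))
```

The retry finds the key in the history, so it skips the append but still marks the checkpoint entries committed. This covers the case where a previous run crashed after the Lance commit but before `mark_committed`.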

High-level normal Lance write path today:

flowchart LR
  A["Daft DataFrame"] --> B["daft_lance.LanceDataSink.write()"]
  B --> C["lance.fragment.write_fragments()"]
  C --> D["Write fragment files"]
  C --> E["Return FragmentMetadata"]
  E --> F["WriteResult(result=[fragments])"]
  F --> G["collect write_results"]
  G --> H["LanceDataSink.finalize(write_results)"]
  H --> I["LanceDataset.commit(Append)"]

Proposed checkpoint-aware path:

flowchart LR
  A["Partition writes data"] --> B["WriteResult(result=[fragments])"]
  B --> C["Stage write_results into checkpoint store"]
  C --> D["checkpoint(id): seal keys + write metadata"]

  E["finalize / retry"] --> F["Read checkpointed write_results"]
  F --> G["Extract wr.result fragments"]
  G --> H["Lance Transaction Append"]
  H --> I["Write daft.idempotence-key"]
  I --> J["mark_committed"]

Python / Rust boundary

Lance is different from Iceberg and Delta in Daft.

Iceberg and Delta use Daft's native catalog write path. Their checkpoint logic can stay close to the existing Rust write and commit flow.

Lance currently goes through the Python DataSink path. The actual Lance write happens in daft_lance, through pylance. Daft core only sees the generic DataSink output: write_results.

flowchart LR
  A["Rust Daft core<br/>checkpoint store"]:::rust --> B["write_results<br/>cross-language bridge"]:::boundary
  B --> C["Python daft_lance<br/>understands Lance fragments"]:::python

  classDef rust fill:#e8f1ff,stroke:#4f7ecb,color:#0f2742;
  classDef boundary fill:#fff7dd,stroke:#c69a22,color:#4a3410;
  classDef python fill:#eaf8ee,stroke:#3f9d5b,color:#12351f;

For Phase 1, Daft core should stage the existing write_results at the checkpoint boundary. This keeps Lance-specific fragment handling out of the Rust executor.

Then daft_lance can read those staged write_results during final commit. Each WriteResult.result contains the Lance fragments that need to be appended.

In other words:

  • Daft core stores write_results in the checkpoint store.
  • daft_lance reads them back, extracts the Lance fragments, checks the idempotence key, and commits the fragments to Lance.
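
A rough sketch of that division of labor, assuming a JSON-encoded payload: `InMemoryCheckpointStore` and its `stage`, `read_pending`, and `mark_committed` methods are hypothetical and do not reflect Daft's actual checkpoint store interface. `WriteResult` is again a local stand-in, and plain dicts stand in for Lance fragments.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, List, Set


@dataclass
class WriteResult:
    """Local stand-in for Daft's WriteResult; `result` holds fragment dicts here."""

    result: List[dict]
    bytes_written: int = 0
    rows_written: int = 0


class InMemoryCheckpointStore:
    """Hypothetical store: the core side stages opaque bytes, the sink side decodes them."""

    def __init__(self) -> None:
        self._staged: Dict[str, List[bytes]] = {}
        self._committed: Set[str] = set()

    def stage(self, checkpoint_id: str, write_results: List[WriteResult]) -> None:
        # Core side: encode the generic write_results payload without inspecting it.
        payload = json.dumps([asdict(wr) for wr in write_results]).encode("utf-8")
        self._staged.setdefault(checkpoint_id, []).append(payload)

    def read_pending(self) -> List[WriteResult]:
        # Sink side: decode every staged payload that has not been committed yet.
        pending: List[WriteResult] = []
        for checkpoint_id, payloads in self._staged.items():
            if checkpoint_id in self._committed:
                continue
            for payload in payloads:
                decoded = json.loads(payload.decode("utf-8"))
                pending.extend(WriteResult(**item) for item in decoded)
        return pending

    def mark_committed(self) -> None:
        self._committed.update(self._staged)


store = InMemoryCheckpointStore()
store.stage("ckpt-0", [WriteResult(result=[{"id": 0}], rows_written=10)])
store.stage("ckpt-1", [WriteResult(result=[{"id": 1}, {"id": 2}], rows_written=20)])

pending = store.read_pending()
fragment_ids = [frag["id"] for wr in pending for frag in wr.result]
```

The store never interprets `result`; only the sink-side reader knows that the entries are Lance fragments, which matches the goal of keeping fragment handling out of the Rust executor.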

Describe alternatives you've considered

  1. Store Lance FragmentMetadata directly in the checkpoint store instead of DataSink write_results.

    This is semantically cleaner, but it likely requires a new checkpoint metadata hook / connector-specific plumbing. For Phase 1, storing write_results is less invasive because it matches the generic DataSink output shape already produced by Daft.

  2. Add fingerprint validation in Phase 1.

    A fingerprint could detect accidental reuse of the same idempotence_key with different fragments. This would be a useful safety improvement, but it also adds extra design and compatibility questions. For the initial append-only path, I think we can follow the current contract first: the user-provided idempotence_key identifies one logical commit and must be stable across retries but unique across distinct commits.

  3. Support create, overwrite, and merge modes.

    These modes have more complicated retry semantics. Phase 1 should start with append-only behavior, matching the narrower initial support used by checkpointed Iceberg writes.
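
For reference, the fingerprint idea from alternative 2 could look roughly like the sketch below. It is not part of the Phase 1 proposal. `fragments_fingerprint` and `check_key_reuse` are hypothetical names, and the sketch assumes fragments can be represented as JSON-serializable dicts.

```python
import hashlib
import json
from typing import List


def fragments_fingerprint(fragments: List[dict]) -> str:
    """Order-independent SHA-256 digest over canonical JSON encodings of the fragments."""
    canonical = sorted(
        json.dumps(frag, sort_keys=True, separators=(",", ":")) for frag in fragments
    )
    digest = hashlib.sha256()
    for item in canonical:
        digest.update(item.encode("utf-8"))
        digest.update(b"\n")  # separator so adjacent entries cannot run together
    return digest.hexdigest()


def check_key_reuse(recorded_fingerprint: str, current_fingerprint: str) -> None:
    """Raise if an idempotence key is being reused with a different set of fragments."""
    if recorded_fingerprint != current_fingerprint:
        raise ValueError("idempotence_key was already used with different fragments")


run_a = fragments_fingerprint([{"id": 0, "files": ["a"]}, {"id": 1, "files": ["b"]}])
run_b = fragments_fingerprint([{"files": ["b"], "id": 1}, {"files": ["a"], "id": 0}])
other = fragments_fingerprint([{"id": 0, "files": ["a"]}])
```

Sorting the canonical encodings makes the digest independent of task completion order, so a retry that recovers the same fragments in a different order still matches.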

Component(s)

  • checkpointing
  • Lance / daft_lance
  • DataSink writes
  • IO / catalog writes

Additional Context

Related work:

Out of scope for Phase 1:

  • mode="create"
  • mode="overwrite"
  • mode="merge"
  • REST-backed Lance writes
  • fingerprint validation
  • multi-process concurrent writers sharing the same checkpoint store path
  • changing the checkpoint payload to a Lance-specific fragments column
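
One way to enforce these limits is an up-front guard that rejects out-of-scope requests before any data is written. The sketch below is an assumption about how that might look: `validate_phase1_request` and its parameters are hypothetical and are not part of Daft or daft_lance.

```python
def validate_phase1_request(
    mode: str, dataset_exists: bool, rest_backed: bool = False
) -> None:
    """Reject checkpointed Lance writes that fall outside the Phase 1 scope.

    Hypothetical guard: parameter names are illustrative only.
    """
    if mode != "append":
        raise NotImplementedError(
            f"checkpointed Lance writes support only mode='append', got mode={mode!r}"
        )
    if rest_backed:
        raise NotImplementedError(
            "checkpointed writes to REST-backed Lance targets are not supported yet"
        )
    if not dataset_exists:
        raise ValueError("checkpointed append requires an existing Lance dataset")


def is_supported(mode: str, dataset_exists: bool, rest_backed: bool = False) -> bool:
    """Convenience wrapper that reports support as a boolean instead of raising."""
    try:
        validate_phase1_request(mode, dataset_exists, rest_backed)
    except (NotImplementedError, ValueError):
        return False
    return True
```

Failing fast here avoids staging fragments into the checkpoint store for a commit that could never be finalized.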

Would you like to implement a fix?

Yes.
