Make pull and read_dataset from Studio atomic #1573

Open

shcheklein wants to merge 22 commits into main from make-pull-atomic

Conversation


@shcheklein shcheklein commented Jan 31, 2026

Downloading a remote dataset (via datachain pull or read_dataset with a remote source) wrote directly to the final table. A crash mid-download left corrupt partial data with no cleanup path.

Fix: downloads now stage into a temporary table, then atomically swap it into place. Failures drop the temp table — the final table is never left in a partial state.
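
A minimal sketch of that stage-and-swap pattern, with hypothetical helper and table names (this is not the exact code in the diff):

```python
import sqlalchemy as sa


def pull_into_table(engine: sa.engine.Engine, final_name: str, fetch_batches) -> None:
    """Stage remote rows into a temp table, then swap it into place with one rename."""
    tmp_name = f"tmp_{final_name}"
    tmp_table = sa.Table(
        tmp_name, sa.MetaData(), sa.Column("sys__id", sa.Integer, primary_key=True)
    )
    tmp_table.create(engine)
    try:
        with engine.begin() as conn:
            for batch in fetch_batches():  # list[dict] batches; may raise mid-download
                conn.execute(tmp_table.insert(), batch)
        with engine.begin() as conn:
            # readers either see the previous state or the fully downloaded table
            conn.execute(sa.text(f'ALTER TABLE "{tmp_name}" RENAME TO "{final_name}"'))
    except Exception:
        tmp_table.drop(engine, checkfirst=True)  # a failed pull leaves no partial final table
        raise
```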

  • File lock per dataset version prevents concurrent downloads from corrupting each other (a rough illustration follows this list)
  • GC now detects and cleans up orphaned partial downloads (with a 1h grace period for in-flight ones)
  • SQLite retry logic narrowed to only retry on actual lock contention
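
A rough illustration of the per-version lock using the filelock package; the PR's actual helper lives in src/datachain/utils.py and may be implemented differently:

```python
import hashlib

from filelock import FileLock, Timeout


def dataset_version_lock(lock_dir: str, name: str, version: str) -> FileLock:
    """One lock file per dataset version, so concurrent pulls serialize."""
    key = hashlib.sha256(f"{name}@{version}".encode()).hexdigest()[:16]
    return FileLock(f"{lock_dir}/pull-{key}.lock")


# Usage: the first process downloads and swaps the table; others wait for it.
lock = dataset_version_lock("/tmp/datachain-locks", "my-dataset", "1.0.0")
try:
    with lock.acquire(timeout=600):
        ...  # stage into the temp table and rename, as sketched above
except Timeout:
    raise RuntimeError("another process is still pulling this dataset version")
```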

@shcheklein shcheklein self-assigned this Jan 31, 2026
@shcheklein shcheklein marked this pull request as draft January 31, 2026 22:06

cloudflare-workers-and-pages bot commented Jan 31, 2026

Deploying datachain with Cloudflare Pages

Latest commit: 879b7d2
Status: ✅  Deploy successful!
Preview URL: https://abb77e07.datachain-2g6.pages.dev
Branch Preview URL: https://make-pull-atomic.datachain-2g6.pages.dev



codecov bot commented Jan 31, 2026

Codecov Report

❌ Patch coverage is 89.23077% with 21 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
src/datachain/data_storage/sqlite.py | 31.57% | 12 Missing and 1 partial ⚠️
src/datachain/utils.py | 87.87% | 6 Missing and 2 partials ⚠️


yield source, source.ls(fields)

def pull_dataset( # noqa: C901, PLR0915
def pull_dataset( # noqa: C901, PLR0915, PLR0912
Contributor Author

Definitely needs a refactoring; that comes as a follow-up PR to this one ... including some cleanups to the progress bars

) -> sa.Table:
"""Creates a dataset rows table for the given dataset name and columns"""

def create_temp_dataset_table(
Contributor Author

@ilongin will some of these clash with the checkpoints PR?

Contributor

I think it won't clash. I do think that maybe this wrapper is not even needed, since you just call two methods in it and I think it's used only in one place, but that's minor.

@shcheklein shcheklein force-pushed the make-pull-atomic branch 6 times, most recently from 1a73529 to e69e554 on February 5, 2026 22:01
@shcheklein shcheklein requested a review from Copilot February 8, 2026 22:07
Copilot AI left a comment

Pull request overview

This PR is a WIP to make Studio-backed dataset pull / read_dataset more failure-safe by staging remote rows into temporary tables and cleaning up incomplete local state so retries can succeed after mid-flight failures.

Changes:

  • Reworked Catalog.pull_dataset() to stage into a tmp_ table, then rename/commit and mark the dataset version COMPLETE only at the end (the ordering is sketched after this list).
  • Updated DatasetRowsFetcher to insert into an arbitrary staging table instead of inserting directly into the final dataset table.
  • Added/expanded functional tests covering cleanup and successful retry after export/download/parse/insert failures and simulated process kills.
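
A sketch of that commit ordering with hypothetical method names (`rename_table` follows the Table-based signature described in the merge notes further down; `update_dataset_status` is illustrative):

```python
import sqlalchemy as sa


def commit_pulled_version(warehouse, metastore, tmp_table: sa.Table,
                          final_name: str, dataset, version: str) -> None:
    # Swap the fully staged rows into place first...
    warehouse.rename_table(tmp_table, final_name)
    # ...and only then flip the dataset version to COMPLETE, so a crash at any
    # earlier point leaves the version in a retryable, non-COMPLETE state.
    metastore.update_dataset_status(dataset, "COMPLETE", version=version)
```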

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.

File | Description
tests/func/test_read_dataset_remote.py | Adds atomicity/cleanup and SIGKILL-retry functional tests for remote read_dataset.
tests/func/test_pull.py | Adds atomicity/cleanup-and-retry tests for pull_dataset under multiple failure modes.
src/datachain/data_storage/warehouse.py | Introduces generic temp-table creation, dataframe insertion by table name, and table rename helpers for staging/commit.
src/datachain/data_storage/sqlite.py | Removes the now-obsolete insert_dataset_rows path in the SQLite warehouse implementation.
src/datachain/catalog/catalog.py | Implements the staged temp-table download flow and commit sequence for atomic-ish pull_dataset.
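
The warehouse.py helpers above, as a rough interface sketch; names and signatures are inferred from this summary and the merge notes further down, not copied from the diff:

```python
import pandas as pd
import sqlalchemy as sa


class AbstractWarehouse:
    def create_temp_dataset_table(self, columns: list[sa.Column]) -> sa.Table:
        """Create a tmp_-prefixed rows table that staged downloads insert into."""
        raise NotImplementedError

    def insert_dataframe(self, table_name: str, df: pd.DataFrame) -> int:
        """Insert a dataframe into a table referenced by name (staging or final)."""
        raise NotImplementedError

    def rename_table(self, old_table: sa.Table, new_name: str) -> sa.Table:
        """Rename a table and return an sa.Table bound to the new name."""
        raise NotImplementedError
```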


@shcheklein shcheklein force-pushed the make-pull-atomic branch 4 times, most recently from 5c90d36 to def059c on February 11, 2026 02:02
Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.



shcheklein and others added 3 commits February 12, 2026 11:13
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.




Copilot AI commented Feb 12, 2026

@shcheklein I've opened a new pull request, #1585, to work on those changes. Once the pull request is ready, I'll request review from you.

…aise (#1585)

* Initial plan

* Use `except Exception:` instead of bare `except:` for better interrupt handling

Co-authored-by: shcheklein <3659196+shcheklein@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: shcheklein <3659196+shcheklein@users.noreply.github.com>
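
For context on the `except Exception:` change above: a bare `except:` also catches `KeyboardInterrupt` and `SystemExit` (they derive from `BaseException`), so Ctrl-C could get swallowed during cleanup. A tiny illustration with a made-up cleanup helper:

```python
def cleanup_staging(drop_temp_table) -> None:
    try:
        drop_temp_table()
    except Exception:
        # ordinary failures (OSError, database errors, ...) are ignored here,
        # but KeyboardInterrupt / SystemExit still propagate to the caller
        pass
```
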
Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.



Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.



Resolved conflicts:
- metastore.py: merged imports (timedelta from our branch + nullcontext from main)
- test_utils.py: kept both our interprocess_file_lock tests and main's
  checkpoint tests

Reconciled duplicate warehouse.rename_table:
- Removed our string-based rename_table(old_name, new_name) from warehouse.py
- Adopted main's Table-based rename_table(old_table, new_name) -> sa.Table
- Updated catalog.py to get_table() before calling rename_table()
- Metadata cache eviction now handled by db_engine.rename_table (sqlite.py)

Related API changes from main absorbed cleanly:
- copy_table -> insert_into rename
- insert_dataset_rows removal (already removed in our branch)
- create_table_from_query (new staging pattern for UDF tables)
- create_pre_udf_table now takes name parameter
- TableRenameError / TableMissingError in error.py
- is_temp_table_name no longer matches UDF prefix
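
The rename_table reconciliation above roughly amounts to the following call pattern (table names here are made up for illustration):

```python
# Before (this branch): string-based rename
#   warehouse.rename_table("tmp_abc123", "ds_my_dataset_v1")

# After (main's API): resolve the sa.Table first, rename via the Table object,
# and keep using the returned table, which is bound to the new name.
tmp_table = warehouse.get_table("tmp_abc123")
final_table = warehouse.rename_table(tmp_table, "ds_my_dataset_v1")
```
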
Comment on lines +207 to +211
lock.release()
try:
os.remove(lock_path)
except OSError:
pass
Contributor

Lock file is deleted after release, which can break inter-process exclusion. Removing the lock file after release() creates a race where another process can acquire a lock on the old inode while a third process recreates the path and acquires a second lock, breaking mutual exclusion.
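
One common way to make unlink-and-release safe is to verify, after acquiring, that the path still refers to the inode that was locked; a sketch of that pattern (illustrative only, not the code in this PR):

```python
import fcntl
import os


def acquire_path_lock(lock_path: str) -> int:
    """Block until we hold an exclusive flock on the file currently at lock_path."""
    while True:
        fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
        fcntl.flock(fd, fcntl.LOCK_EX)
        try:
            if os.fstat(fd).st_ino == os.stat(lock_path).st_ino:
                return fd  # locked the live file, not a stale unlinked inode
        except FileNotFoundError:
            pass  # file was unlinked while we waited; retry against the new one
        os.close(fd)  # releases the flock on the stale inode
```

With that check in place, the holder can unlink the path before closing the descriptor: any waiter that ends up locking the stale inode notices the mismatch and retries against the freshly created file.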

projects.
"""

def get_dataset_by_version_uuid(
Contributor

Do we need this to be an abstract method?

