feat: implement DataFrame.checkpoint and DataFrame.localCheckpoint#1882
Closed
Copilot wants to merge 22 commits into
Agent-Logs-Url: https://github.com/lakehq/sail/sessions/5f3f46a3-3a38-4586-ac3b-5ff83088166b Co-authored-by: shehabgamin <11789402+shehabgamin@users.noreply.github.com>
Copilot created this pull request from a session on behalf of shehabgamin on May 10, 2026 01:15.
Pull request overview
Implements Spark Connect parity for DataFrame.checkpoint and DataFrame.localCheckpoint by adding full server-side support for CheckpointCommand, tracking checkpointed plans as CachedRemoteRelations, and resolving those cached relations in subsequent query plans.
Changes:
- Implement the Spark Connect CheckpointCommand round-trip (with optional eager materialization) and RemoveCachedRemoteRelationCommand cleanup handling.
- Add catalog tracking APIs for cached relations and integrate them into plan resolution (QueryNode::CachedRemoteRelation).
- Add Python integration tests covering checkpoint/localCheckpoint behavior across eager/lazy variants and common downstream operations.
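The checkpoint lifecycle described above can be pictured with a small in-memory model. This is a minimal sketch under stated assumptions, not Sail's implementation: `Checkpointed`, `CachedRelations`, the `relation-N` ID format, and the `execute` callback are all hypothetical, and a plan and its rows are stood in for by a `String` and a `Vec<i64>`. A lazy checkpoint keeps the plan, an eager one runs it once and keeps the result, and the returned ID is what a later CachedRemoteRelation lookup or a RemoveCachedRemoteRelationCommand would refer to.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical checkpoint payload (not Sail's real types).
#[derive(Clone, Debug, PartialEq)]
enum Checkpointed {
    /// Lazy checkpoint: keep the plan; it is evaluated when the relation is used.
    Plan(String),
    /// Eager checkpoint: the plan was executed once and its rows were kept.
    Rows(Vec<i64>),
}

/// Hypothetical in-memory map of cached relations, keyed by relation ID.
#[derive(Default)]
struct CachedRelations {
    next_id: u64,
    relations: HashMap<String, Arc<Checkpointed>>,
}

impl CachedRelations {
    /// Handles a checkpoint request and returns the relation ID that the client
    /// would keep in its CachedRemoteRelation.
    fn checkpoint(&mut self, plan: &str, eager: bool, execute: impl Fn(&str) -> Vec<i64>) -> String {
        let entry = if eager {
            Checkpointed::Rows(execute(plan))
        } else {
            Checkpointed::Plan(plan.to_string())
        };
        self.next_id += 1;
        let id = format!("relation-{}", self.next_id);
        self.relations.insert(id.clone(), Arc::new(entry));
        id
    }

    /// Looks up a cached relation while resolving a later query.
    fn get(&self, id: &str) -> Option<Arc<Checkpointed>> {
        self.relations.get(id).cloned()
    }

    /// Handles a remove request; returns whether the ID was still tracked.
    fn remove(&mut self, id: &str) -> bool {
        self.relations.remove(id).is_some()
    }
}
```

Returning `Arc` from `get` lets several queries share one cached entry without copying materialized rows; removing the ID only drops the tracker's reference.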
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| python/pysail/tests/spark/test_checkpoint.py | Adds end-to-end tests for checkpoint/localCheckpoint (eager/lazy, transformations, joins, nulls, empty DF, chaining). |
| crates/sail-spark-connect/src/service/plan_executor.rs | Implements checkpoint execution, optional eager materialization into MemTable, cached relation tracking, and cached relation removal command handling. |
| crates/sail-spark-connect/src/server.rs | Wires RemoveCachedRemoteRelationCommand to the new handler. |
| crates/sail-spark-connect/Cargo.toml | Adds sail-catalog dependency required for cached relation tracking. |
| crates/sail-plan/src/resolver/query/mod.rs | Enables resolving QueryNode::CachedRemoteRelation via a dedicated resolver method. |
| crates/sail-plan/src/resolver/query/misc.rs | Implements cached remote relation lookup and field-id rebinding for the current resolver state. |
| crates/sail-catalog/src/manager/tracker.rs | Adds an in-memory cached_relations map with track/get/remove operations. |
| crates/sail-catalog/src/manager/mod.rs | Re-exports cached relation tracking APIs from CatalogManager. |
| Cargo.lock | Locks the new sail-catalog dependency for sail-spark-connect. |
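The misc.rs row mentions field-id rebinding when a cached relation is resolved. One plausible reading, sketched here under assumptions: the resolver hands out unique field IDs, and a cached plan still carries the IDs from the state in which it was first resolved, so reusing it in a new query could collide with IDs that state has already handed out. The sketch gives each cached field a fresh ID and records the old-to-new mapping; `ResolverState`, `Field`, and `rebind_fields` are hypothetical names, not Sail's API.

```rust
use std::collections::HashMap;

/// Hypothetical resolver state that hands out unique field IDs.
#[derive(Default)]
struct ResolverState {
    next_field_id: usize,
}

impl ResolverState {
    fn next_id(&mut self) -> usize {
        let id = self.next_field_id;
        self.next_field_id += 1;
        id
    }
}

/// Hypothetical resolved column: user-facing name plus internal field ID.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    id: usize,
}

/// Gives every field of a cached plan a fresh ID from the current state and
/// returns the rebound fields together with the old-to-new ID mapping.
fn rebind_fields(fields: &[Field], state: &mut ResolverState) -> (Vec<Field>, HashMap<usize, usize>) {
    let mut mapping = HashMap::new();
    let rebound: Vec<Field> = fields
        .iter()
        .map(|f| {
            let new_id = state.next_id();
            mapping.insert(f.id, new_id);
            Field { name: f.name.clone(), id: new_id }
        })
        .collect();
    (rebound, mapping)
}
```

Names are preserved so user-facing column references still match; only the internal IDs change, and the mapping would let any expression inside the cached plan be rewritten to the new IDs.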
Codecov Report
❌ Patch coverage is
@@ Coverage Diff @@
## main #1882 +/- ##
==========================================
- Coverage 75.77% 74.91% -0.87%
==========================================
Files 942 944 +2
Lines 135320 142074 +6754
==========================================
+ Hits 102544 106428 +3884
- Misses 32776 35646 +2870
*This pull request uses carry forward flags.
... and 50 files with indirect coverage changes
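The head coverage figure follows from the Hits and Misses rows. Assuming Codecov's line coverage is hits divided by tracked lines, and that no partially covered lines are counted here (none are listed), the head-branch number works out as:

$$
\text{coverage} = \frac{\text{hits}}{\text{hits} + \text{misses}} = \frac{106428}{106428 + 35646} = \frac{106428}{142074} \approx 74.91\%
$$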
Spark 3.5.7 Test Report
Commit Information
Test Summary
Test Details
Error Counts
Passed Tests Diff
--- before.txt 2026-05-13 01:37:28.465869053 +0000
+++ after.txt 2026-05-13 01:37:28.663870356 +0000
@@ -261,0 +262,2 @@
+pyspark/sql/functions.py::pyspark.sql.functions.from_csv
+pyspark/sql/functions.py::pyspark.sql.functions.from_json
@@ -506 +507,0 @@
-pyspark/sql/tests/connect/test_connect_basic.py::SparkConnectBasicTests::test_csv
@@ -602,0 +604 @@
+pyspark/sql/tests/connect/test_connect_function.py::SparkConnectFunctionTests::test_string_functions_one_arg
Failed Tests
Spark 4.1.1 Test Report
Commit Information
Test Summary
Test Details
Error Counts (truncated)
Passed Tests Diff
--- before.txt 2026-05-13 01:39:37.409318453 +0000
+++ after.txt 2026-05-13 01:39:37.722335464 +0000
@@ -91,0 +92 @@
+pyspark/sql/dataframe.py::pyspark.sql.dataframe.DataFrame.localCheckpoint
@@ -274,0 +276 @@
+pyspark/sql/functions/builtin.py::pyspark.sql.functions.builtin.from_csv
@@ -1400,0 +1403 @@
+pyspark/sql/tests/connect/test_connect_function.py::SparkConnectFunctionTests::test_string_functions_one_arg
@@ -1459 +1461,0 @@
-pyspark/sql/tests/connect/test_connect_readwriter.py::SparkConnectReadWriterTests::test_csv
@@ -1584,0 +1587 @@
+pyspark/sql/tests/connect/test_parity_dataframe.py::DataFrameParityTests::test_local_checkpoint_dataframe_with_storage_level
@@ -1948,0 +1952 @@
+pyspark/sql/tests/connect/test_parity_udtf.py::ArrowUDTFParityTests::test_struct_output_type_casting_row
Failed Tests (truncated)
Ibis Test Report
Commit Information
Test Summary
Test Details
Error Counts
Passed Tests Diff
--- before.txt 2026-05-13 01:38:42.928560015 +0000
+++ after.txt 2026-05-13 01:38:43.172562599 +0000
@@ -726 +725,0 @@
-ibis/backends/tests/test_generic.py::test_uncorrelated_subquery[pyspark]
@@ -1482 +1481 @@
-ibis/backends/tests/test_udf.py::test_vectorized_udf[pyspark-add_one_pyarrow]
+ibis/backends/tests/test_udf.py::test_vectorized_udf[pyspark-add_one_pandas]
Failed Tests
Agent-Logs-Url: https://github.com/lakehq/sail/sessions/8133bd14-cb94-4c56-ac57-5629cfed18d9 Co-authored-by: shehabgamin <11789402+shehabgamin@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Agent-Logs-Url: https://github.com/lakehq/sail/sessions/aa2d0845-e4a5-4df6-96f7-42b8415124c8 Co-authored-by: shehabgamin <11789402+shehabgamin@users.noreply.github.com>
Agent-Logs-Url: https://github.com/lakehq/sail/sessions/31be1e08-c59a-4597-86fd-f617c6186791 Co-authored-by: shehabgamin <11789402+shehabgamin@users.noreply.github.com>
shehabgamin reviewed on May 12, 2026 (two reviews).
Co-authored-by: Shehab Amin <11789402+shehabgamin@users.noreply.github.com>
Agent-Logs-Url: https://github.com/lakehq/sail/sessions/52f051c1-f32c-4857-a6d0-3bcf42c09452 Co-authored-by: shehabgamin <11789402+shehabgamin@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Agent-Logs-Url: https://github.com/lakehq/sail/sessions/d72a8b0a-1114-4626-abaa-4566793a6975 Co-authored-by: shehabgamin <11789402+shehabgamin@users.noreply.github.com>
Comment on lines +819 to +856
```rust
async fn cleanup_checkpoint_path(ctx: &SessionContext, checkpoint_uri: &str) {
    let Ok(checkpoint_url) = url::Url::parse(checkpoint_uri) else {
        warn!("failed to parse checkpoint location for cleanup: {checkpoint_uri}");
        return;
    };
    let Ok((object_store, checkpoint_path)) = checkpoint_store_and_path(ctx, &checkpoint_url)
    else {
        warn!("failed to resolve checkpoint object store for cleanup: {checkpoint_uri}");
        return;
    };
    let locations = object_store
        .list(Some(&checkpoint_path))
        .map_ok(|meta| meta.location)
        .boxed();
    if let Err(e) = object_store
        .delete_stream(locations)
        .try_collect::<Vec<_>>()
        .await
    {
        warn!("failed to remove checkpoint location {checkpoint_uri}: {e}");
    }
    cleanup_file_checkpoint_directory(&checkpoint_url, checkpoint_uri).await;
}

async fn cleanup_file_checkpoint_directory(checkpoint_url: &url::Url, checkpoint_uri: &str) {
    if checkpoint_url.scheme() != "file" {
        return;
    }
    let Ok(path) = checkpoint_url.to_file_path() else {
        warn!("failed to convert checkpoint file URL to path for cleanup: {checkpoint_uri}");
        return;
    };
    if let Err(e) = tokio::fs::remove_dir_all(&path).await {
        if e.kind() != std::io::ErrorKind::NotFound {
            warn!("failed to remove checkpoint directory {checkpoint_uri}: {e}");
        }
    }
}
```
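The NotFound check in cleanup_file_checkpoint_directory makes directory cleanup idempotent: removing a checkpoint that is already gone is not reported as a failure. The extra directory pass after the object-store deletes presumably exists because deleting the listed objects can leave empty directories behind on a local file system. A synchronous, std-only sketch of the same pattern follows; `remove_checkpoint_dir` is a hypothetical name, and unlike the reviewed code it returns the error instead of logging it.

```rust
use std::io::ErrorKind;
use std::path::Path;

/// Removes a local checkpoint directory and everything under it.
/// A directory that no longer exists counts as success, so repeated
/// cleanup calls for the same checkpoint are harmless.
fn remove_checkpoint_dir(path: &Path) -> std::io::Result<()> {
    match std::fs::remove_dir_all(path) {
        Ok(()) => Ok(()),
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e),
    }
}
```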
Agent-Logs-Url: https://github.com/lakehq/sail/sessions/ed3c4316-4757-49ea-bbf1-6831281f90b9 Co-authored-by: shehabgamin <11789402+shehabgamin@users.noreply.github.com>
```rust
// Stream batches directly to a single Parquet file inside the checkpoint
// directory without collecting them all into memory first.
let write_result: SparkResult<()> = async {
    let parquet_path = ObjectStorePath::from(format!("{checkpoint_path}/part-0.parquet"));
```
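The comment in this snippet describes streaming record batches into one output file rather than collecting them first. The sketch below shows only that streaming pattern with `std::io::Write`; it is a stand-in under assumptions that writes raw byte chunks instead of encoding Arrow batches as Parquet, and `write_batches` is a hypothetical name. The function holds at most one batch at a time and stops at the first upstream error, leaving whatever was already written in the sink.

```rust
use std::io::{self, Write};

/// Writes each batch to `out` as soon as it is produced instead of first
/// collecting every batch into a Vec. Returns the total number of bytes written.
fn write_batches<W, I>(out: &mut W, batches: I) -> io::Result<usize>
where
    W: Write,
    I: IntoIterator<Item = io::Result<Vec<u8>>>,
{
    let mut written = 0;
    for batch in batches {
        let batch = batch?; // propagate the first upstream error
        out.write_all(&batch)?;
        written += batch.len();
        // `batch` is dropped here, before the next one is pulled.
    }
    out.flush()?;
    Ok(written)
}
```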
Superseded by #1969
named_plan_fields fallback and rely on resolved named-plan fields
CatalogObjectTracker::drop