From ed327035c3670423a830cab7741899173fd78df6 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 16 Oct 2025 13:44:42 -0700 Subject: [PATCH 1/4] Engine copy API --- kernel/src/engine/default/filesystem.rs | 71 ++++++++++++++++++++++++- kernel/src/engine/sync/storage.rs | 4 ++ kernel/src/error.rs | 12 +++++ kernel/src/lib.rs | 4 ++ kernel/src/listed_log_files.rs | 4 ++ 5 files changed, 94 insertions(+), 1 deletion(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 08d03107c..1ffa47216 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -5,11 +5,12 @@ use delta_kernel_derive::internal_api; use futures::stream::StreamExt; use itertools::Itertools; use object_store::path::Path; -use object_store::{DynObjectStore, ObjectStore}; +use object_store::{DynObjectStore, ObjectStore, PutMode}; use url::Url; use super::UrlExt; use crate::engine::default::executor::TaskExecutor; +use crate::error::CopyError; use crate::{DeltaResult, Error, FileMeta, FileSlice, StorageHandler}; #[derive(Debug)] @@ -175,6 +176,39 @@ impl StorageHandler for ObjectStoreStorageHandler { Ok(Box::new(receiver.into_iter())) } + + fn copy(&self, src: &Url, dest: &Url) -> Result<(), CopyError> { + let src_path = + Path::from_url_path(src.path()).map_err(|e| CopyError::Other(Box::new(e)))?; + let dest_path = + Path::from_url_path(dest.path()).map_err(|e| CopyError::Other(Box::new(e)))?; + let dest_path_str = dest_path.to_string(); + let store = self.inner.clone(); + + // object_store has a copy_if_not_exists method, but it doesn't work with all cloud stores + // (namely S3). for now we just do the 'dumb' manual way of get/put. note we could do a + // HEAD then the non-atomic copy but that still has a race condition. + self.task_executor.block_on(async move { + let data = store + .get(&src_path) + .await + .map_err(|e| CopyError::Other(Box::new(e)))? 
+ .bytes() + .await + .map_err(|e| CopyError::Other(Box::new(e)))?; + + store + .put_opts(&dest_path, data.into(), PutMode::Create.into()) + .await + .map_err(|e| match e { + object_store::Error::AlreadyExists { .. } => { + CopyError::DestinationAlreadyExists(dest_path_str) + } + e => CopyError::Other(Box::new(e)), + })?; + Ok(()) + }) + } } #[cfg(test)] @@ -300,4 +334,39 @@ mod tests { } assert_eq!(len, 10, "list_from should have returned 10 files"); } + + #[tokio::test] + async fn test_copy() { + let tmp = tempfile::tempdir().unwrap(); + let store = Arc::new(LocalFileSystem::new()); + let executor = Arc::new(TokioBackgroundExecutor::new()); + let handler = ObjectStoreStorageHandler::new(store.clone(), executor); + + // basic + let data = Bytes::from("test-data"); + let src_path = Path::from_absolute_path(tmp.path().join("src.txt")).unwrap(); + store.put(&src_path, data.clone().into()).await.unwrap(); + let src_url = Url::from_file_path(tmp.path().join("src.txt")).unwrap(); + let dest_url = Url::from_file_path(tmp.path().join("dest.txt")).unwrap(); + assert!(handler.copy(&src_url, &dest_url).is_ok()); + let dest_path = Path::from_absolute_path(tmp.path().join("dest.txt")).unwrap(); + assert_eq!( + store.get(&dest_path).await.unwrap().bytes().await.unwrap(), + data + ); + + // copy to existing fails + assert!(matches!( + handler.copy(&src_url, &dest_url), + Err(CopyError::DestinationAlreadyExists(_)) + )); + + // copy from non-existing fails + let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap(); + let new_dest_url = Url::from_file_path(tmp.path().join("new_dest.txt")).unwrap(); + assert!(matches!( + handler.copy(&missing_url, &new_dest_url), + Err(CopyError::Other(_)) + )); + } } diff --git a/kernel/src/engine/sync/storage.rs b/kernel/src/engine/sync/storage.rs index fd48e7f8e..b6a02eeab 100644 --- a/kernel/src/engine/sync/storage.rs +++ b/kernel/src/engine/sync/storage.rs @@ -70,6 +70,10 @@ impl StorageHandler for SyncStorageHandler { 
}); Ok(Box::new(iter)) } + + fn copy(&self, _src: &Url, _dest: &Url) -> Result<(), crate::error::CopyError> { + unimplemented!("SyncStorageHandler does not implement copy"); + } } #[cfg(test)] diff --git a/kernel/src/error.rs b/kernel/src/error.rs index d42cd6de4..31cf663a7 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -210,6 +210,10 @@ pub enum Error { /// Schema mismatch has occurred or invalid schema used somewhere #[error("Schema error: {0}")] Schema(String), + + /// Error during Engine's storage handler copy operation + #[error(transparent)] + CopyError(#[from] CopyError), } // Convenience constructors for Error types that take a String argument @@ -342,3 +346,11 @@ impl From for Error { } } } + +#[derive(thiserror::Error, Debug)] +pub enum CopyError { + #[error("Destination file already exists: {0}")] + DestinationAlreadyExists(String), + #[error(transparent)] + Other(#[from] Box), +} diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index b3d0a3899..d05265c7f 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -534,6 +534,10 @@ pub trait StorageHandler: AsAny { &self, files: Vec, ) -> DeltaResult>>>; + + /// Copy a file atomically from source to destination. If the destination file already exists, + /// it must return Err(CopyError::DestinationAlreadyExists). + fn copy(&self, src: &Url, dest: &Url) -> Result<(), error::CopyError>; } /// Provides JSON handling functionality to Delta Kernel. 
diff --git a/kernel/src/listed_log_files.rs b/kernel/src/listed_log_files.rs index 95ca61ce9..ccae7713f 100644 --- a/kernel/src/listed_log_files.rs +++ b/kernel/src/listed_log_files.rs @@ -388,6 +388,7 @@ mod list_log_files_with_log_tail_tests { use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreStorageHandler; + use crate::error::CopyError; use crate::FileMeta; use super::*; @@ -634,6 +635,9 @@ mod list_log_files_with_log_tail_tests { ) -> DeltaResult>>> { panic!("read_files used"); } + fn copy(&self, src: &Url, dest: &Url) -> Result<(), CopyError> { + panic!("copy used from {src} to {dest}"); + } } // when log_tail covers the entire requested range, no filesystem listing should occur From 9215d24e22d0ce4939c7e045c63cb3b423d1ae39 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 20 Oct 2025 17:16:03 -0700 Subject: [PATCH 2/4] revert CopyError --- kernel/src/engine/default/filesystem.rs | 31 +++++++++---------------- kernel/src/engine/sync/storage.rs | 2 +- kernel/src/error.rs | 12 ---------- kernel/src/lib.rs | 4 ++-- kernel/src/listed_log_files.rs | 3 +-- 5 files changed, 15 insertions(+), 37 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 1ffa47216..c2a29f75d 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -10,7 +10,6 @@ use url::Url; use super::UrlExt; use crate::engine::default::executor::TaskExecutor; -use crate::error::CopyError; use crate::{DeltaResult, Error, FileMeta, FileSlice, StorageHandler}; #[derive(Debug)] @@ -177,34 +176,29 @@ impl StorageHandler for ObjectStoreStorageHandler { Ok(Box::new(receiver.into_iter())) } - fn copy(&self, src: &Url, dest: &Url) -> Result<(), CopyError> { - let src_path = - Path::from_url_path(src.path()).map_err(|e| CopyError::Other(Box::new(e)))?; - let dest_path = - Path::from_url_path(dest.path()).map_err(|e| 
CopyError::Other(Box::new(e)))?; + fn copy(&self, src: &Url, dest: &Url) -> DeltaResult<()> { + let src_path = Path::from_url_path(src.path())?; + let dest_path = Path::from_url_path(dest.path())?; let dest_path_str = dest_path.to_string(); let store = self.inner.clone(); - // object_store has a copy_if_not_exists method, but it doesn't work with all cloud stores - // (namely S3). for now we just do the 'dumb' manual way of get/put. note we could do a - // HEAD then the non-atomic copy but that still has a race condition. + // Read source file then write atomically with PutMode::Create + // This ensures: 1) atomicity 2) fails if destination exists self.task_executor.block_on(async move { let data = store .get(&src_path) - .await - .map_err(|e| CopyError::Other(Box::new(e)))? + .await? .bytes() - .await - .map_err(|e| CopyError::Other(Box::new(e)))?; + .await?; store .put_opts(&dest_path, data.into(), PutMode::Create.into()) .await .map_err(|e| match e { object_store::Error::AlreadyExists { .. 
} => { - CopyError::DestinationAlreadyExists(dest_path_str) + Error::FileAlreadyExists(dest_path_str) } - e => CopyError::Other(Box::new(e)), + e => e.into(), })?; Ok(()) }) @@ -358,15 +352,12 @@ mod tests { // copy to existing fails assert!(matches!( handler.copy(&src_url, &dest_url), - Err(CopyError::DestinationAlreadyExists(_)) + Err(Error::FileAlreadyExists(_)) )); // copy from non-existing fails let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap(); let new_dest_url = Url::from_file_path(tmp.path().join("new_dest.txt")).unwrap(); - assert!(matches!( - handler.copy(&missing_url, &new_dest_url), - Err(CopyError::Other(_)) - )); + assert!(handler.copy(&missing_url, &new_dest_url).is_err()); } } diff --git a/kernel/src/engine/sync/storage.rs b/kernel/src/engine/sync/storage.rs index b6a02eeab..0f6a593b1 100644 --- a/kernel/src/engine/sync/storage.rs +++ b/kernel/src/engine/sync/storage.rs @@ -71,7 +71,7 @@ impl StorageHandler for SyncStorageHandler { Ok(Box::new(iter)) } - fn copy(&self, _src: &Url, _dest: &Url) -> Result<(), crate::error::CopyError> { + fn copy(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> { unimplemented!("SyncStorageHandler does not implement copy"); } } diff --git a/kernel/src/error.rs b/kernel/src/error.rs index 31cf663a7..d42cd6de4 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -210,10 +210,6 @@ pub enum Error { /// Schema mismatch has occurred or invalid schema used somewhere #[error("Schema error: {0}")] Schema(String), - - /// Error during Engine's storage handler copy operation - #[error(transparent)] - CopyError(#[from] CopyError), } // Convenience constructors for Error types that take a String argument @@ -346,11 +342,3 @@ impl From for Error { } } } - -#[derive(thiserror::Error, Debug)] -pub enum CopyError { - #[error("Destination file already exists: {0}")] - DestinationAlreadyExists(String), - #[error(transparent)] - Other(#[from] Box), -} diff --git a/kernel/src/lib.rs 
b/kernel/src/lib.rs index d05265c7f..adaa9f024 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -536,8 +536,8 @@ pub trait StorageHandler: AsAny { ) -> DeltaResult>>>; /// Copy a file atomically from source to destination. If the destination file already exists, - /// it must return Err(CopyError::DestinationAlreadyExists). - fn copy(&self, src: &Url, dest: &Url) -> Result<(), error::CopyError>; + /// it must return Err(Error::FileAlreadyExists). + fn copy(&self, src: &Url, dest: &Url) -> DeltaResult<()>; } /// Provides JSON handling functionality to Delta Kernel. diff --git a/kernel/src/listed_log_files.rs b/kernel/src/listed_log_files.rs index ccae7713f..3701563af 100644 --- a/kernel/src/listed_log_files.rs +++ b/kernel/src/listed_log_files.rs @@ -388,7 +388,6 @@ mod list_log_files_with_log_tail_tests { use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreStorageHandler; - use crate::error::CopyError; use crate::FileMeta; use super::*; @@ -635,7 +634,7 @@ mod list_log_files_with_log_tail_tests { ) -> DeltaResult>>> { panic!("read_files used"); } - fn copy(&self, src: &Url, dest: &Url) -> Result<(), CopyError> { + fn copy(&self, src: &Url, dest: &Url) -> DeltaResult<()> { panic!("copy used from {src} to {dest}"); } } From 7e96f28b4377bd7920d80e0aee5ef42f598f5fce Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 21 Oct 2025 14:35:51 -0700 Subject: [PATCH 3/4] rename to copy_atomic --- kernel/src/engine/default/filesystem.rs | 14 +++++--------- kernel/src/engine/sync/storage.rs | 2 +- kernel/src/lib.rs | 2 +- kernel/src/listed_log_files.rs | 2 +- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index c2a29f75d..51368d9a5 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -176,7 +176,7 @@ impl StorageHandler for ObjectStoreStorageHandler { 
Ok(Box::new(receiver.into_iter())) } - fn copy(&self, src: &Url, dest: &Url) -> DeltaResult<()> { + fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> { let src_path = Path::from_url_path(src.path())?; let dest_path = Path::from_url_path(dest.path())?; let dest_path_str = dest_path.to_string(); @@ -185,11 +185,7 @@ impl StorageHandler for ObjectStoreStorageHandler { // Read source file then write atomically with PutMode::Create // This ensures: 1) atomicity 2) fails if destination exists self.task_executor.block_on(async move { - let data = store - .get(&src_path) - .await? - .bytes() - .await?; + let data = store.get(&src_path).await?.bytes().await?; store .put_opts(&dest_path, data.into(), PutMode::Create.into()) @@ -342,7 +338,7 @@ mod tests { store.put(&src_path, data.clone().into()).await.unwrap(); let src_url = Url::from_file_path(tmp.path().join("src.txt")).unwrap(); let dest_url = Url::from_file_path(tmp.path().join("dest.txt")).unwrap(); - assert!(handler.copy(&src_url, &dest_url).is_ok()); + assert!(handler.copy_atomic(&src_url, &dest_url).is_ok()); let dest_path = Path::from_absolute_path(tmp.path().join("dest.txt")).unwrap(); assert_eq!( store.get(&dest_path).await.unwrap().bytes().await.unwrap(), @@ -351,13 +347,13 @@ mod tests { // copy to existing fails assert!(matches!( - handler.copy(&src_url, &dest_url), + handler.copy_atomic(&src_url, &dest_url), Err(Error::FileAlreadyExists(_)) )); // copy from non-existing fails let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap(); let new_dest_url = Url::from_file_path(tmp.path().join("new_dest.txt")).unwrap(); - assert!(handler.copy(&missing_url, &new_dest_url).is_err()); + assert!(handler.copy_atomic(&missing_url, &new_dest_url).is_err()); } } diff --git a/kernel/src/engine/sync/storage.rs b/kernel/src/engine/sync/storage.rs index 0f6a593b1..9729b119e 100644 --- a/kernel/src/engine/sync/storage.rs +++ b/kernel/src/engine/sync/storage.rs @@ -71,7 +71,7 @@ impl 
StorageHandler for SyncStorageHandler { Ok(Box::new(iter)) } - fn copy(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> { + fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> { unimplemented!("SyncStorageHandler does not implement copy"); } } diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index adaa9f024..8a3780911 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -537,7 +537,7 @@ pub trait StorageHandler: AsAny { /// Copy a file atomically from source to destination. If the destination file already exists, /// it must return Err(Error::FileAlreadyExists). - fn copy(&self, src: &Url, dest: &Url) -> DeltaResult<()>; + fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>; } /// Provides JSON handling functionality to Delta Kernel. diff --git a/kernel/src/listed_log_files.rs b/kernel/src/listed_log_files.rs index 3701563af..5283ec089 100644 --- a/kernel/src/listed_log_files.rs +++ b/kernel/src/listed_log_files.rs @@ -634,7 +634,7 @@ mod list_log_files_with_log_tail_tests { ) -> DeltaResult>>> { panic!("read_files used"); } - fn copy(&self, src: &Url, dest: &Url) -> DeltaResult<()> { + fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> { panic!("copy used from {src} to {dest}"); } } From a24e6b733c98d5d7d0c2840779a54c20740631a8 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Wed, 22 Oct 2025 13:23:44 -0700 Subject: [PATCH 4/4] comment --- kernel/src/engine/default/filesystem.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 51368d9a5..73d06088c 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -182,8 +182,9 @@ impl StorageHandler for ObjectStoreStorageHandler { let dest_path_str = dest_path.to_string(); let store = self.inner.clone(); - // Read source file then write atomically with PutMode::Create - // This ensures: 1) atomicity 2) fails if 
destination exists + // Read source file then write atomically with PutMode::Create. Note that a GET/PUT is not + // necessarily atomic, but since the source file is immutable, we aren't exposed to the + // possibility of the source file changing while we do the PUT. self.task_executor.block_on(async move { let data = store.get(&src_path).await?.bytes().await?;