diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 08d03107c..73d06088c 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -5,7 +5,7 @@ use delta_kernel_derive::internal_api; use futures::stream::StreamExt; use itertools::Itertools; use object_store::path::Path; -use object_store::{DynObjectStore, ObjectStore}; +use object_store::{DynObjectStore, ObjectStore, PutMode}; use url::Url; use super::UrlExt; @@ -175,6 +175,31 @@ impl StorageHandler for ObjectStoreStorageHandler { Ok(Box::new(receiver.into_iter())) } + + fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> { + let src_path = Path::from_url_path(src.path())?; + let dest_path = Path::from_url_path(dest.path())?; + let dest_path_str = dest_path.to_string(); + let store = self.inner.clone(); + + // Read source file then write atomically with PutMode::Create. Note that a GET/PUT is not + // necessarily atomic, but since the source file is immutable, we aren't exposed to the + // possiblilty of source file changing while we do the PUT. + self.task_executor.block_on(async move { + let data = store.get(&src_path).await?.bytes().await?; + + store + .put_opts(&dest_path, data.into(), PutMode::Create.into()) + .await + .map_err(|e| match e { + object_store::Error::AlreadyExists { .. } => { + Error::FileAlreadyExists(dest_path_str) + } + e => e.into(), + })?; + Ok(()) + }) + } } #[cfg(test)] @@ -300,4 +325,36 @@ mod tests { } assert_eq!(len, 10, "list_from should have returned 10 files"); } + + #[tokio::test] + async fn test_copy() { + let tmp = tempfile::tempdir().unwrap(); + let store = Arc::new(LocalFileSystem::new()); + let executor = Arc::new(TokioBackgroundExecutor::new()); + let handler = ObjectStoreStorageHandler::new(store.clone(), executor); + + // basic + let data = Bytes::from("test-data"); + let src_path = Path::from_absolute_path(tmp.path().join("src.txt")).unwrap(); + store.put(&src_path, data.clone().into()).await.unwrap(); + let src_url = Url::from_file_path(tmp.path().join("src.txt")).unwrap(); + let dest_url = Url::from_file_path(tmp.path().join("dest.txt")).unwrap(); + assert!(handler.copy_atomic(&src_url, &dest_url).is_ok()); + let dest_path = Path::from_absolute_path(tmp.path().join("dest.txt")).unwrap(); + assert_eq!( + store.get(&dest_path).await.unwrap().bytes().await.unwrap(), + data + ); + + // copy to existing fails + assert!(matches!( + handler.copy_atomic(&src_url, &dest_url), + Err(Error::FileAlreadyExists(_)) + )); + + // copy from non-existing fails + let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap(); + let new_dest_url = Url::from_file_path(tmp.path().join("new_dest.txt")).unwrap(); + assert!(handler.copy_atomic(&missing_url, &new_dest_url).is_err()); + } } diff --git a/kernel/src/engine/sync/storage.rs b/kernel/src/engine/sync/storage.rs index fd48e7f8e..9729b119e 100644 --- a/kernel/src/engine/sync/storage.rs +++ b/kernel/src/engine/sync/storage.rs @@ -70,6 +70,10 @@ impl StorageHandler for SyncStorageHandler { }); Ok(Box::new(iter)) } + + fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> { + unimplemented!("SyncStorageHandler does not implement copy"); + } } #[cfg(test)] diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index b3d0a3899..8a3780911 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -534,6 +534,10 @@ pub trait StorageHandler: AsAny { &self, files: Vec, ) -> DeltaResult>>>; + + /// Copy a file atomically from source to destination. If the destination file already exists, + /// it must return Err(Error::FileAlreadyExists). + fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>; } /// Provides JSON handling functionality to Delta Kernel. diff --git a/kernel/src/listed_log_files.rs b/kernel/src/listed_log_files.rs index 95ca61ce9..5283ec089 100644 --- a/kernel/src/listed_log_files.rs +++ b/kernel/src/listed_log_files.rs @@ -634,6 +634,9 @@ mod list_log_files_with_log_tail_tests { ) -> DeltaResult>>> { panic!("read_files used"); } + fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> { + panic!("copy used from {src} to {dest}"); + } } // when log_tail covers the entire requested range, no filesystem listing should occur