-
Couldn't load subscription status.
- Fork 118
feat!(catalog-managed): New copy_atomic StorageHandler method #1400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ed32703
9215d24
7e96f28
a24e6b7
9432bca
0c5f48f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,7 @@ use delta_kernel_derive::internal_api; | |
| use futures::stream::StreamExt; | ||
| use itertools::Itertools; | ||
| use object_store::path::Path; | ||
| use object_store::{DynObjectStore, ObjectStore}; | ||
| use object_store::{DynObjectStore, ObjectStore, PutMode}; | ||
| use url::Url; | ||
|
|
||
| use super::UrlExt; | ||
|
|
@@ -175,6 +175,31 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> { | |
|
|
||
| Ok(Box::new(receiver.into_iter())) | ||
| } | ||
|
|
||
| fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> { | ||
| let src_path = Path::from_url_path(src.path())?; | ||
| let dest_path = Path::from_url_path(dest.path())?; | ||
| let dest_path_str = dest_path.to_string(); | ||
| let store = self.inner.clone(); | ||
|
|
||
| // Read source file then write atomically with PutMode::Create. Note that a GET/PUT is not | ||
| // necessarily atomic, but since the source file is immutable, we aren't exposed to the | ||
| // possibility of source file changing while we do the PUT. The whole source is buffered in memory before the PUT, and an AlreadyExists error from the store is reported as Error::FileAlreadyExists carrying the destination path; any other store error is converted with `into()`. | ||
| self.task_executor.block_on(async move { | ||
| let data = store.get(&src_path).await?.bytes().await?; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there concerns about the size of the data we're loading into memory? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes but i'm advocating not to optimize here. i'll open a follow up to track this tho There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| store | ||
| .put_opts(&dest_path, data.into(), PutMode::Create.into()) | ||
| .await | ||
| .map_err(|e| match e { | ||
| object_store::Error::AlreadyExists { .. } => { | ||
| Error::FileAlreadyExists(dest_path_str) | ||
| } | ||
| e => e.into(), | ||
| })?; | ||
| Ok(()) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
@@ -300,4 +325,36 @@ mod tests { | |
| } | ||
| assert_eq!(len, 10, "list_from should have returned 10 files"); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_copy() { | ||
| let tmp = tempfile::tempdir().unwrap(); | ||
| let store = Arc::new(LocalFileSystem::new()); | ||
| let executor = Arc::new(TokioBackgroundExecutor::new()); | ||
| let handler = ObjectStoreStorageHandler::new(store.clone(), executor); | ||
|
|
||
| // basic: copy an existing source file to a new destination and check the destination holds the same bytes | ||
| let data = Bytes::from("test-data"); | ||
| let src_path = Path::from_absolute_path(tmp.path().join("src.txt")).unwrap(); | ||
| store.put(&src_path, data.clone().into()).await.unwrap(); | ||
| let src_url = Url::from_file_path(tmp.path().join("src.txt")).unwrap(); | ||
| let dest_url = Url::from_file_path(tmp.path().join("dest.txt")).unwrap(); | ||
| assert!(handler.copy_atomic(&src_url, &dest_url).is_ok()); | ||
| let dest_path = Path::from_absolute_path(tmp.path().join("dest.txt")).unwrap(); | ||
| assert_eq!( | ||
| store.get(&dest_path).await.unwrap().bytes().await.unwrap(), | ||
| data | ||
| ); | ||
|
|
||
| // copy to existing fails: dest.txt was created by the copy above, so repeating it must return FileAlreadyExists | ||
| assert!(matches!( | ||
| handler.copy_atomic(&src_url, &dest_url), | ||
| Err(Error::FileAlreadyExists(_)) | ||
| )); | ||
|
|
||
| // copy from non-existing fails: missing.txt is never written, so the copy must return an error | ||
| let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap(); | ||
| let new_dest_url = Url::from_file_path(tmp.path().join("new_dest.txt")).unwrap(); | ||
| assert!(handler.copy_atomic(&missing_url, &new_dest_url).is_err()); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.