Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion kernel/src/engine/default/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use delta_kernel_derive::internal_api;
use futures::stream::StreamExt;
use itertools::Itertools;
use object_store::path::Path;
use object_store::{DynObjectStore, ObjectStore};
use object_store::{DynObjectStore, ObjectStore, PutMode};
use url::Url;

use super::UrlExt;
Expand Down Expand Up @@ -175,6 +175,31 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {

Ok(Box::new(receiver.into_iter()))
}

/// Copies the object at `src` to `dest`, failing if `dest` already exists.
///
/// The source is fetched in full and then written to the destination with
/// `PutMode::Create`. The whole operation is driven to completion through
/// `self.task_executor.block_on`.
///
/// # Errors
///
/// - Returns an error if either URL path cannot be converted into an object store `Path`.
/// - Returns `Error::FileAlreadyExists` (carrying the destination path as a string) when the
///   store reports `object_store::Error::AlreadyExists` for the write.
/// - Any other object store error from the read or the write is converted via `into()`.
fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> {
let src_path = Path::from_url_path(src.path())?;
let dest_path = Path::from_url_path(dest.path())?;
// Stringify the destination up front: `dest_path` is moved into the async block below, and the
// string is what gets reported in `Error::FileAlreadyExists`.
let dest_path_str = dest_path.to_string();
let store = self.inner.clone();

// Read source file then write atomically with PutMode::Create. Note that a GET/PUT is not
// necessarily atomic, but since the source file is immutable, we aren't exposed to the
// possibility of source file changing while we do the PUT.
self.task_executor.block_on(async move {
// NOTE: the entire source object is buffered in memory here before being written out.
let data = store.get(&src_path).await?.bytes().await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there concerns about the size of the data we're loading into memory?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes but i'm advocating not to optimize here. i'll open a follow up to track this tho

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


store
.put_opts(&dest_path, data.into(), PutMode::Create.into())
.await
// Translate the store's "already exists" failure into the kernel's dedicated variant; every
// other error goes through the generic conversion.
.map_err(|e| match e {
object_store::Error::AlreadyExists { .. } => {
Error::FileAlreadyExists(dest_path_str)
}
e => e.into(),
})?;
Ok(())
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -300,4 +325,36 @@ mod tests {
}
assert_eq!(len, 10, "list_from should have returned 10 files");
}

#[tokio::test]
async fn test_copy() {
let tmp = tempfile::tempdir().unwrap();
let store = Arc::new(LocalFileSystem::new());
let executor = Arc::new(TokioBackgroundExecutor::new());
let handler = ObjectStoreStorageHandler::new(store.clone(), executor);

// basic
let data = Bytes::from("test-data");
let src_path = Path::from_absolute_path(tmp.path().join("src.txt")).unwrap();
store.put(&src_path, data.clone().into()).await.unwrap();
let src_url = Url::from_file_path(tmp.path().join("src.txt")).unwrap();
let dest_url = Url::from_file_path(tmp.path().join("dest.txt")).unwrap();
assert!(handler.copy_atomic(&src_url, &dest_url).is_ok());
let dest_path = Path::from_absolute_path(tmp.path().join("dest.txt")).unwrap();
assert_eq!(
store.get(&dest_path).await.unwrap().bytes().await.unwrap(),
data
);

// copy to existing fails
assert!(matches!(
handler.copy_atomic(&src_url, &dest_url),
Err(Error::FileAlreadyExists(_))
));

// copy from non-existing fails
let missing_url = Url::from_file_path(tmp.path().join("missing.txt")).unwrap();
let new_dest_url = Url::from_file_path(tmp.path().join("new_dest.txt")).unwrap();
assert!(handler.copy_atomic(&missing_url, &new_dest_url).is_err());
}
}
4 changes: 4 additions & 0 deletions kernel/src/engine/sync/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ impl StorageHandler for SyncStorageHandler {
});
Ok(Box::new(iter))
}

/// Copying is not supported by the synchronous storage handler.
///
/// # Panics
///
/// Always panics; both arguments are ignored.
fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
    unimplemented!("SyncStorageHandler does not implement copy")
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ pub trait StorageHandler: AsAny {
&self,
files: Vec<FileSlice>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>>;

/// Copy a file atomically from source to destination.
///
/// # Errors
///
/// If the destination file already exists, implementations must return
/// `Err(Error::FileAlreadyExists)`.
fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>;
}

/// Provides JSON handling functionality to Delta Kernel.
Expand Down
3 changes: 3 additions & 0 deletions kernel/src/listed_log_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,9 @@ mod list_log_files_with_log_tail_tests {
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<bytes::Bytes>>>> {
panic!("read_files used");
}
// Like the other methods on this mock, copying panics when called; the message names both
// endpoints so an unexpected call is easy to trace.
fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()> {
    panic!("copy used from {} to {}", src, dest)
}
}

// when log_tail covers the entire requested range, no filesystem listing should occur
Expand Down
Loading