diff --git a/Cargo.lock b/Cargo.lock index bef06ca17..71b236a2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1785,6 +1785,23 @@ dependencies = [ "zstd", ] +[[package]] +name = "icechunk-export" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "err-into", + "futures", + "icechunk", + "indicatif", + "itertools", + "thiserror 2.0.16", + "tokio", + "tokio-util", +] + [[package]] name = "icechunk-macros" version = "0.1.0" @@ -1804,6 +1821,7 @@ dependencies = [ "clap", "futures", "icechunk", + "icechunk-export", "itertools", "miette", "pyo3", @@ -2002,6 +2020,19 @@ dependencies = [ "serde", ] +[[package]] +name = "indicatif" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70a646d946d06bedbbc4cac4c218acf4bbf2d87757a784857025f4d447e4e1cd" +dependencies = [ + "console", + "portable-atomic", + "unicode-width 0.2.0", + "unit-prefix", + "web-time", +] + [[package]] name = "indoc" version = "2.0.5" @@ -3793,6 +3824,7 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] @@ -3975,6 +4007,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" +[[package]] +name = "unit-prefix" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "323402cff2dd658f39ca17c789b502021b3f18707c91cdf22e3838e1b4023817" + [[package]] name = "unsafe-libyaml" version = "0.2.11" diff --git a/Cargo.toml b/Cargo.toml index 4d8604eca..2f1f3b003 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["icechunk", "icechunk-python", "icechunk-macros"] +members = ["icechunk", "icechunk-python", "icechunk-macros", "icechunk-export"] default-members = ["icechunk"] resolver = "2" diff --git a/icechunk-export/Cargo.toml b/icechunk-export/Cargo.toml new file mode 100644 index 000000000..5f01452d0 --- /dev/null +++ b/icechunk-export/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "icechunk-export" +version = "0.1.0" + +description = "Library to export Icechunk repositories" +repository = "https://github.com/earth-mover/icechunk" +homepage = "https://icechunk.io" +license = "Apache-2.0" +keywords = ["zarr", "xarray", "database"] +categories = ["database", "science", "science::geo"] +authors = ["Earthmover PBC"] +edition = "2024" + +[dependencies] +icechunk = { path = "../icechunk", version = "0.3.8" } +tokio = { version = "1.47.1", features = ["rt-multi-thread", "macros"] } +bytes = { version = "1.10.1" } +err-into = "1.0.1" +itertools = "0.14.0" +futures = "0.3.31" +chrono = "0.4.42" +indicatif = "0.18.0" +thiserror = "2.0.16" +tokio-util = { version = "0.7.16", features = ["rt"] } +async-trait = "0.1.89" + +[lints] +workspace = true diff --git a/icechunk-export/examples/export.rs b/icechunk-export/examples/export.rs new file mode 100644 index 000000000..f1f541a98 --- /dev/null +++ b/icechunk-export/examples/export.rs @@ -0,0 +1,41 @@ +use std::{path::Path, sync::Arc}; + +use icechunk::{ + Repository, Storage, config::S3Credentials, new_local_filesystem_storage, + new_s3_storage, +}; +use icechunk_export::{ProgressBars, export}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let repo_path = Path::new("./icechunk-python/tests/data/test-repo-v2"); + let source_storage = new_local_filesystem_storage(repo_path).await?; + // let source_storage = new_s3_storage( + // icechunk::config::S3Options { + // region: Some("us-east-1".to_string()), + // endpoint_url: None, + // anonymous: false, + // allow_http: false, + // force_path_style: false, + // network_stream_timeout_seconds: None, + // }, + // "icechunk-public-data".to_string(), + // Some("v1/glad".to_string()), + // Some(S3Credentials::FromEnv), + // )?; + + let source = Repository::open(None, source_storage, Default::default()).await?; + let destination_path = Path::new("/tmp/test-export"); + let destination = new_local_filesystem_storage(destination_path).await?; + export( + &source, + destination, + &icechunk_export::VersionSelection::AllHistory, + Arc::new(ProgressBars::new()), + 100, + ) + .await?; + println!("done"); + + Ok(()) +} diff --git a/icechunk-export/examples/saturate_chunks.rs b/icechunk-export/examples/saturate_chunks.rs new file mode 100644 index 000000000..2be7f1124 --- /dev/null +++ b/icechunk-export/examples/saturate_chunks.rs @@ -0,0 +1,13 @@ +use std::path::Path; + +use icechunk_export::test_saturation; + +#[tokio::main] +async fn main() { + //let path = Path::new("/tmp/chunk_ids.txt"); + let args: Vec = std::env::args().collect(); + let path = Path::new(args[1].as_str()); + let max_concurrent = args[2].parse().unwrap(); + let copy_chunks = args[3].parse().unwrap(); + test_saturation(max_concurrent, copy_chunks, path).await.unwrap(); +} diff --git a/icechunk-export/src/lib.rs b/icechunk-export/src/lib.rs new file mode 100644 index 000000000..2fe2292a1 --- /dev/null +++ b/icechunk-export/src/lib.rs @@ -0,0 +1,627 @@ +use std::{ + collections::HashSet, + sync::{Arc, atomic::AtomicUsize}, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use chrono::Utc; +use icechunk::{ + Repository, Storage, + asset_manager::AssetManager, + error::ICError, + format::{ + ChunkId, IcechunkFormatError, IcechunkFormatErrorKind, IcechunkResult, + ManifestId, SnapshotId, + format_constants::SpecVersionBin, + manifest::ChunkPayload, + repo_info::{RepoInfo, UpdateType}, + snapshot::Snapshot, + }, + ops::gc::{GCConfig, find_retained}, + refs::Ref, + repository::{RepositoryError, RepositoryErrorKind, RepositoryResult}, + storage::{Settings, StorageErrorKind}, +}; +use indicatif::{MultiProgress, ProgressBar}; +use itertools::Itertools as _; +use tokio::{ + io::AsyncReadExt as _, + sync::{ + OwnedSemaphorePermit, Semaphore, + mpsc::{self, UnboundedReceiver, UnboundedSender}, + }, +}; +use tokio_util::task::TaskTracker; + +#[derive(Debug, PartialEq, Eq)] +pub enum VersionSelection { + SingleSnapshot(SnapshotId), + AllHistory, + // TODO: can we do refs here instead of String? + RefsHistory { + branches: Vec, + tags: Vec, + main_points_to: Option, + }, +} + +fn all_history_as_refs_history(repo: &RepoInfo) -> IcechunkResult { + let branches = repo.branch_names()?.map(|s| s.to_string()); + let tags = repo.tag_names()?.map(|s| s.to_string()); + Ok(VersionSelection::RefsHistory { + branches: branches.collect(), + tags: tags.collect(), + main_points_to: None, + }) +} + +fn collect_requested_snapshots( + repo: &RepoInfo, + branches: &Vec, + tags: &Vec, +) -> IcechunkResult> { + let branch_roots = branches + .iter() + .map(|name| repo.resolve_branch(name)) + .map_ok(|snap_id| repo.ancestry(&snap_id)); + let tag_roots = tags + .iter() + .map(|name| repo.resolve_tag(name)) + .map_ok(|snap_id| repo.ancestry(&snap_id)); + let mut res = HashSet::new(); + for it in branch_roots.chain(tag_roots) { + for snap_id in it?? { + if !res.insert(snap_id?.id) { + // we don'n need to continue because + // we have already seen this snapshot and all its ancestry + break; + } + } + } + Ok(res) +} + +fn select_snapshots( + repo: &RepoInfo, + versions: &VersionSelection, +) -> ExportResult> { + match versions { + VersionSelection::SingleSnapshot(snapshot_id) => { + Ok(HashSet::from([snapshot_id.clone()])) + } + VersionSelection::AllHistory => { + let selection = all_history_as_refs_history(repo) + .map_err(|e| ExportError::from(ExportErrorKind::FormatError(e.kind)))?; + select_snapshots(repo, &selection) + } + VersionSelection::RefsHistory { branches, tags, main_points_to } => { + if main_points_to.is_none() + && !branches.contains(&Ref::DEFAULT_BRANCH.to_string()) + { + return Err(ExportErrorKind::InvalidVersionSelection( + "target repository needs a `main` branch, if `main_points_to` is not specified, `main` must be included in the exported branches".to_string(), + ).into()); + } + let res = collect_requested_snapshots(repo, branches, tags) + .map_err(|e| ExportError::from(ExportErrorKind::FormatError(e.kind)))?; + if let Some(sid) = main_points_to + && !res.contains(sid) + { + return Err(ExportErrorKind::InvalidVersionSelection( + "target repository needs a valid `main` branch, the snapshot in `main_points_to` is not exported by this version selection".to_string(), + ).into()); + } + Ok(res) + } + } +} + +pub enum ObjectType { + Chunk, + Manifest, + TransactionLog, + Snapshot, +} + +enum Operation { + Copy { path: String, object_type: ObjectType, object_size: Option }, +} + +async fn calculate_diff( + requested_snaps: &HashSet, + destination: Arc, +) -> RepositoryResult<(HashSet, HashSet, HashSet)> { + // FIXME: tune caches + let destination = + Repository::open_or_create(None, destination, Default::default()).await?; + //TODO: bring outside of GC + let gc_config = GCConfig::clean_all( + Utc::now(), + Utc::now(), + None, + 50.try_into().unwrap(), + (50 * 1_024 * 1_024).try_into().unwrap(), + 500.try_into().unwrap(), + true, + ); + let (dest_chunks, dest_manifests, dest_snapshots) = + find_retained(destination.asset_manager().clone(), &gc_config).await?; + + let missing_snaps = requested_snaps - &dest_snapshots; + + Ok((missing_snaps, dest_manifests, dest_chunks)) +} + +pub type ObjectSize = u64; +pub type OperationResult = (ObjectType, ObjectSize); + +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum ExportErrorKind { + #[error(transparent)] + StorageError(StorageErrorKind), + #[error(transparent)] + RepositoryError(RepositoryErrorKind), + #[error(transparent)] + FormatError(IcechunkFormatErrorKind), + #[error("I/O error")] + IOError(#[from] std::io::Error), + + #[error("invalid selection of versions to export: {0}")] + InvalidVersionSelection(String), +} + +pub type ExportError = ICError; + +pub type ExportResult = Result; + +impl From for ExportError { + fn from(value: ExportErrorKind) -> Self { + Self::new(value) + } +} + +#[derive(Debug, Clone)] +struct Endpoint { + pub storage: Arc, + pub settings: Arc, +} + +async fn copy_object( + path: String, + object_type: ObjectType, + object_size: Option, + source: Endpoint, + destination: Endpoint, + result: UnboundedSender>, + semaphore: OwnedSemaphorePermit, +) { + let res = do_copy_object(path.as_str(), object_size, source, destination) + .await + .map(|size| (object_type, size)); + + drop(semaphore); + + // we can expect here because the result Receiver is never closed + #[allow(clippy::expect_used)] + result + .send(res) + .expect("Unexpected error: Failed to send copy_object operation response"); +} + +async fn do_copy_object( + path: &str, + object_size: Option, + source: Endpoint, + destination: Endpoint, +) -> ExportResult { + let range = object_size.map(|n| 0..n); + let (mut reader, _) = source + .storage + .get_object(source.settings.as_ref(), path, range.as_ref()) + .await + .map_err(|e| ExportError::from(ExportErrorKind::StorageError(e.kind)))?; + + // TODO: better capacity + let mut buffer = Vec::with_capacity(object_size.unwrap_or(1024 * 1024) as usize); + reader + .read_to_end(&mut buffer) + .await + .map_err(|e| ExportError::from(ExportErrorKind::IOError(e)))?; + let bytes = Bytes::from_owner(buffer); + let len = bytes.len() as u64; + + destination + .storage + .put_object( + destination.settings.as_ref(), + path, + bytes, + None, + Default::default(), + None, + ) + .await + .map_err(|e| ExportError::from(ExportErrorKind::StorageError(e.kind)))?; + Ok(len) +} + +async fn execute_operations( + mut rec: UnboundedReceiver, + result: UnboundedSender>, + source: Arc, + destination: Arc, + max_concurrent_operations: usize, + task_tracker: TaskTracker, +) -> usize { + let settings = Arc::new(source.default_settings()); + let source = Endpoint { storage: source, settings }; + let settings = Arc::new(destination.default_settings()); + let destination = Endpoint { storage: destination, settings }; + + let mut spawned = 0; + let semaphore = Arc::new(Semaphore::new(max_concurrent_operations)); + while let Some(op) = rec.recv().await { + spawned += 1; + match op { + Operation::Copy { path, object_type, object_size } => { + // we can expect here because the semaphore is never closed + #[allow(clippy::expect_used)] + let guard = + semaphore.clone().acquire_owned().await.expect( + "Unexpected error executing operation: semaphore is closed", + ); + task_tracker.spawn(copy_object( + path, + object_type, + object_size, + source.clone(), + destination.clone(), + result.clone(), + guard, + )); + } + } + } + spawned +} + +#[async_trait] +pub trait ProgresListener { + async fn completed(&self, object_type: ObjectType, object_size: u64); + async fn discovered(&self, object_type: ObjectType, count: u64); + async fn done(&self); +} + +#[derive(Debug)] +pub struct ProgressBars { + chunk_progress: ProgressBar, + manifest_progress: ProgressBar, + snapshot_progress: ProgressBar, + transaction_progress: ProgressBar, + bytes_progress: ProgressBar, + _multi: MultiProgress, +} + +impl Default for ProgressBars { + fn default() -> Self { + Self::new() + } +} + +impl ProgressBars { + pub fn new() -> Self { + let multi = indicatif::MultiProgress::new(); + // unwrapping is allowed when creating templates because the + // template string is verified to work + #[allow(clippy::unwrap_used)] + let bytes_sty = indicatif::ProgressStyle::with_template( + "{prefix:16.green} {binary_bytes} [{binary_bytes_per_sec}]", + ) + .unwrap(); + #[allow(clippy::unwrap_used)] + let chunks_sty = indicatif::ProgressStyle::with_template( + "{prefix:16.green} {bar:60} {human_pos:>}/{human_len} [{eta}]", + ) + .unwrap(); + #[allow(clippy::unwrap_used)] + let others_sty = indicatif::ProgressStyle::with_template( + "{prefix:16.green} {bar:60} {human_pos:>}/{human_len}", + ) + .unwrap(); + let bytes_progress = indicatif::ProgressBar::new(0); + let snapshot_progress = indicatif::ProgressBar::new(0); + let transaction_progress = indicatif::ProgressBar::new(0); + let manifest_progress = indicatif::ProgressBar::new(0); + let chunk_progress = indicatif::ProgressBar::new(0); + multi + .add(chunk_progress.clone()) + .with_style(chunks_sty.clone()) + .with_prefix("🧊 Chunks:"); + multi + .add(manifest_progress.clone()) + .with_style(others_sty.clone()) + .with_prefix("📜 Manifests:"); + multi + .add(snapshot_progress.clone()) + .with_style(others_sty.clone()) + .with_prefix("📸 Snapshots:"); + multi + .add(transaction_progress.clone()) + .with_style(others_sty.clone()) + .with_prefix("🤝 Transactions:"); + multi.add(bytes_progress.clone()).with_style(bytes_sty).with_prefix("💾 Copied:"); + ProgressBars { + chunk_progress, + manifest_progress, + snapshot_progress, + transaction_progress, + bytes_progress, + _multi: multi, + } + } +} + +#[async_trait] +impl ProgresListener for ProgressBars { + async fn completed(&self, object_type: ObjectType, object_size: u64) { + match object_type { + ObjectType::Chunk => self.chunk_progress.inc(1), + ObjectType::Manifest => self.manifest_progress.inc(1), + ObjectType::TransactionLog => self.transaction_progress.inc(1), + ObjectType::Snapshot => self.snapshot_progress.inc(1), + } + self.bytes_progress.inc(object_size); + } + async fn discovered(&self, object_type: ObjectType, count: u64) { + match object_type { + ObjectType::Chunk => self.chunk_progress.inc_length(count), + ObjectType::Manifest => self.manifest_progress.inc_length(count), + ObjectType::TransactionLog => self.transaction_progress.inc_length(count), + ObjectType::Snapshot => self.snapshot_progress.inc_length(count), + } + } + async fn done(&self) { + self.chunk_progress.abandon(); + self.manifest_progress.abandon(); + self.transaction_progress.abandon(); + self.snapshot_progress.abandon(); + self.bytes_progress.abandon(); + } +} + +async fn receive_operation_result( + mut rec: UnboundedReceiver>, + progress_listener: Arc, +) -> ExportResult { + let results = AtomicUsize::new(0); + while let Some(res) = rec.recv().await { + results.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + match res { + Ok((object_type, object_size)) => match object_type { + ObjectType::Chunk => { + progress_listener.completed(ObjectType::Chunk, object_size).await; + } + ObjectType::Manifest => { + progress_listener.completed(ObjectType::Manifest, object_size).await; + } + ObjectType::TransactionLog => { + progress_listener + .completed(ObjectType::TransactionLog, object_size) + .await; + } + ObjectType::Snapshot => { + progress_listener.completed(ObjectType::Snapshot, object_size).await; + } + }, + Err(err) => return Err(err), + } + } + Ok(results.into_inner()) +} + +pub async fn export( + source: &Repository, + destination: Arc, + versions: &VersionSelection, + progress_listener: Arc, + max_concurrent_operations: usize, +) -> Result<(), Box> { + if source.spec_version() < SpecVersionBin::V2dot0 { + return Err("export cannot run on Icechunk version 1 repositories.".into()); + } + + let (source_repo_info, _) = source.asset_manager().fetch_repo_info().await?; + let requested_snaps = select_snapshots(&source_repo_info, versions)?; + let (missing_snapshots, mut dest_manifests, mut dest_chunks) = + calculate_diff(&requested_snaps, destination.clone()).await?; + progress_listener + .discovered(ObjectType::Snapshot, missing_snapshots.len() as u64) + .await; + progress_listener + .discovered(ObjectType::TransactionLog, missing_snapshots.len() as u64) + .await; + + // TODO: should we limit these channels? + let (op_result_sender, op_result_receiver) = mpsc::unbounded_channel(); + let (op_execute_sender, op_execute_receiver) = mpsc::unbounded_channel(); + + let operations_tracker = TaskTracker::new(); + + let op_exec_handle = tokio::spawn(execute_operations( + op_execute_receiver, + op_result_sender, + source.storage().clone(), + destination.clone(), + max_concurrent_operations, + operations_tracker.clone(), + )); + + let op_result_handle = tokio::spawn(receive_operation_result( + op_result_receiver, + Arc::clone(&progress_listener), + )); + + for snap_id in missing_snapshots { + let snap = source.asset_manager().fetch_snapshot(&snap_id).await?; + for mfile in snap.manifest_files() { + if !dest_manifests.contains(&mfile.id) { + progress_listener.discovered(ObjectType::Manifest, 1).await; + dest_manifests.insert(mfile.id.clone()); + let manifest = source + .asset_manager() + .fetch_manifest(&mfile.id, mfile.size_bytes) + .await?; + + //copy all chunks + for chunk_payload in manifest.chunk_payloads() { + match chunk_payload? { + ChunkPayload::Inline(_) => { //TODO: materialize + } + ChunkPayload::Virtual(_) => { //TODO: materialize + } + ChunkPayload::Ref(chunk_ref) => { + dest_chunks.insert(chunk_ref.id.clone()); + progress_listener.discovered(ObjectType::Chunk, 1).await; + let path = AssetManager::chunk_path(&chunk_ref.id); + let op = Operation::Copy { + path, + object_type: ObjectType::Chunk, + object_size: Some(chunk_ref.length), + }; + op_execute_sender.send(op)?; + } + _ => { + return Err( + "bug in export, unknown chunk payload type".into() + ); + } + } + } + + //copy manifest + let path = AssetManager::manifest_path(&manifest.id()); + let op = Operation::Copy { + path, + object_type: ObjectType::Manifest, + object_size: Some(mfile.size_bytes), + }; + op_execute_sender.send(op)?; + } + } + // copy tx log + let path = AssetManager::transaction_path(&snap.id()); + let op = Operation::Copy { + path, + object_size: None, + object_type: ObjectType::TransactionLog, + }; + op_execute_sender.send(op)?; + + // copy snapshot + let path = AssetManager::snapshot_path(&snap.id()); + let op = Operation::Copy { + path, + object_size: None, + object_type: ObjectType::Snapshot, + }; + op_execute_sender.send(op)?; + } + + drop(op_execute_sender); + let spawned_ops = op_exec_handle.await?; + let received_results = op_result_handle.await??; + + operations_tracker.close(); + operations_tracker.wait().await; + + let new_repo_info = + update_repo_info(source_repo_info.as_ref(), &requested_snaps, versions); + + let x = destination.; + + progress_listener.done().await; + + dbg!(spawned_ops); + dbg!(received_results); + println!("---------------- {}", spawned_ops); + println!(" ///////// {received_results}"); + Ok(()) +} + +fn update_repo_info( + original: &RepoInfo, + requested_snaps: &HashSet, + selection: &VersionSelection, +) -> IcechunkResult { + match selection { + VersionSelection::SingleSnapshot(snapshot_id) => { + let tags = original.tags()?.filter(|(_, sid)| requested_snaps.contains(sid)); + let branches = original.branches()?.filter_map(|(name, sid)| { + if name == Ref::DEFAULT_BRANCH { + Some((Ref::DEFAULT_BRANCH, snapshot_id.clone())) + } else if requested_snaps.contains(&sid) { + Some((name, sid)) + } else { + None + } + }); + let initial_snap = original.find_snapshot(&Snapshot::INITIAL_SNAPSHOT_ID)?; + let mut target_snap = original.find_snapshot(snapshot_id)?; + target_snap.parent_id = Some(initial_snap.id.clone()); + + RepoInfo::new( + tags, + branches, + std::iter::empty(), + [initial_snap, target_snap], + &UpdateType::RepoInitializedUpdate, + None, + ) + } + VersionSelection::AllHistory => { + let snapshots: Vec<_> = original.all_snapshots()?.try_collect()?; + RepoInfo::new( + original.tags()?, + original.branches()?, + std::iter::empty(), + snapshots, + &UpdateType::RepoInitializedUpdate, + None, + ) + } + VersionSelection::RefsHistory { main_points_to, branches, .. } => { + assert!( + branches.contains(&Ref::DEFAULT_BRANCH.to_string()) + || main_points_to.is_some() + ); + let tags = original.tags()?.filter(|(_, sid)| requested_snaps.contains(sid)); + let branches = original.branches()?.filter_map(|(name, sid)| { + // at the beginning of export is checked that main_points_to points to an exported + // snapshot + if name == Ref::DEFAULT_BRANCH + && let Some(sid) = main_points_to + { + Some((Ref::DEFAULT_BRANCH, sid.clone())) + } else if requested_snaps.contains(&sid) { + Some((name, sid)) + } else { + None + } + }); + let snapshots: Vec<_> = original.all_snapshots()?.try_collect()?; + RepoInfo::new( + tags, + branches, + std::iter::empty(), + snapshots, + &UpdateType::RepoInitializedUpdate, + None, + ) + } + } +} + +#[cfg(test)] +mod tests {} diff --git a/icechunk-python/Cargo.toml b/icechunk-python/Cargo.toml index 6198cfa65..b03154d83 100644 --- a/icechunk-python/Cargo.toml +++ b/icechunk-python/Cargo.toml @@ -22,6 +22,7 @@ bytes = "1.10.1" chrono = { version = "0.4.42" } futures = "0.3.31" icechunk = { path = "../icechunk", version = "0.3.9", features = ["logs"] } +icechunk-export = { path = "../icechunk-export", version = "0.1.0" } itertools = "0.14.0" pyo3 = { version = "0.24.2", features = [ "chrono", diff --git a/icechunk-python/python/icechunk/__init__.py b/icechunk-python/python/icechunk/__init__.py index ee9597aec..9ebcdb7b9 100644 --- a/icechunk-python/python/icechunk/__init__.py +++ b/icechunk-python/python/icechunk/__init__.py @@ -22,6 +22,10 @@ Credentials, Diff, ExpirationRanUpdate, + ExportAllHistory, + ExportRefsHistory, + ExportSingleSnapshot, + ExportVersionSelection, GCRanUpdate, GcsBearerCredential, GcsCredentials, @@ -56,6 +60,7 @@ VirtualChunkContainer, VirtualChunkSpec, __version__, + _export_repository, _upgrade_icechunk_repository, initialize_logs, set_logs_filter, @@ -127,6 +132,10 @@ "Credentials", "Diff", "ExpirationRanUpdate", + "ExportAllHistory", + "ExportRefsHistory", + "ExportSingleSnapshot", + "ExportVersionSelection", "ForkSession", "GCRanUpdate", "GCSummary", @@ -260,6 +269,12 @@ def upgrade_icechunk_repository( ) +def export_repository( + repo: Repository, destination: Storage, versions: ExportVersionSelection +) -> None: + _export_repository(repo._repository, destination, versions) + + ManifestSplittingConfig.from_dict = from_dict # type: ignore[method-assign] ManifestSplittingConfig.to_dict = to_dict # type: ignore[attr-defined] diff --git a/icechunk-python/src/errors.rs b/icechunk-python/src/errors.rs index d1bb5329d..6c6eef156 100644 --- a/icechunk-python/src/errors.rs +++ b/icechunk-python/src/errors.rs @@ -2,7 +2,7 @@ use icechunk::{ StorageError, format::{IcechunkFormatError, manifest::VirtualReferenceError}, migrations::MigrationError, - ops::{gc::GCError, manifests::ManifestOpsError}, + ops::manifests::ManifestOpsError, repository::RepositoryError, session::{SessionError, SessionErrorKind}, store::{StoreError, StoreErrorKind}, @@ -40,8 +40,6 @@ pub(crate) enum PyIcechunkStoreError { #[error(transparent)] IcechunkFormatError(#[from] IcechunkFormatError), #[error(transparent)] - GCError(#[from] GCError), - #[error(transparent)] ManifestOpsError(#[from] ManifestOpsError), #[error(transparent)] VirtualReferenceError(#[from] VirtualReferenceError), diff --git a/icechunk-python/src/export.rs b/icechunk-python/src/export.rs new file mode 100644 index 000000000..d0d33af29 --- /dev/null +++ b/icechunk-python/src/export.rs @@ -0,0 +1,49 @@ +use pyo3::{pyclass, pymethods}; + +#[pyclass(name = "ExportVersionSelection", eq, subclass)] +#[derive(Debug, PartialEq, Eq)] +pub struct PyVersionSelection; + +#[pyclass(name = "ExportSingleSnapshot", eq, extends=PyVersionSelection)] +#[derive(Debug, PartialEq, Eq)] +pub struct PySingleSnapshot { + #[pyo3(get)] + pub snapshot_id: String, +} + +#[pymethods] +impl PySingleSnapshot { + #[new] + fn new(snapshot_id: String) -> (Self, PyVersionSelection) { + (PySingleSnapshot { snapshot_id }, PyVersionSelection) + } +} + +#[pyclass(name = "ExportAllHistory", eq, extends=PyVersionSelection)] +#[derive(Debug, PartialEq, Eq)] +pub struct PyAllHistory; + +#[pymethods] +impl PyAllHistory { + #[new] + fn new() -> (Self, PyVersionSelection) { + (PyAllHistory, PyVersionSelection) + } +} + +#[pyclass(name = "ExportRefsHistory", eq, extends=PyVersionSelection)] +#[derive(Debug, PartialEq, Eq)] +pub struct PyRefsHistory { + #[pyo3(get)] + pub branches: Vec, + #[pyo3(get)] + pub tags: Vec, +} + +#[pymethods] +impl PyRefsHistory { + #[new] + fn new(branches: Vec, tags: Vec) -> (Self, PyVersionSelection) { + (PyRefsHistory { branches, tags }, PyVersionSelection) + } +} diff --git a/icechunk-python/src/lib.rs b/icechunk-python/src/lib.rs index dead4421c..cf779051a 100644 --- a/icechunk-python/src/lib.rs +++ b/icechunk-python/src/lib.rs @@ -1,6 +1,7 @@ mod config; mod conflicts; mod errors; +mod export; mod pickle; mod repository; mod session; @@ -152,6 +153,16 @@ fn _upgrade_icechunk_repository( repo.migrate_1_to_2(py, dry_run, delete_unused_v1_files) } +#[pyfunction] +fn _export_repository<'py>( + py: Python<'py>, + source: &PyRepository, + destination: PyStorage, + versions: &Bound<'py, PyAny>, +) -> PyResult<()> { + source.export(py, destination, versions) +} + fn pep440_version() -> String { let cargo_version = env!("CARGO_PKG_VERSION"); cargo_version.replace("-rc.", "rc").replace("-alpha.", "a").replace("-beta.", "b") @@ -210,12 +221,17 @@ fn _icechunk_python(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_function(wrap_pyfunction!(initialize_logs, m)?)?; m.add_function(wrap_pyfunction!(set_logs_filter, m)?)?; m.add_function(wrap_pyfunction!(spec_version, m)?)?; m.add_function(wrap_pyfunction!(cli_entrypoint, m)?)?; m.add_function(wrap_pyfunction!(_upgrade_icechunk_repository, m)?)?; + m.add_function(wrap_pyfunction!(_export_repository, m)?)?; m.add("__version__", pep440_version())?; // Exceptions diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index 03905eac0..ac53a4863 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -42,6 +42,7 @@ use crate::{ format_option_to_string, }, errors::PyIcechunkStoreError, + export::{PyAllHistory, PyRefsHistory, PySingleSnapshot}, impl_pickle, session::PySession, streams::PyAsyncGenerator, @@ -748,6 +749,40 @@ impl PyRepository { }) }) } + + pub fn export<'py>( + &self, + py: Python<'py>, + destination: PyStorage, + versions: &Bound<'py, PyAny>, + ) -> PyResult<()> { + let versions = if let Ok(_) = versions.extract::>() { + icechunk_export::VersionSelection::AllHistory + } else if let Ok(cell) = versions.extract::>() { + icechunk_export::VersionSelection::SingleSnapshot( + cell.snapshot_id.clone().try_into().unwrap(), //FIXME: unwrap + ) + } else if let Ok(cell) = versions.extract::>() { + icechunk_export::VersionSelection::RefsHistory { + tags: cell.tags.clone(), + branches: cell.branches.clone(), + } + } else { + panic!("unknown versions value") // FIXME: panic + }; + py.allow_threads(move || { + pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + let repo = self.0.write().await; + let destination = destination.0; + let progress = Arc::new(icechunk_export::ProgressBars::default()); + // FIXME: configuration + icechunk_export::export(&repo, destination, &versions, progress, 100) + .await + .unwrap(); //FIXME:unwrap + Ok(()) + }) + }) + } } #[pymethods] @@ -1806,7 +1841,7 @@ impl PyRepository { }, ) .await - .map_err(PyIcechunkStoreError::GCError)?; + .map_err(PyIcechunkStoreError::RepositoryError)?; Ok::<_, PyIcechunkStoreError>( result .released_snapshots @@ -1850,7 +1885,7 @@ impl PyRepository { }, ) .await - .map_err(PyIcechunkStoreError::GCError)?; + .map_err(PyIcechunkStoreError::RepositoryError)?; Ok(result.released_snapshots.iter().map(|id| id.to_string()).collect()) }) } @@ -1883,7 +1918,7 @@ impl PyRepository { }; let result = garbage_collect(asset_manager, &gc_config) .await - .map_err(PyIcechunkStoreError::GCError)?; + .map_err(PyIcechunkStoreError::RepositoryError)?; Ok::<_, PyIcechunkStoreError>(result.into()) })?; @@ -1917,7 +1952,7 @@ impl PyRepository { }; let result = garbage_collect(asset_manager, &gc_config) .await - .map_err(PyIcechunkStoreError::GCError)?; + .map_err(PyIcechunkStoreError::RepositoryError)?; Ok(result.into()) }) } diff --git a/icechunk/src/asset_manager.rs b/icechunk/src/asset_manager.rs index 8de1ee87d..a71ffa536 100644 --- a/icechunk/src/asset_manager.rs +++ b/icechunk/src/asset_manager.rs @@ -452,6 +452,22 @@ impl AssetManager { .await } + pub fn chunk_path(id: &ChunkId) -> String { + format!("{CHUNKS_FILE_PATH}/{id}") + } + + pub fn manifest_path(id: &ManifestId) -> String { + format!("{MANIFESTS_FILE_PATH}/{id}") + } + + pub fn snapshot_path(id: &SnapshotId) -> String { + format!("{SNAPSHOTS_FILE_PATH}/{id}") + } + + pub fn transaction_path(id: &SnapshotId) -> String { + format!("{TRANSACTION_LOGS_FILE_PATH}/{id}") + } + #[instrument(skip(self, bytes))] pub async fn write_chunk( &self, @@ -460,7 +476,6 @@ impl AssetManager { ) -> RepositoryResult<()> { trace!(%chunk_id, size_bytes=bytes.len(), "Writing chunk"); - let path = format!("{CHUNKS_FILE_PATH}/{chunk_id}"); let _permit = self.request_semaphore.acquire().await?; let settings = storage::Settings { storage_class: self.storage_settings.chunks_storage_class().cloned(), @@ -468,7 +483,14 @@ impl AssetManager { }; // we don't pre-populate the chunk cache, there are too many of them for this to be useful self.storage - .put_object(&settings, path.as_str(), bytes, None, Default::default(), None) + .put_object( + &settings, + Self::chunk_path(&chunk_id).as_str(), + bytes, + None, + Default::default(), + None, + ) .await? .must_write()?; Ok(()) @@ -485,11 +507,14 @@ impl AssetManager { Ok(chunk) => Ok(chunk), Err(guard) => { trace!(%chunk_id, ?range, "Downloading chunk"); - let path = format!("{CHUNKS_FILE_PATH}/{chunk_id}"); let permit = self.request_semaphore.acquire().await?; let (read, _) = self .storage - .get_object(&self.storage_settings, &path, Some(range)) + .get_object( + &self.storage_settings, + &Self::chunk_path(chunk_id), + Some(range), + ) .await?; let chunk = async_reader_to_bytes(read, (range.end - range.start) as usize) @@ -507,11 +532,13 @@ impl AssetManager { snapshot_id: &SnapshotId, ) -> RepositoryResult> { debug!(%snapshot_id, "Getting snapshot timestamp"); - let path = format!("{SNAPSHOTS_FILE_PATH}/{snapshot_id}"); let _permit = self.request_semaphore.acquire().await?; Ok(self .storage - .get_object_last_modified(path.as_str(), &self.storage_settings) + .get_object_last_modified( + Self::snapshot_path(snapshot_id).as_str(), + &self.storage_settings, + ) .await?) } @@ -785,7 +812,6 @@ async fn write_new_manifest( let len = buffer.len() as u64; debug!(%id, size_bytes=len, "Writing manifest"); - let path = format!("{MANIFESTS_FILE_PATH}/{id}"); let settings = storage::Settings { storage_class: storage_settings.metadata_storage_class().cloned(), ..storage_settings.clone() @@ -793,7 +819,14 @@ async fn write_new_manifest( let _permit = semaphore.acquire().await?; storage - .put_object(&settings, path.as_str(), buffer.into(), None, metadata, None) + .put_object( + &settings, + AssetManager::manifest_path(&id).as_str(), + buffer.into(), + None, + metadata, + None, + ) .await? .must_write()?; Ok(len) @@ -808,12 +841,17 @@ async fn fetch_manifest( ) -> RepositoryResult> { debug!(%manifest_id, "Downloading manifest"); - let path = format!("{MANIFESTS_FILE_PATH}/{manifest_id}"); let range = 0..manifest_size; let range = if manifest_size > 0 { Some(&range) } else { None }; let _permit = semaphore.acquire().await?; - let (read, _) = storage.get_object(storage_settings, path.as_str(), range).await?; + let (read, _) = storage + .get_object( + storage_settings, + AssetManager::manifest_path(manifest_id).as_str(), + range, + ) + .await?; let span = Span::current(); tokio::task::spawn_blocking(move || { @@ -887,7 +925,7 @@ async fn write_new_snapshot( .await??; debug!(%id, size_bytes=buffer.len(), "Writing snapshot"); - let path = format!("{SNAPSHOTS_FILE_PATH}/{id}"); + let path = AssetManager::snapshot_path(&id); let settings = storage::Settings { storage_class: storage_settings.metadata_storage_class().cloned(), ..storage_settings.clone() @@ -910,7 +948,7 @@ async fn fetch_snapshot( debug!(%snapshot_id, "Downloading snapshot"); let _permit = semaphore.acquire().await?; - let path = format!("{SNAPSHOTS_FILE_PATH}/{snapshot_id}"); + let path = AssetManager::snapshot_path(snapshot_id); let (read, _) = storage.get_object(storage_settings, path.as_str(), None).await?; let span = Span::current(); @@ -969,7 +1007,7 @@ async fn write_new_tx_log( .await??; debug!(%transaction_id, size_bytes=buffer.len(), "Writing transaction log"); - let path = format!("{TRANSACTION_LOGS_FILE_PATH}/{transaction_id}"); + let path = AssetManager::transaction_path(&transaction_id); let settings = storage::Settings { storage_class: storage_settings.metadata_storage_class().cloned(), ..storage_settings.clone() @@ -991,7 +1029,7 @@ async fn fetch_transaction_log( semaphore: &Semaphore, ) -> RepositoryResult> { debug!(%transaction_id, "Downloading transaction log"); - let path = format!("{TRANSACTION_LOGS_FILE_PATH}/{transaction_id}"); + let path = AssetManager::transaction_path(transaction_id); let _permit = semaphore.acquire().await?; let (read, _) = storage.get_object(storage_settings, path.as_str(), None).await?; @@ -1270,7 +1308,7 @@ mod test { // when we insert we cache, so no fetches assert_eq!( logging.fetch_operations(), - vec![("put_object".to_string(), format!("{MANIFESTS_FILE_PATH}/{id}"))] + vec![("put_object".to_string(), AssetManager::manifest_path(&id))] ); // first time it sees an ID it calls the backend diff --git a/icechunk/src/format/mod.rs b/icechunk/src/format/mod.rs index 0a9759eac..12dcaa5ba 100644 --- a/icechunk/src/format/mod.rs +++ b/icechunk/src/format/mod.rs @@ -360,7 +360,9 @@ pub mod format_constants { } #[repr(u8)] - #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] + #[derive( + Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord, + )] pub enum SpecVersionBin { V1dot0 = 1u8, V2dot0 = 2u8, diff --git a/icechunk/src/ops/gc.rs b/icechunk/src/ops/gc.rs index 173ed6570..c72629b3a 100644 --- a/icechunk/src/ops/gc.rs +++ b/icechunk/src/ops/gc.rs @@ -14,13 +14,13 @@ use tracing::{debug, info, instrument}; use crate::{ asset_manager::AssetManager, format::{ - ChunkId, FileTypeTag, IcechunkFormatError, ManifestId, ObjectId, SnapshotId, + ChunkId, FileTypeTag, ManifestId, ObjectId, SnapshotId, manifest::{ChunkPayload, Manifest}, repo_info::{RepoInfo, UpdateType}, snapshot::{ManifestFileInfo, Snapshot, SnapshotInfo}, }, ops::pointed_snapshots, - refs::{Ref, RefError}, + refs::Ref, repository::{RepositoryError, RepositoryErrorKind, RepositoryResult}, storage::{DeleteObjectsResult, ListInfo, VersionInfo}, stream_utils::{StreamLimiter, try_unique_stream}, @@ -170,18 +170,6 @@ pub struct GCSummary { pub transaction_logs_deleted: u64, } -#[derive(Debug, thiserror::Error)] -pub enum GCError { - #[error("ref error {0}")] - Ref(#[from] RefError), - #[error("repository error {0}")] - Repository(#[from] RepositoryError), - #[error("format error {0}")] - FormatError(#[from] IcechunkFormatError), -} - -pub type GCResult = Result; - async fn snapshot_retained( keep_snapshots: Arc>>, snap: Arc, @@ -250,10 +238,10 @@ async fn chunks_retained( } #[instrument(skip_all)] -async fn find_retained( +pub async fn find_retained( asset_manager: Arc, config: &GCConfig, -) -> GCResult<(HashSet, HashSet, HashSet)> { +) -> RepositoryResult<(HashSet, HashSet, HashSet)> { let all_snaps = pointed_snapshots(Arc::clone(&asset_manager), &config.extra_roots).await?; @@ -316,12 +304,12 @@ async fn find_retained( pub async fn garbage_collect( asset_manager: Arc, config: &GCConfig, -) -> GCResult { +) -> RepositoryResult { if !asset_manager.can_write_to_storage() { - return Err(GCError::Repository( - RepositoryErrorKind::ReadonlyStorage("Cannot garbage collect".to_string()) - .into(), - )); + return Err(RepositoryErrorKind::ReadonlyStorage( + "Cannot garbage collect".to_string(), + ) + .into()); } // TODO: this function could have much more parallelism @@ -404,7 +392,7 @@ async fn delete_snapshots_from_repo_info( keep_snapshots: &HashSet, repo_info: Arc, repo_info_version: &VersionInfo, -) -> GCResult<()> { +) -> RepositoryResult<()> { let kept_snaps: Vec<_> = repo_info .all_snapshots()? .filter_ok(|si| keep_snapshots.contains(&si.id)) @@ -449,7 +437,7 @@ async fn gc_chunks( asset_manager: &AssetManager, config: &GCConfig, keep_ids: &HashSet, -) -> GCResult { +) -> RepositoryResult { tracing::info!("Deleting chunks"); let to_delete = asset_manager .list_chunks() @@ -477,7 +465,7 @@ async fn gc_manifests( asset_manager: &AssetManager, config: &GCConfig, keep_ids: &HashSet, -) -> GCResult { +) -> RepositoryResult { tracing::info!("Deleting manifests"); let to_delete = asset_manager .list_manifests() @@ -508,7 +496,7 @@ async fn gc_snapshots( asset_manager: &AssetManager, config: &GCConfig, keep_ids: &HashSet, -) -> GCResult { +) -> RepositoryResult { tracing::info!("Deleting snapshots"); let to_delete = asset_manager .list_snapshots() @@ -539,7 +527,7 @@ async fn gc_transaction_logs( asset_manager: &AssetManager, config: &GCConfig, keep_ids: &HashSet, -) -> GCResult { +) -> RepositoryResult { tracing::info!("Deleting transaction logs"); let to_delete = asset_manager .list_transaction_logs() @@ -600,21 +588,23 @@ pub async fn expire( older_than: DateTime, expired_branches: ExpiredRefAction, expired_tags: ExpiredRefAction, -) -> GCResult { +) -> RepositoryResult { if !asset_manager.can_write_to_storage() { - return Err(GCError::Repository( - RepositoryErrorKind::ReadonlyStorage("Cannot expire".to_string()).into(), - )); + return Err( + RepositoryErrorKind::ReadonlyStorage("Cannot expire".to_string()).into() + ); } info!("Expiration started"); let (repo_info, repo_info_version) = asset_manager.fetch_repo_info().await?; let tags: Vec<(Ref, SnapshotId)> = repo_info .tags()? - .map(|(name, snap)| Ok::<_, GCError>((Ref::Tag(name.to_string()), snap))) + .map(|(name, snap)| Ok::<_, RepositoryError>((Ref::Tag(name.to_string()), snap))) .try_collect()?; let branches: Vec<(Ref, SnapshotId)> = repo_info .branches()? - .map(|(name, snap)| Ok::<_, GCError>((Ref::Branch(name.to_string()), snap))) + .map(|(name, snap)| { + Ok::<_, RepositoryError>((Ref::Branch(name.to_string()), snap)) + }) .try_collect()?; fn split_root( @@ -647,7 +637,7 @@ pub async fn expire( } }; - Ok::<_, GCError>(res) + Ok::<_, RepositoryError>(res) }, )?; diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/s3.rs index c4ac04d30..217d8ae64 100644 --- a/icechunk/src/storage/s3.rs +++ b/icechunk/src/storage/s3.rs @@ -302,7 +302,7 @@ impl S3Storage { } if let Some(klass) = settings.storage_class() { - let klass = klass.as_str().into(); + let klass = klass.clone(); req = req.storage_class(klass); }