From 0a8b0aadab4b820f0bf3229812c70399e6c269f8 Mon Sep 17 00:00:00 2001 From: Sebastian Galkin Date: Sat, 6 Sep 2025 15:25:05 -0300 Subject: [PATCH 1/2] python bindings' --- Cargo.lock | 34 ++ Cargo.toml | 2 +- icechunk-export/Cargo.toml | 25 + icechunk-export/examples/export.rs | 33 ++ icechunk-export/examples/saturate_chunks.rs | 13 + icechunk-export/src/lib.rs | 588 ++++++++++++++++++++ icechunk-python/Cargo.toml | 1 + icechunk-python/python/icechunk/__init__.py | 15 + icechunk-python/src/errors.rs | 4 +- icechunk-python/src/export.rs | 49 ++ icechunk-python/src/lib.rs | 16 + icechunk-python/src/repository.rs | 39 +- icechunk/src/asset_manager.rs | 60 +- icechunk/src/ops/gc.rs | 56 +- 14 files changed, 881 insertions(+), 54 deletions(-) create mode 100644 icechunk-export/Cargo.toml create mode 100644 icechunk-export/examples/export.rs create mode 100644 icechunk-export/examples/saturate_chunks.rs create mode 100644 icechunk-export/src/lib.rs create mode 100644 icechunk-python/src/export.rs diff --git a/Cargo.lock b/Cargo.lock index bef06ca17..d292b81cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1785,6 +1785,20 @@ dependencies = [ "zstd", ] +[[package]] +name = "icechunk-export" +version = "0.1.0" +dependencies = [ + "bytes", + "chrono", + "err-into", + "futures", + "icechunk", + "indicatif", + "itertools", + "tokio", +] + [[package]] name = "icechunk-macros" version = "0.1.0" @@ -1804,6 +1818,7 @@ dependencies = [ "clap", "futures", "icechunk", + "icechunk-export", "itertools", "miette", "pyo3", @@ -2002,6 +2017,19 @@ dependencies = [ "serde", ] +[[package]] +name = "indicatif" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70a646d946d06bedbbc4cac4c218acf4bbf2d87757a784857025f4d447e4e1cd" +dependencies = [ + "console", + "portable-atomic", + "unicode-width 0.2.0", + "unit-prefix", + "web-time", +] + [[package]] name = "indoc" version = "2.0.5" @@ -3975,6 +4003,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" +[[package]] +name = "unit-prefix" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "323402cff2dd658f39ca17c789b502021b3f18707c91cdf22e3838e1b4023817" + [[package]] name = "unsafe-libyaml" version = "0.2.11" diff --git a/Cargo.toml b/Cargo.toml index 4d8604eca..2f1f3b003 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["icechunk", "icechunk-python", "icechunk-macros"] +members = ["icechunk", "icechunk-python", "icechunk-macros", "icechunk-export"] default-members = ["icechunk"] resolver = "2" diff --git a/icechunk-export/Cargo.toml b/icechunk-export/Cargo.toml new file mode 100644 index 000000000..c82c61e38 --- /dev/null +++ b/icechunk-export/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "icechunk-export" +version = "0.1.0" + +description = "Library to export Icechunk repositories" +repository = "https://github.com/earth-mover/icechunk" +homepage = "https://icechunk.io" +license = "Apache-2.0" +keywords = ["zarr", "xarray", "database"] +categories = ["database", "science", "science::geo"] +authors = ["Earthmover PBC"] +edition = "2024" + +[dependencies] +icechunk = { path = "../icechunk", version = "0.3.8" } +tokio = { version = "1.47.1", features = ["rt-multi-thread", "macros"] } +bytes = { version = "1.10.1" } +err-into = "1.0.1" +itertools = "0.14.0" +futures = "0.3.31" +chrono = "0.4.42" +indicatif = "0.18.0" + +[lints] +workspace = true diff --git a/icechunk-export/examples/export.rs b/icechunk-export/examples/export.rs new file mode 100644 index 000000000..70695891f --- /dev/null +++ b/icechunk-export/examples/export.rs @@ -0,0 +1,33 @@ +use std::{path::Path, sync::Arc}; + +use icechunk::{ + Repository, Storage, config::S3Credentials, new_local_filesystem_storage, + new_s3_storage, +}; +use icechunk_export::export; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let repo_path = Path::new("./icechunk-python/tests/data/test-repo-v2"); + let source_storage = new_local_filesystem_storage(repo_path).await?; + // let source_storage = new_s3_storage( + // icechunk::config::S3Options { + // region: Some("us-east-1".to_string()), + // endpoint_url: None, + // anonymous: false, + // allow_http: false, + // force_path_style: false, + // network_stream_timeout_seconds: None, + // }, + // "icechunk-public-data".to_string(), + // Some("v1/glad".to_string()), + // Some(S3Credentials::FromEnv), + // )?; + + let source = Repository::open(None, source_storage, Default::default()).await?; + let destination_path = Path::new("/tmp/test-export"); + let destination = new_local_filesystem_storage(destination_path).await?; + export(&source, destination, &icechunk_export::VersionSelection::AllHistory).await?; + + Ok(()) +} diff --git a/icechunk-export/examples/saturate_chunks.rs b/icechunk-export/examples/saturate_chunks.rs new file mode 100644 index 000000000..2be7f1124 --- /dev/null +++ b/icechunk-export/examples/saturate_chunks.rs @@ -0,0 +1,13 @@ +use std::path::Path; + +use icechunk_export::test_saturation; + +#[tokio::main] +async fn main() { + //let path = Path::new("/tmp/chunk_ids.txt"); + let args: Vec = std::env::args().collect(); + let path = Path::new(args[1].as_str()); + let max_concurrent = args[2].parse().unwrap(); + let copy_chunks = args[3].parse().unwrap(); + test_saturation(max_concurrent, copy_chunks, path).await.unwrap(); +} diff --git a/icechunk-export/src/lib.rs b/icechunk-export/src/lib.rs new file mode 100644 index 000000000..9d22ac59f --- /dev/null +++ b/icechunk-export/src/lib.rs @@ -0,0 +1,588 @@ +use std::{ + collections::HashSet, + fs::File, + hash::Hash, + io::{BufRead as _, BufReader}, + path::Path, + sync::{Arc, atomic::AtomicU64}, + time::{Duration, Instant}, +}; + +use bytes::Bytes; +use chrono::Utc; +use err_into::ErrorInto; +use futures::{ + SinkExt, + stream::{self, FuturesUnordered}, +}; +use icechunk::{ + Repository, Storage, StorageError, + asset_manager::AssetManager, + config::S3Options, + format::{ + ChunkId, ManifestId, SnapshotId, manifest::ChunkPayload, repo_info::RepoInfo, + }, + new_s3_storage, + ops::gc::{GCConfig, find_retained}, + repository::{RepositoryError, RepositoryResult}, + storage::Settings, +}; +use indicatif::ProgressBar; +use itertools::Itertools as _; +use tokio::{ + io::AsyncReadExt as _, + sync::{ + OwnedSemaphorePermit, Semaphore, + mpsc::{self, Receiver, Sender}, + }, +}; + +// fn parse_file_lines>( +// path: P, +// ) -> impl Iterator>> { +// let file = File::open(path).expect("Failed to open file"); +// let reader = BufReader::new(file); +// +// reader.lines().map(|line_result| { +// let line = line_result?; +// let parts: Vec<&str> = line.splitn(2, ' ').collect(); +// +// if parts.len() != 2 { +// return Err(format!("Invalid line format: {}", line).into()); +// } +// +// let number = parts[0].parse::()?; +// let text = parts[1].to_string(); +// +// Ok((number, text)) +// }) +// } + +// pub async fn test_saturation( +// max_concurrent: usize, +// copy_chunks: usize, +// chunks_file_path: &Path, +// ) -> Result<(), Box> { +// let (tx, rx) = mpsc::channel(max_concurrent); +// let chunks = parse_file_lines(chunks_file_path).take(copy_chunks); +// let handle = tokio::spawn(copier(rx, max_concurrent)); +// for task in chunks { +// match task { +// Ok(task) => tx.send(task).await?, +// Err(_) => panic!("Error reading file"), +// } +// } +// drop(tx); +// //tx.send((0, "".to_string())).await?; +// +// handle.await??; +// +// Ok(()) +// } + +// async fn copier( +// mut rx: Receiver<(u64, String)>, +// max_concurrent: usize, +// ) -> Result<(), RepositoryError> { +// let config = S3Options { +// region: Some("us-east-1".to_string()), +// endpoint_url: None, +// anonymous: false, +// allow_http: false, +// force_path_style: false, +// network_stream_timeout_seconds: None, +// }; +// let source_bucket = "icechunk-public-data".to_string(); +// let source = new_s3_storage( +// config.clone(), +// source_bucket, +// Some("v1/era5_weatherbench2".to_string()), +// None, +// )?; +// +// let destination_bucket = "icechunk-test".to_string(); +// let destination = new_s3_storage( +// config.clone(), +// destination_bucket, +// Some("test-net-saturation".to_string()), +// None, +// )?; +// +// let semaphore = Arc::new(Semaphore::new(max_concurrent)); +// let settings = Arc::new(source.default_settings()); +// +// let (done_sender, mut done_receiver) = mpsc::channel(10_000); +// +// let done_task = tokio::spawn(async move { +// let mut bytes_copied = 0u64; +// let mut chunks_copied = 0u64; +// let start_time = Instant::now(); +// +// while let Some(size) = done_receiver.recv().await { +// bytes_copied += size; +// chunks_copied += 1; +// if start_time.elapsed().as_secs() > 0 { +// let speed = bytes_copied as f64 +// / 1_000_000.0 +// / start_time.elapsed().as_secs() as f64; +// println!( +// "Copied {chunks_copied} chunks. Average speed: {speed:.2} MB/sec" +// ); +// } +// } +// bytes_copied +// }); +// +// while let Some((size, id)) = rx.recv().await { +// let guard = semaphore.clone().acquire_owned().await?; +// tokio::spawn(copy_chunk( +// size, +// id, +// source.clone(), +// destination.clone(), +// settings.clone(), +// done_sender.clone(), +// guard, +// )); +// } +// +// drop(done_sender); +// done_task.await?; +// +// Ok(()) +// } + +// async fn copy_chunk( +// size: u64, +// id: String, +// source: Arc, +// destination: Arc, +// settings: Arc, +// done: Sender, +// _semaphore: OwnedSemaphorePermit, +// ) -> Result<(), StorageError> { +// let key = format!("chunks/{id}"); +// //println!("Copying {key}"); +// let (mut reader, _) = +// source.get_object(settings.as_ref(), key.as_str(), None).await?; +// +// let mut buffer = Vec::with_capacity(1024 * 1024); +// reader.read_to_end(&mut buffer).await?; +// let bytes = Bytes::from_owner(buffer); +// +// destination +// .put_object( +// settings.as_ref(), +// key.as_str(), +// bytes, +// None, +// Default::default(), +// None, +// ) +// .await?; +// //println!("Done {key}"); +// done.send(size).await.unwrap(); +// Ok(()) +// } + +pub enum VersionSelection { + SingleSnapshot(SnapshotId), + AllHistory, + // TODO: can we do refs here instead of String? + RefsHistory { branches: Vec, tags: Vec }, +} + +pub struct ExportConfig<'a> { + pub source: &'a Repository, + pub destination: Arc, + pub versions: VersionSelection, + pub update_config: bool, +} + +fn select_snapshots( + repo: &RepoInfo, + versions: &VersionSelection, +) -> RepositoryResult> { + match versions { + VersionSelection::SingleSnapshot(snapshot_id) => { + Ok(HashSet::from([snapshot_id.clone()])) + } + VersionSelection::AllHistory => { + let branches = repo.branch_names()?.map(|s| s.to_string()); + let tags = repo.tag_names()?.map(|s| s.to_string()); + let selection = VersionSelection::RefsHistory { + branches: branches.collect(), + tags: tags.collect(), + }; + select_snapshots(repo, &selection) + } + VersionSelection::RefsHistory { branches, tags } => { + let branch_roots = branches + .iter() + .map(|name| repo.resolve_branch(name)) + .map_ok(|snap_id| repo.ancestry(&snap_id)); + let tag_roots = tags + .iter() + .map(|name| repo.resolve_tag(name)) + .map_ok(|snap_id| repo.ancestry(&snap_id)); + let mut res = HashSet::new(); + for it in branch_roots.chain(tag_roots) { + for snap_id in it?? { + if !res.insert(snap_id?.id) { + // we don'n need to continue because + // we have already seen this snapshot and all its ancestry + break; + } + } + } + Ok(res) + } + } +} + +enum ObjectType { + Chunk, + Manifest, + TransactionLog, + Snapshot, +} +// +// struct CopyMetadata { +// object_type: ObjectType, +// object_size: u64, +// } + +enum Operation { + Copy { from: String, to: String, object_type: ObjectType, object_size: Option }, +} + +// async fn process_snap_ids( +// mut rec: Receiver, +// mut sender: Sender, +// asset_manager: Arc, +// known_manifests: &HashSet, +// max_snapshots_in_memory: usize, +// ) { +// let semaphore = Arc::new(Semaphore::new(max_snapshots_in_memory)); +// while let Some(sid) = rec.recv().await { +// let guard = semaphore.acquire().await.unwrap(); +// let snap = asset_manager.fetch_snapshot(&sid).await.unwrap(); +// for mfile in snap.manifest_files() { +// if !known_manifests.contains(&mfile.id) { +// sender.send(mfile.id).await; +// } +// } +// } +// } + +// fn export_snaps( +// source: Arc, +// destination: Arc, +// missing_snaps: HashSet, +// known_manifests: HashSet, +// known_chunks: HashSet, +// ) { +// let max_concurrent_snaps = 100; // FIXME: tune, configurable +// +// let (snap_id_tx, snap_id_rx) = mpsc::channel(max_concurrent_snaps); +// let snap_id_task_handle = tokio::spawn(process_snap_ids(rx, max_concurrent)); +// let handle = tokio::spawn(copier(rx, max_concurrent)); +// for task in chunks { +// match task { +// Ok(task) => tx.send(task).await?, +// Err(_) => panic!("Error reading file"), +// } +// } +// drop(tx); +// //tx.send((0, "".to_string())).await?; +// +// handle.await??; +// +// Ok(()) +// } + +async fn calculate_diff( + repo: &RepoInfo, + versions: &VersionSelection, + destination: Arc, +) -> RepositoryResult<(HashSet, HashSet, HashSet)> { + // FIXME: tune caches + let destination = + Repository::open_or_create(None, destination, Default::default()).await?; + //TODO: bring outside of GC + let gc_config = GCConfig::clean_all( + Utc::now(), + Utc::now(), + None, + 50.try_into().unwrap(), + (50 * 1_024 * 1_024).try_into().unwrap(), + 500.try_into().unwrap(), + true, + ); + let (dest_chunks, dest_manifests, dest_snapshots) = + find_retained(destination.asset_manager().clone(), &gc_config).await?; + + let source_snaps = select_snapshots(repo, versions)?; + let missing_snaps = &source_snaps - &dest_snapshots; + + Ok((missing_snaps, dest_manifests, dest_chunks)) +} + +async fn copy_object( + from_path: String, + to_path: String, + object_type: ObjectType, + object_size: Option, + source: Arc, + destination: Arc, + source_settings: Arc, + destination_settings: Arc, + result: Sender>>, + _semaphore: OwnedSemaphorePermit, +) -> Result<(), Box> { + let object_size = do_copy_object( + from_path, + to_path, + object_size, + source, + destination, + source_settings, + destination_settings, + ) + .await + .map_err(|e| { + let e: Box = Box::new(e); + e + })?; + result.send(Ok((object_type, object_size))).await.unwrap(); + Ok(()) +} + +async fn do_copy_object( + from_path: String, + to_path: String, + object_size: Option, + source: Arc, + destination: Arc, + source_settings: Arc, + destination_settings: Arc, +) -> Result { + let range = object_size.map(|n| 0..n); + let (mut reader, _) = source + .get_object(source_settings.as_ref(), from_path.as_str(), range.as_ref()) + .await?; + + // TODO: better capacity + let mut buffer = Vec::with_capacity(object_size.unwrap_or(1024 * 1024) as usize); + reader.read_to_end(&mut buffer).await?; + let bytes = Bytes::from_owner(buffer); + let len = bytes.len() as u64; + + destination + .put_object( + destination_settings.as_ref(), + to_path.as_str(), + bytes, + None, + Default::default(), + None, + ) + .await?; + Ok(len) +} + +async fn execute_operations( + mut rec: Receiver, + result: Sender>>, + source: Arc, + destination: Arc, + concurrent_operations: usize, +) -> Result<(), Box> { + let source_settings = Arc::new(source.default_settings()); + let destination_settings = Arc::new(destination.default_settings()); + + let semaphore = Arc::new(Semaphore::new(concurrent_operations)); + while let Some(op) = rec.recv().await { + match op { + Operation::Copy { from, to, object_type, object_size } => { + let guard = semaphore.clone().acquire_owned().await.unwrap(); + // FIXME: + //tokio::time::sleep(Duration::from_millis(400)).await; + tokio::spawn(copy_object( + from, + to, + object_type, + object_size, + source.clone(), + destination.clone(), + source_settings.clone(), + destination_settings.clone(), + result.clone(), + guard, + )); + } + } + } + Ok(()) +} + +async fn receive_operation_result( + mut rec: Receiver>>, + chunk_progress: ProgressBar, + manifest_progress: ProgressBar, + snapshot_progress: ProgressBar, + bytes_progress: ProgressBar, +) { + while let Some(res) = rec.recv().await { + match res { + Ok((object_type, object_size)) => match object_type { + ObjectType::Chunk => { + chunk_progress.inc(1); + bytes_progress.inc(object_size); + } + ObjectType::Manifest => { + manifest_progress.inc(1); + bytes_progress.inc(object_size); + } + ObjectType::TransactionLog => todo!(), + ObjectType::Snapshot => { + snapshot_progress.inc(1); + bytes_progress.inc(object_size); + } + }, + Err(err) => panic!("{}", err.to_string()), + } + } +} + +pub async fn export( + source: &Repository, + destination: Arc, + versions: &VersionSelection, +) -> Result<(), Box> { + // FIXME: fail for v1 repos + let multi = indicatif::MultiProgress::new(); + let bytes_sty = indicatif::ProgressStyle::with_template( + "{prefix:15.green} {binary_bytes} [{binary_bytes_per_sec}]", + ) + .unwrap(); + let chunks_sty = indicatif::ProgressStyle::with_template( + "{prefix:15.green} {bar:60} {human_pos:>}/{human_len} [{eta}]", + ) + .unwrap(); + let others_sty = indicatif::ProgressStyle::with_template( + "{prefix:15.green} {bar:60} {human_pos:>}/{human_len}", + ) + .unwrap(); + let bytes_progress = indicatif::ProgressBar::new(0); + let snapshots_progress = indicatif::ProgressBar::new(0); + let manifests_progress = indicatif::ProgressBar::new(0); + let chunks_progress = indicatif::ProgressBar::new(0); + multi + .add(chunks_progress.clone()) + .with_style(chunks_sty.clone()) + .with_prefix("🧊 Chunks:"); + multi + .add(manifests_progress.clone()) + .with_style(others_sty.clone()) + .with_prefix("📜 Manifests:"); + multi + .add(snapshots_progress.clone()) + .with_style(others_sty.clone()) + .with_prefix("📸 Snapshots:"); + multi.add(bytes_progress.clone()).with_style(bytes_sty).with_prefix("💾 Copied:"); + + let (source_repo_info, _) = source.asset_manager().fetch_repo_info().await?; + let (missing_snapshots, mut dest_manifests, mut dest_chunks) = + calculate_diff(&source_repo_info, versions, destination.clone()).await?; + snapshots_progress.set_length(missing_snapshots.len() as u64); + + let (op_result_sender, op_result_receiver) = mpsc::channel(100_000_000); // FIXME: + let (op_execute_sender, op_execute_receiver) = mpsc::channel(100_000_000); // FIXME: + + let op_exec_handle = tokio::spawn(execute_operations( + op_execute_receiver, + op_result_sender, + source.storage().clone(), + destination.clone(), + 512, // FIXME: + )); + + let op_result_handle = tokio::spawn(receive_operation_result( + op_result_receiver, + chunks_progress.clone(), + manifests_progress.clone(), + snapshots_progress.clone(), + bytes_progress.clone(), + )); + + for snap_id in missing_snapshots { + let snap = source.asset_manager().fetch_snapshot(&snap_id).await?; + for mfile in snap.manifest_files() { + if !dest_manifests.contains(&mfile.id) { + manifests_progress.inc_length(1); + dest_manifests.insert(mfile.id.clone()); + let manifest = source + .asset_manager() + .fetch_manifest(&mfile.id, mfile.size_bytes) + .await?; + + //copy all chunks + for chunk_payload in manifest.chunk_payloads() { + match chunk_payload? { + ChunkPayload::Inline(_) => { //TODO: materialize + } + ChunkPayload::Virtual(_) => { //TODO: materialize + } + ChunkPayload::Ref(chunk_ref) => { + dest_chunks.insert(chunk_ref.id.clone()); + chunks_progress.inc_length(1); + let from_path = AssetManager::chunk_path(&chunk_ref.id); + let to_path = from_path.clone(); + let op = Operation::Copy { + from: from_path, + to: to_path, + object_type: ObjectType::Chunk, + object_size: Some(chunk_ref.length), + }; + op_execute_sender.send(op).await?; + } + _ => { + panic!("Unknown payload type"); + } + } + } + + //copy manifest + let from_path = AssetManager::manifest_path(&manifest.id()); + let to_path = from_path.clone(); + let op = Operation::Copy { + from: from_path, + to: to_path, + object_type: ObjectType::Manifest, + object_size: Some(mfile.size_bytes), + }; + op_execute_sender.send(op).await?; + } + } + // copy snapshot + let from_path = AssetManager::snapshot_path(&snap.id()); + let to_path = from_path.clone(); + let op = Operation::Copy { + from: from_path, + to: to_path, + object_size: None, + object_type: ObjectType::Snapshot, + }; + op_execute_sender.send(op).await?; + } + + drop(op_execute_sender); + op_exec_handle.await?.unwrap(); + op_result_handle.await?; + + Ok(()) +} + +#[cfg(test)] +mod tests {} diff --git a/icechunk-python/Cargo.toml b/icechunk-python/Cargo.toml index 6198cfa65..b03154d83 100644 --- a/icechunk-python/Cargo.toml +++ b/icechunk-python/Cargo.toml @@ -22,6 +22,7 @@ bytes = "1.10.1" chrono = { version = "0.4.42" } futures = "0.3.31" icechunk = { path = "../icechunk", version = "0.3.9", features = ["logs"] } +icechunk-export = { path = "../icechunk-export", version = "0.1.0" } itertools = "0.14.0" pyo3 = { version = "0.24.2", features = [ "chrono", diff --git a/icechunk-python/python/icechunk/__init__.py b/icechunk-python/python/icechunk/__init__.py index ee9597aec..9ebcdb7b9 100644 --- a/icechunk-python/python/icechunk/__init__.py +++ b/icechunk-python/python/icechunk/__init__.py @@ -22,6 +22,10 @@ Credentials, Diff, ExpirationRanUpdate, + ExportAllHistory, + ExportRefsHistory, + ExportSingleSnapshot, + ExportVersionSelection, GCRanUpdate, GcsBearerCredential, GcsCredentials, @@ -56,6 +60,7 @@ VirtualChunkContainer, VirtualChunkSpec, __version__, + _export_repository, _upgrade_icechunk_repository, initialize_logs, set_logs_filter, @@ -127,6 +132,10 @@ "Credentials", "Diff", "ExpirationRanUpdate", + "ExportAllHistory", + "ExportRefsHistory", + "ExportSingleSnapshot", + "ExportVersionSelection", "ForkSession", "GCRanUpdate", "GCSummary", @@ -260,6 +269,12 @@ def upgrade_icechunk_repository( ) +def export_repository( + repo: Repository, destination: Storage, versions: ExportVersionSelection +) -> None: + _export_repository(repo._repository, destination, versions) + + ManifestSplittingConfig.from_dict = from_dict # type: ignore[method-assign] ManifestSplittingConfig.to_dict = to_dict # type: ignore[attr-defined] diff --git a/icechunk-python/src/errors.rs b/icechunk-python/src/errors.rs index d1bb5329d..6c6eef156 100644 --- a/icechunk-python/src/errors.rs +++ b/icechunk-python/src/errors.rs @@ -2,7 +2,7 @@ use icechunk::{ StorageError, format::{IcechunkFormatError, manifest::VirtualReferenceError}, migrations::MigrationError, - ops::{gc::GCError, manifests::ManifestOpsError}, + ops::manifests::ManifestOpsError, repository::RepositoryError, session::{SessionError, SessionErrorKind}, store::{StoreError, StoreErrorKind}, @@ -40,8 +40,6 @@ pub(crate) enum PyIcechunkStoreError { #[error(transparent)] IcechunkFormatError(#[from] IcechunkFormatError), #[error(transparent)] - GCError(#[from] GCError), - #[error(transparent)] ManifestOpsError(#[from] ManifestOpsError), #[error(transparent)] VirtualReferenceError(#[from] VirtualReferenceError), diff --git a/icechunk-python/src/export.rs b/icechunk-python/src/export.rs new file mode 100644 index 000000000..d0d33af29 --- /dev/null +++ b/icechunk-python/src/export.rs @@ -0,0 +1,49 @@ +use pyo3::{pyclass, pymethods}; + +#[pyclass(name = "ExportVersionSelection", eq, subclass)] +#[derive(Debug, PartialEq, Eq)] +pub struct PyVersionSelection; + +#[pyclass(name = "ExportSingleSnapshot", eq, extends=PyVersionSelection)] +#[derive(Debug, PartialEq, Eq)] +pub struct PySingleSnapshot { + #[pyo3(get)] + pub snapshot_id: String, +} + +#[pymethods] +impl PySingleSnapshot { + #[new] + fn new(snapshot_id: String) -> (Self, PyVersionSelection) { + (PySingleSnapshot { snapshot_id }, PyVersionSelection) + } +} + +#[pyclass(name = "ExportAllHistory", eq, extends=PyVersionSelection)] +#[derive(Debug, PartialEq, Eq)] +pub struct PyAllHistory; + +#[pymethods] +impl PyAllHistory { + #[new] + fn new() -> (Self, PyVersionSelection) { + (PyAllHistory, PyVersionSelection) + } +} + +#[pyclass(name = "ExportRefsHistory", eq, extends=PyVersionSelection)] +#[derive(Debug, PartialEq, Eq)] +pub struct PyRefsHistory { + #[pyo3(get)] + pub branches: Vec, + #[pyo3(get)] + pub tags: Vec, +} + +#[pymethods] +impl PyRefsHistory { + #[new] + fn new(branches: Vec, tags: Vec) -> (Self, PyVersionSelection) { + (PyRefsHistory { branches, tags }, PyVersionSelection) + } +} diff --git a/icechunk-python/src/lib.rs b/icechunk-python/src/lib.rs index dead4421c..cf779051a 100644 --- a/icechunk-python/src/lib.rs +++ b/icechunk-python/src/lib.rs @@ -1,6 +1,7 @@ mod config; mod conflicts; mod errors; +mod export; mod pickle; mod repository; mod session; @@ -152,6 +153,16 @@ fn _upgrade_icechunk_repository( repo.migrate_1_to_2(py, dry_run, delete_unused_v1_files) } +#[pyfunction] +fn _export_repository<'py>( + py: Python<'py>, + source: &PyRepository, + destination: PyStorage, + versions: &Bound<'py, PyAny>, +) -> PyResult<()> { + source.export(py, destination, versions) +} + fn pep440_version() -> String { let cargo_version = env!("CARGO_PKG_VERSION"); cargo_version.replace("-rc.", "rc").replace("-alpha.", "a").replace("-beta.", "b") @@ -210,12 +221,17 @@ fn _icechunk_python(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_function(wrap_pyfunction!(initialize_logs, m)?)?; m.add_function(wrap_pyfunction!(set_logs_filter, m)?)?; m.add_function(wrap_pyfunction!(spec_version, m)?)?; m.add_function(wrap_pyfunction!(cli_entrypoint, m)?)?; m.add_function(wrap_pyfunction!(_upgrade_icechunk_repository, m)?)?; + m.add_function(wrap_pyfunction!(_export_repository, m)?)?; m.add("__version__", pep440_version())?; // Exceptions diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index 03905eac0..65adf32a6 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -42,6 +42,7 @@ use crate::{ format_option_to_string, }, errors::PyIcechunkStoreError, + export::{PyAllHistory, PyRefsHistory, PySingleSnapshot, PyVersionSelection}, impl_pickle, session::PySession, streams::PyAsyncGenerator, @@ -748,6 +749,36 @@ impl PyRepository { }) }) } + + pub fn export<'py>( + &self, + py: Python<'py>, + destination: PyStorage, + versions: &Bound<'py, PyAny>, + ) -> PyResult<()> { + let versions = if let Ok(_) = versions.extract::>() { + icechunk_export::VersionSelection::AllHistory + } else if let Ok(cell) = versions.extract::>() { + icechunk_export::VersionSelection::SingleSnapshot( + cell.snapshot_id.clone().try_into().unwrap(), //FIXME: unwrap + ) + } else if let Ok(cell) = versions.extract::>() { + icechunk_export::VersionSelection::RefsHistory { + tags: cell.tags.clone(), + branches: cell.branches.clone(), + } + } else { + panic!("unknown versions value") // FIXME: panic + }; + py.allow_threads(move || { + pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + let repo = self.0.write().await; + let destination = destination.0; + icechunk_export::export(&repo, destination, &versions).await.unwrap(); //FIXME:unwrap + Ok(()) + }) + }) + } } #[pymethods] @@ -1806,7 +1837,7 @@ impl PyRepository { }, ) .await - .map_err(PyIcechunkStoreError::GCError)?; + .map_err(PyIcechunkStoreError::RepositoryError)?; Ok::<_, PyIcechunkStoreError>( result .released_snapshots @@ -1850,7 +1881,7 @@ impl PyRepository { }, ) .await - .map_err(PyIcechunkStoreError::GCError)?; + .map_err(PyIcechunkStoreError::RepositoryError)?; Ok(result.released_snapshots.iter().map(|id| id.to_string()).collect()) }) } @@ -1883,7 +1914,7 @@ impl PyRepository { }; let result = garbage_collect(asset_manager, &gc_config) .await - .map_err(PyIcechunkStoreError::GCError)?; + .map_err(PyIcechunkStoreError::RepositoryError)?; Ok::<_, PyIcechunkStoreError>(result.into()) })?; @@ -1917,7 +1948,7 @@ impl PyRepository { }; let result = garbage_collect(asset_manager, &gc_config) .await - .map_err(PyIcechunkStoreError::GCError)?; + .map_err(PyIcechunkStoreError::RepositoryError)?; Ok(result.into()) }) } diff --git a/icechunk/src/asset_manager.rs b/icechunk/src/asset_manager.rs index 8de1ee87d..c5d5bf62f 100644 --- a/icechunk/src/asset_manager.rs +++ b/icechunk/src/asset_manager.rs @@ -452,6 +452,18 @@ impl AssetManager { .await } + pub fn chunk_path(id: &ChunkId) -> String { + format!("{CHUNKS_FILE_PATH}/{id}") + } + + pub fn manifest_path(id: &ManifestId) -> String { + format!("{MANIFESTS_FILE_PATH}/{id}") + } + + pub fn snapshot_path(id: &SnapshotId) -> String { + format!("{SNAPSHOTS_FILE_PATH}/{id}") + } + #[instrument(skip(self, bytes))] pub async fn write_chunk( &self, @@ -460,7 +472,6 @@ impl AssetManager { ) -> RepositoryResult<()> { trace!(%chunk_id, size_bytes=bytes.len(), "Writing chunk"); - let path = format!("{CHUNKS_FILE_PATH}/{chunk_id}"); let _permit = self.request_semaphore.acquire().await?; let settings = storage::Settings { storage_class: self.storage_settings.chunks_storage_class().cloned(), @@ -468,7 +479,14 @@ impl AssetManager { }; // we don't pre-populate the chunk cache, there are too many of them for this to be useful self.storage - .put_object(&settings, path.as_str(), bytes, None, Default::default(), None) + .put_object( + &settings, + Self::chunk_path(&chunk_id).as_str(), + bytes, + None, + Default::default(), + None, + ) .await? .must_write()?; Ok(()) @@ -485,11 +503,14 @@ impl AssetManager { Ok(chunk) => Ok(chunk), Err(guard) => { trace!(%chunk_id, ?range, "Downloading chunk"); - let path = format!("{CHUNKS_FILE_PATH}/{chunk_id}"); let permit = self.request_semaphore.acquire().await?; let (read, _) = self .storage - .get_object(&self.storage_settings, &path, Some(range)) + .get_object( + &self.storage_settings, + &Self::chunk_path(chunk_id), + Some(range), + ) .await?; let chunk = async_reader_to_bytes(read, (range.end - range.start) as usize) @@ -507,11 +528,13 @@ impl AssetManager { snapshot_id: &SnapshotId, ) -> RepositoryResult> { debug!(%snapshot_id, "Getting snapshot timestamp"); - let path = format!("{SNAPSHOTS_FILE_PATH}/{snapshot_id}"); let _permit = self.request_semaphore.acquire().await?; Ok(self .storage - .get_object_last_modified(path.as_str(), &self.storage_settings) + .get_object_last_modified( + Self::snapshot_path(snapshot_id).as_str(), + &self.storage_settings, + ) .await?) } @@ -785,7 +808,6 @@ async fn write_new_manifest( let len = buffer.len() as u64; debug!(%id, size_bytes=len, "Writing manifest"); - let path = format!("{MANIFESTS_FILE_PATH}/{id}"); let settings = storage::Settings { storage_class: storage_settings.metadata_storage_class().cloned(), ..storage_settings.clone() @@ -793,7 +815,14 @@ async fn write_new_manifest( let _permit = semaphore.acquire().await?; storage - .put_object(&settings, path.as_str(), buffer.into(), None, metadata, None) + .put_object( + &settings, + AssetManager::manifest_path(&id).as_str(), + buffer.into(), + None, + metadata, + None, + ) .await? .must_write()?; Ok(len) @@ -808,12 +837,17 @@ async fn fetch_manifest( ) -> RepositoryResult> { debug!(%manifest_id, "Downloading manifest"); - let path = format!("{MANIFESTS_FILE_PATH}/{manifest_id}"); let range = 0..manifest_size; let range = if manifest_size > 0 { Some(&range) } else { None }; let _permit = semaphore.acquire().await?; - let (read, _) = storage.get_object(storage_settings, path.as_str(), range).await?; + let (read, _) = storage + .get_object( + storage_settings, + AssetManager::manifest_path(manifest_id).as_str(), + range, + ) + .await?; let span = Span::current(); tokio::task::spawn_blocking(move || { @@ -887,7 +921,7 @@ async fn write_new_snapshot( .await??; debug!(%id, size_bytes=buffer.len(), "Writing snapshot"); - let path = format!("{SNAPSHOTS_FILE_PATH}/{id}"); + let path = AssetManager::snapshot_path(&id); let settings = storage::Settings { storage_class: storage_settings.metadata_storage_class().cloned(), ..storage_settings.clone() @@ -910,7 +944,7 @@ async fn fetch_snapshot( debug!(%snapshot_id, "Downloading snapshot"); let _permit = semaphore.acquire().await?; - let path = format!("{SNAPSHOTS_FILE_PATH}/{snapshot_id}"); + let path = AssetManager::snapshot_path(snapshot_id); let (read, _) = storage.get_object(storage_settings, path.as_str(), None).await?; let span = Span::current(); @@ -1270,7 +1304,7 @@ mod test { // when we insert we cache, so no fetches assert_eq!( logging.fetch_operations(), - vec![("put_object".to_string(), format!("{MANIFESTS_FILE_PATH}/{id}"))] + vec![("put_object".to_string(), AssetManager::manifest_path(&id))] ); // first time it sees an ID it calls the backend diff --git a/icechunk/src/ops/gc.rs b/icechunk/src/ops/gc.rs index 173ed6570..c72629b3a 100644 --- a/icechunk/src/ops/gc.rs +++ b/icechunk/src/ops/gc.rs @@ -14,13 +14,13 @@ use tracing::{debug, info, instrument}; use crate::{ asset_manager::AssetManager, format::{ - ChunkId, FileTypeTag, IcechunkFormatError, ManifestId, ObjectId, SnapshotId, + ChunkId, FileTypeTag, ManifestId, ObjectId, SnapshotId, manifest::{ChunkPayload, Manifest}, repo_info::{RepoInfo, UpdateType}, snapshot::{ManifestFileInfo, Snapshot, SnapshotInfo}, }, ops::pointed_snapshots, - refs::{Ref, RefError}, + refs::Ref, repository::{RepositoryError, RepositoryErrorKind, RepositoryResult}, storage::{DeleteObjectsResult, ListInfo, VersionInfo}, stream_utils::{StreamLimiter, try_unique_stream}, @@ -170,18 +170,6 @@ pub struct GCSummary { pub transaction_logs_deleted: u64, } -#[derive(Debug, thiserror::Error)] -pub enum GCError { - #[error("ref error {0}")] - Ref(#[from] RefError), - #[error("repository error {0}")] - Repository(#[from] RepositoryError), - #[error("format error {0}")] - FormatError(#[from] IcechunkFormatError), -} - -pub type GCResult = Result; - async fn snapshot_retained( keep_snapshots: Arc>>, snap: Arc, @@ -250,10 +238,10 @@ async fn chunks_retained( } #[instrument(skip_all)] -async fn find_retained( +pub async fn find_retained( asset_manager: Arc, config: &GCConfig, -) -> GCResult<(HashSet, HashSet, HashSet)> { +) -> RepositoryResult<(HashSet, HashSet, HashSet)> { let all_snaps = pointed_snapshots(Arc::clone(&asset_manager), &config.extra_roots).await?; @@ -316,12 +304,12 @@ async fn find_retained( pub async fn garbage_collect( asset_manager: Arc, config: &GCConfig, -) -> GCResult { +) -> RepositoryResult { if !asset_manager.can_write_to_storage() { - return Err(GCError::Repository( - RepositoryErrorKind::ReadonlyStorage("Cannot garbage collect".to_string()) - .into(), - )); + return Err(RepositoryErrorKind::ReadonlyStorage( + "Cannot garbage collect".to_string(), + ) + .into()); } // TODO: this function could have much more parallelism @@ -404,7 +392,7 @@ async fn delete_snapshots_from_repo_info( keep_snapshots: &HashSet, repo_info: Arc, repo_info_version: &VersionInfo, -) -> GCResult<()> { +) -> RepositoryResult<()> { let kept_snaps: Vec<_> = repo_info .all_snapshots()? .filter_ok(|si| keep_snapshots.contains(&si.id)) @@ -449,7 +437,7 @@ async fn gc_chunks( asset_manager: &AssetManager, config: &GCConfig, keep_ids: &HashSet, -) -> GCResult { +) -> RepositoryResult { tracing::info!("Deleting chunks"); let to_delete = asset_manager .list_chunks() @@ -477,7 +465,7 @@ async fn gc_manifests( asset_manager: &AssetManager, config: &GCConfig, keep_ids: &HashSet, -) -> GCResult { +) -> RepositoryResult { tracing::info!("Deleting manifests"); let to_delete = asset_manager .list_manifests() @@ -508,7 +496,7 @@ async fn gc_snapshots( asset_manager: &AssetManager, config: &GCConfig, keep_ids: &HashSet, -) -> GCResult { +) -> RepositoryResult { tracing::info!("Deleting snapshots"); let to_delete = asset_manager .list_snapshots() @@ -539,7 +527,7 @@ async fn gc_transaction_logs( asset_manager: &AssetManager, config: &GCConfig, keep_ids: &HashSet, -) -> GCResult { +) -> RepositoryResult { tracing::info!("Deleting transaction logs"); let to_delete = asset_manager .list_transaction_logs() @@ -600,21 +588,23 @@ pub async fn expire( older_than: DateTime, expired_branches: ExpiredRefAction, expired_tags: ExpiredRefAction, -) -> GCResult { +) -> RepositoryResult { if !asset_manager.can_write_to_storage() { - return Err(GCError::Repository( - RepositoryErrorKind::ReadonlyStorage("Cannot expire".to_string()).into(), - )); + return Err( + RepositoryErrorKind::ReadonlyStorage("Cannot expire".to_string()).into() + ); } info!("Expiration started"); let (repo_info, repo_info_version) = asset_manager.fetch_repo_info().await?; let tags: Vec<(Ref, SnapshotId)> = repo_info .tags()? - .map(|(name, snap)| Ok::<_, GCError>((Ref::Tag(name.to_string()), snap))) + .map(|(name, snap)| Ok::<_, RepositoryError>((Ref::Tag(name.to_string()), snap))) .try_collect()?; let branches: Vec<(Ref, SnapshotId)> = repo_info .branches()? - .map(|(name, snap)| Ok::<_, GCError>((Ref::Branch(name.to_string()), snap))) + .map(|(name, snap)| { + Ok::<_, RepositoryError>((Ref::Branch(name.to_string()), snap)) + }) .try_collect()?; fn split_root( @@ -647,7 +637,7 @@ pub async fn expire( } }; - Ok::<_, GCError>(res) + Ok::<_, RepositoryError>(res) }, )?; From d2521f203789b1205802f4179d9bc4b40fefa75d Mon Sep 17 00:00:00 2001 From: Sebastian Galkin Date: Fri, 12 Sep 2025 16:22:53 -0300 Subject: [PATCH 2/2] Working on repo_info export --- Cargo.lock | 4 + icechunk-export/Cargo.toml | 3 + icechunk-export/examples/export.rs | 12 +- icechunk-export/src/lib.rs | 801 +++++++++++++++-------------- icechunk-python/src/repository.rs | 8 +- icechunk/src/asset_manager.rs | 8 +- icechunk/src/format/mod.rs | 4 +- icechunk/src/storage/s3.rs | 2 +- 8 files changed, 453 insertions(+), 389 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d292b81cf..71b236a2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1789,6 +1789,7 @@ dependencies = [ name = "icechunk-export" version = "0.1.0" dependencies = [ + "async-trait", "bytes", "chrono", "err-into", @@ -1796,7 +1797,9 @@ dependencies = [ "icechunk", "indicatif", "itertools", + "thiserror 2.0.16", "tokio", + "tokio-util", ] [[package]] @@ -3821,6 +3824,7 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] diff --git a/icechunk-export/Cargo.toml b/icechunk-export/Cargo.toml index c82c61e38..5f01452d0 100644 --- a/icechunk-export/Cargo.toml +++ b/icechunk-export/Cargo.toml @@ -20,6 +20,9 @@ itertools = "0.14.0" futures = "0.3.31" chrono = "0.4.42" indicatif = "0.18.0" +thiserror = "2.0.16" +tokio-util = { version = "0.7.16", features = ["rt"] } +async-trait = "0.1.89" [lints] workspace = true diff --git a/icechunk-export/examples/export.rs b/icechunk-export/examples/export.rs index 70695891f..f1f541a98 100644 --- a/icechunk-export/examples/export.rs +++ b/icechunk-export/examples/export.rs @@ -4,7 +4,7 @@ use icechunk::{ Repository, Storage, config::S3Credentials, new_local_filesystem_storage, new_s3_storage, }; -use icechunk_export::export; +use icechunk_export::{ProgressBars, export}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -27,7 +27,15 @@ async fn main() -> Result<(), Box> { let source = Repository::open(None, source_storage, Default::default()).await?; let destination_path = Path::new("/tmp/test-export"); let destination = new_local_filesystem_storage(destination_path).await?; - export(&source, destination, &icechunk_export::VersionSelection::AllHistory).await?; + export( + &source, + destination, + &icechunk_export::VersionSelection::AllHistory, + Arc::new(ProgressBars::new()), + 100, + ) + .await?; + println!("done"); Ok(()) } diff --git a/icechunk-export/src/lib.rs b/icechunk-export/src/lib.rs index 9d22ac59f..2fe2292a1 100644 --- a/icechunk-export/src/lib.rs +++ b/icechunk-export/src/lib.rs @@ -1,309 +1,135 @@ use std::{ collections::HashSet, - fs::File, - hash::Hash, - io::{BufRead as _, BufReader}, - path::Path, - sync::{Arc, atomic::AtomicU64}, - time::{Duration, Instant}, + sync::{Arc, atomic::AtomicUsize}, }; +use async_trait::async_trait; use bytes::Bytes; use chrono::Utc; -use err_into::ErrorInto; -use futures::{ - SinkExt, - stream::{self, FuturesUnordered}, -}; use icechunk::{ - Repository, Storage, StorageError, + Repository, Storage, asset_manager::AssetManager, - config::S3Options, + error::ICError, format::{ - ChunkId, ManifestId, SnapshotId, manifest::ChunkPayload, repo_info::RepoInfo, + ChunkId, IcechunkFormatError, IcechunkFormatErrorKind, IcechunkResult, + ManifestId, SnapshotId, + format_constants::SpecVersionBin, + manifest::ChunkPayload, + repo_info::{RepoInfo, UpdateType}, + snapshot::Snapshot, }, - new_s3_storage, ops::gc::{GCConfig, find_retained}, - repository::{RepositoryError, RepositoryResult}, - storage::Settings, + refs::Ref, + repository::{RepositoryError, RepositoryErrorKind, RepositoryResult}, + storage::{Settings, StorageErrorKind}, }; -use indicatif::ProgressBar; +use indicatif::{MultiProgress, ProgressBar}; use itertools::Itertools as _; use tokio::{ io::AsyncReadExt as _, sync::{ OwnedSemaphorePermit, Semaphore, - mpsc::{self, Receiver, Sender}, + mpsc::{self, UnboundedReceiver, UnboundedSender}, }, }; +use tokio_util::task::TaskTracker; -// fn parse_file_lines>( -// path: P, -// ) -> impl Iterator>> { -// let file = File::open(path).expect("Failed to open file"); -// let reader = BufReader::new(file); -// -// reader.lines().map(|line_result| { -// let line = line_result?; -// let parts: Vec<&str> = line.splitn(2, ' ').collect(); -// -// if parts.len() != 2 { -// return Err(format!("Invalid line format: {}", line).into()); -// } -// -// let number = parts[0].parse::()?; -// let text = parts[1].to_string(); -// -// Ok((number, text)) -// }) -// } - -// pub async fn test_saturation( -// max_concurrent: usize, -// copy_chunks: usize, -// chunks_file_path: &Path, -// ) -> Result<(), Box> { -// let (tx, rx) = mpsc::channel(max_concurrent); -// let chunks = parse_file_lines(chunks_file_path).take(copy_chunks); -// let handle = tokio::spawn(copier(rx, max_concurrent)); -// for task in chunks { -// match task { -// Ok(task) => tx.send(task).await?, -// Err(_) => panic!("Error reading file"), -// } -// } -// drop(tx); -// //tx.send((0, "".to_string())).await?; -// -// handle.await??; -// -// Ok(()) -// } - -// async fn copier( -// mut rx: Receiver<(u64, String)>, -// max_concurrent: usize, -// ) -> Result<(), RepositoryError> { -// let config = S3Options { -// region: Some("us-east-1".to_string()), -// endpoint_url: None, -// anonymous: false, -// allow_http: false, -// force_path_style: false, -// network_stream_timeout_seconds: None, -// }; -// let source_bucket = "icechunk-public-data".to_string(); -// let source = new_s3_storage( -// config.clone(), -// source_bucket, -// Some("v1/era5_weatherbench2".to_string()), -// None, -// )?; -// -// let destination_bucket = "icechunk-test".to_string(); -// let destination = new_s3_storage( -// config.clone(), -// destination_bucket, -// Some("test-net-saturation".to_string()), -// None, -// )?; -// -// let semaphore = Arc::new(Semaphore::new(max_concurrent)); -// let settings = Arc::new(source.default_settings()); -// -// let (done_sender, mut done_receiver) = mpsc::channel(10_000); -// -// let done_task = tokio::spawn(async move { -// let mut bytes_copied = 0u64; -// let mut chunks_copied = 0u64; -// let start_time = Instant::now(); -// -// while let Some(size) = done_receiver.recv().await { -// bytes_copied += size; -// chunks_copied += 1; -// if start_time.elapsed().as_secs() > 0 { -// let speed = bytes_copied as f64 -// / 1_000_000.0 -// / start_time.elapsed().as_secs() as f64; -// println!( -// "Copied {chunks_copied} chunks. Average speed: {speed:.2} MB/sec" -// ); -// } -// } -// bytes_copied -// }); -// -// while let Some((size, id)) = rx.recv().await { -// let guard = semaphore.clone().acquire_owned().await?; -// tokio::spawn(copy_chunk( -// size, -// id, -// source.clone(), -// destination.clone(), -// settings.clone(), -// done_sender.clone(), -// guard, -// )); -// } -// -// drop(done_sender); -// done_task.await?; -// -// Ok(()) -// } - -// async fn copy_chunk( -// size: u64, -// id: String, -// source: Arc, -// destination: Arc, -// settings: Arc, -// done: Sender, -// _semaphore: OwnedSemaphorePermit, -// ) -> Result<(), StorageError> { -// let key = format!("chunks/{id}"); -// //println!("Copying {key}"); -// let (mut reader, _) = -// source.get_object(settings.as_ref(), key.as_str(), None).await?; -// -// let mut buffer = Vec::with_capacity(1024 * 1024); -// reader.read_to_end(&mut buffer).await?; -// let bytes = Bytes::from_owner(buffer); -// -// destination -// .put_object( -// settings.as_ref(), -// key.as_str(), -// bytes, -// None, -// Default::default(), -// None, -// ) -// .await?; -// //println!("Done {key}"); -// done.send(size).await.unwrap(); -// Ok(()) -// } - +#[derive(Debug, PartialEq, Eq)] pub enum VersionSelection { SingleSnapshot(SnapshotId), AllHistory, // TODO: can we do refs here instead of String? - RefsHistory { branches: Vec, tags: Vec }, + RefsHistory { + branches: Vec, + tags: Vec, + main_points_to: Option, + }, +} + +fn all_history_as_refs_history(repo: &RepoInfo) -> IcechunkResult { + let branches = repo.branch_names()?.map(|s| s.to_string()); + let tags = repo.tag_names()?.map(|s| s.to_string()); + Ok(VersionSelection::RefsHistory { + branches: branches.collect(), + tags: tags.collect(), + main_points_to: None, + }) } -pub struct ExportConfig<'a> { - pub source: &'a Repository, - pub destination: Arc, - pub versions: VersionSelection, - pub update_config: bool, +fn collect_requested_snapshots( + repo: &RepoInfo, + branches: &Vec, + tags: &Vec, +) -> IcechunkResult> { + let branch_roots = branches + .iter() + .map(|name| repo.resolve_branch(name)) + .map_ok(|snap_id| repo.ancestry(&snap_id)); + let tag_roots = tags + .iter() + .map(|name| repo.resolve_tag(name)) + .map_ok(|snap_id| repo.ancestry(&snap_id)); + let mut res = HashSet::new(); + for it in branch_roots.chain(tag_roots) { + for snap_id in it?? { + if !res.insert(snap_id?.id) { + // we don'n need to continue because + // we have already seen this snapshot and all its ancestry + break; + } + } + } + Ok(res) } fn select_snapshots( repo: &RepoInfo, versions: &VersionSelection, -) -> RepositoryResult> { +) -> ExportResult> { match versions { VersionSelection::SingleSnapshot(snapshot_id) => { Ok(HashSet::from([snapshot_id.clone()])) } VersionSelection::AllHistory => { - let branches = repo.branch_names()?.map(|s| s.to_string()); - let tags = repo.tag_names()?.map(|s| s.to_string()); - let selection = VersionSelection::RefsHistory { - branches: branches.collect(), - tags: tags.collect(), - }; + let selection = all_history_as_refs_history(repo) + .map_err(|e| ExportError::from(ExportErrorKind::FormatError(e.kind)))?; select_snapshots(repo, &selection) } - VersionSelection::RefsHistory { branches, tags } => { - let branch_roots = branches - .iter() - .map(|name| repo.resolve_branch(name)) - .map_ok(|snap_id| repo.ancestry(&snap_id)); - let tag_roots = tags - .iter() - .map(|name| repo.resolve_tag(name)) - .map_ok(|snap_id| repo.ancestry(&snap_id)); - let mut res = HashSet::new(); - for it in branch_roots.chain(tag_roots) { - for snap_id in it?? { - if !res.insert(snap_id?.id) { - // we don'n need to continue because - // we have already seen this snapshot and all its ancestry - break; - } - } + VersionSelection::RefsHistory { branches, tags, main_points_to } => { + if main_points_to.is_none() + && !branches.contains(&Ref::DEFAULT_BRANCH.to_string()) + { + return Err(ExportErrorKind::InvalidVersionSelection( + "target repository needs a `main` branch, if `main_points_to` is not specified, `main` must be included in the exported branches".to_string(), + ).into()); + } + let res = collect_requested_snapshots(repo, branches, tags) + .map_err(|e| ExportError::from(ExportErrorKind::FormatError(e.kind)))?; + if let Some(sid) = main_points_to + && !res.contains(sid) + { + return Err(ExportErrorKind::InvalidVersionSelection( + "target repository needs a valid `main` branch, the snapshot in `main_points_to` is not exported by this version selection".to_string(), + ).into()); } Ok(res) } } } -enum ObjectType { +pub enum ObjectType { Chunk, Manifest, TransactionLog, Snapshot, } -// -// struct CopyMetadata { -// object_type: ObjectType, -// object_size: u64, -// } enum Operation { - Copy { from: String, to: String, object_type: ObjectType, object_size: Option }, + Copy { path: String, object_type: ObjectType, object_size: Option }, } -// async fn process_snap_ids( -// mut rec: Receiver, -// mut sender: Sender, -// asset_manager: Arc, -// known_manifests: &HashSet, -// max_snapshots_in_memory: usize, -// ) { -// let semaphore = Arc::new(Semaphore::new(max_snapshots_in_memory)); -// while let Some(sid) = rec.recv().await { -// let guard = semaphore.acquire().await.unwrap(); -// let snap = asset_manager.fetch_snapshot(&sid).await.unwrap(); -// for mfile in snap.manifest_files() { -// if !known_manifests.contains(&mfile.id) { -// sender.send(mfile.id).await; -// } -// } -// } -// } - -// fn export_snaps( -// source: Arc, -// destination: Arc, -// missing_snaps: HashSet, -// known_manifests: HashSet, -// known_chunks: HashSet, -// ) { -// let max_concurrent_snaps = 100; // FIXME: tune, configurable -// -// let (snap_id_tx, snap_id_rx) = mpsc::channel(max_concurrent_snaps); -// let snap_id_task_handle = tokio::spawn(process_snap_ids(rx, max_concurrent)); -// let handle = tokio::spawn(copier(rx, max_concurrent)); -// for task in chunks { -// match task { -// Ok(task) => tx.send(task).await?, -// Err(_) => panic!("Error reading file"), -// } -// } -// drop(tx); -// //tx.send((0, "".to_string())).await?; -// -// handle.await??; -// -// Ok(()) -// } - async fn calculate_diff( - repo: &RepoInfo, - versions: &VersionSelection, + requested_snaps: &HashSet, destination: Arc, ) -> RepositoryResult<(HashSet, HashSet, HashSet)> { // FIXME: tune caches @@ -322,205 +148,326 @@ async fn calculate_diff( let (dest_chunks, dest_manifests, dest_snapshots) = find_retained(destination.asset_manager().clone(), &gc_config).await?; - let source_snaps = select_snapshots(repo, versions)?; - let missing_snaps = &source_snaps - &dest_snapshots; + let missing_snaps = requested_snaps - &dest_snapshots; Ok((missing_snaps, dest_manifests, dest_chunks)) } +pub type ObjectSize = u64; +pub type OperationResult = (ObjectType, ObjectSize); + +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum ExportErrorKind { + #[error(transparent)] + StorageError(StorageErrorKind), + #[error(transparent)] + RepositoryError(RepositoryErrorKind), + #[error(transparent)] + FormatError(IcechunkFormatErrorKind), + #[error("I/O error")] + IOError(#[from] std::io::Error), + + #[error("invalid selection of versions to export: {0}")] + InvalidVersionSelection(String), +} + +pub type ExportError = ICError; + +pub type ExportResult = Result; + +impl From for ExportError { + fn from(value: ExportErrorKind) -> Self { + Self::new(value) + } +} + +#[derive(Debug, Clone)] +struct Endpoint { + pub storage: Arc, + pub settings: Arc, +} + async fn copy_object( - from_path: String, - to_path: String, + path: String, object_type: ObjectType, object_size: Option, - source: Arc, - destination: Arc, - source_settings: Arc, - destination_settings: Arc, - result: Sender>>, - _semaphore: OwnedSemaphorePermit, -) -> Result<(), Box> { - let object_size = do_copy_object( - from_path, - to_path, - object_size, - source, - destination, - source_settings, - destination_settings, - ) - .await - .map_err(|e| { - let e: Box = Box::new(e); - e - })?; - result.send(Ok((object_type, object_size))).await.unwrap(); - Ok(()) + source: Endpoint, + destination: Endpoint, + result: UnboundedSender>, + semaphore: OwnedSemaphorePermit, +) { + let res = do_copy_object(path.as_str(), object_size, source, destination) + .await + .map(|size| (object_type, size)); + + drop(semaphore); + + // we can expect here because the result Receiver is never closed + #[allow(clippy::expect_used)] + result + .send(res) + .expect("Unexpected error: Failed to send copy_object operation response"); } async fn do_copy_object( - from_path: String, - to_path: String, + path: &str, object_size: Option, - source: Arc, - destination: Arc, - source_settings: Arc, - destination_settings: Arc, -) -> Result { + source: Endpoint, + destination: Endpoint, +) -> ExportResult { let range = object_size.map(|n| 0..n); let (mut reader, _) = source - .get_object(source_settings.as_ref(), from_path.as_str(), range.as_ref()) - .await?; + .storage + .get_object(source.settings.as_ref(), path, range.as_ref()) + .await + .map_err(|e| ExportError::from(ExportErrorKind::StorageError(e.kind)))?; // TODO: better capacity let mut buffer = Vec::with_capacity(object_size.unwrap_or(1024 * 1024) as usize); - reader.read_to_end(&mut buffer).await?; + reader + .read_to_end(&mut buffer) + .await + .map_err(|e| ExportError::from(ExportErrorKind::IOError(e)))?; let bytes = Bytes::from_owner(buffer); let len = bytes.len() as u64; destination + .storage .put_object( - destination_settings.as_ref(), - to_path.as_str(), + destination.settings.as_ref(), + path, bytes, None, Default::default(), None, ) - .await?; + .await + .map_err(|e| ExportError::from(ExportErrorKind::StorageError(e.kind)))?; Ok(len) } async fn execute_operations( - mut rec: Receiver, - result: Sender>>, + mut rec: UnboundedReceiver, + result: UnboundedSender>, source: Arc, destination: Arc, - concurrent_operations: usize, -) -> Result<(), Box> { - let source_settings = Arc::new(source.default_settings()); - let destination_settings = Arc::new(destination.default_settings()); - - let semaphore = Arc::new(Semaphore::new(concurrent_operations)); + max_concurrent_operations: usize, + task_tracker: TaskTracker, +) -> usize { + let settings = Arc::new(source.default_settings()); + let source = Endpoint { storage: source, settings }; + let settings = Arc::new(destination.default_settings()); + let destination = Endpoint { storage: destination, settings }; + + let mut spawned = 0; + let semaphore = Arc::new(Semaphore::new(max_concurrent_operations)); while let Some(op) = rec.recv().await { + spawned += 1; match op { - Operation::Copy { from, to, object_type, object_size } => { - let guard = semaphore.clone().acquire_owned().await.unwrap(); - // FIXME: - //tokio::time::sleep(Duration::from_millis(400)).await; - tokio::spawn(copy_object( - from, - to, + Operation::Copy { path, object_type, object_size } => { + // we can expect here because the semaphore is never closed + #[allow(clippy::expect_used)] + let guard = + semaphore.clone().acquire_owned().await.expect( + "Unexpected error executing operation: semaphore is closed", + ); + task_tracker.spawn(copy_object( + path, object_type, object_size, source.clone(), destination.clone(), - source_settings.clone(), - destination_settings.clone(), result.clone(), guard, )); } } } - Ok(()) + spawned } -async fn receive_operation_result( - mut rec: Receiver>>, +#[async_trait] +pub trait ProgresListener { + async fn completed(&self, object_type: ObjectType, object_size: u64); + async fn discovered(&self, object_type: ObjectType, count: u64); + async fn done(&self); +} + +#[derive(Debug)] +pub struct ProgressBars { chunk_progress: ProgressBar, manifest_progress: ProgressBar, snapshot_progress: ProgressBar, + transaction_progress: ProgressBar, bytes_progress: ProgressBar, -) { + _multi: MultiProgress, +} + +impl Default for ProgressBars { + fn default() -> Self { + Self::new() + } +} + +impl ProgressBars { + pub fn new() -> Self { + let multi = indicatif::MultiProgress::new(); + // unwrapping is allowed when creating templates because the + // template string is verified to work + #[allow(clippy::unwrap_used)] + let bytes_sty = indicatif::ProgressStyle::with_template( + "{prefix:16.green} {binary_bytes} [{binary_bytes_per_sec}]", + ) + .unwrap(); + #[allow(clippy::unwrap_used)] + let chunks_sty = indicatif::ProgressStyle::with_template( + "{prefix:16.green} {bar:60} {human_pos:>}/{human_len} [{eta}]", + ) + .unwrap(); + #[allow(clippy::unwrap_used)] + let others_sty = indicatif::ProgressStyle::with_template( + "{prefix:16.green} {bar:60} {human_pos:>}/{human_len}", + ) + .unwrap(); + let bytes_progress = indicatif::ProgressBar::new(0); + let snapshot_progress = indicatif::ProgressBar::new(0); + let transaction_progress = indicatif::ProgressBar::new(0); + let manifest_progress = indicatif::ProgressBar::new(0); + let chunk_progress = indicatif::ProgressBar::new(0); + multi + .add(chunk_progress.clone()) + .with_style(chunks_sty.clone()) + .with_prefix("🧊 Chunks:"); + multi + .add(manifest_progress.clone()) + .with_style(others_sty.clone()) + .with_prefix("📜 Manifests:"); + multi + .add(snapshot_progress.clone()) + .with_style(others_sty.clone()) + .with_prefix("📸 Snapshots:"); + multi + .add(transaction_progress.clone()) + .with_style(others_sty.clone()) + .with_prefix("🤝 Transactions:"); + multi.add(bytes_progress.clone()).with_style(bytes_sty).with_prefix("💾 Copied:"); + ProgressBars { + chunk_progress, + manifest_progress, + snapshot_progress, + transaction_progress, + bytes_progress, + _multi: multi, + } + } +} + +#[async_trait] +impl ProgresListener for ProgressBars { + async fn completed(&self, object_type: ObjectType, object_size: u64) { + match object_type { + ObjectType::Chunk => self.chunk_progress.inc(1), + ObjectType::Manifest => self.manifest_progress.inc(1), + ObjectType::TransactionLog => self.transaction_progress.inc(1), + ObjectType::Snapshot => self.snapshot_progress.inc(1), + } + self.bytes_progress.inc(object_size); + } + async fn discovered(&self, object_type: ObjectType, count: u64) { + match object_type { + ObjectType::Chunk => self.chunk_progress.inc_length(count), + ObjectType::Manifest => self.manifest_progress.inc_length(count), + ObjectType::TransactionLog => self.transaction_progress.inc_length(count), + ObjectType::Snapshot => self.snapshot_progress.inc_length(count), + } + } + async fn done(&self) { + self.chunk_progress.abandon(); + self.manifest_progress.abandon(); + self.transaction_progress.abandon(); + self.snapshot_progress.abandon(); + self.bytes_progress.abandon(); + } +} + +async fn receive_operation_result( + mut rec: UnboundedReceiver>, + progress_listener: Arc, +) -> ExportResult { + let results = AtomicUsize::new(0); while let Some(res) = rec.recv().await { + results.fetch_add(1, std::sync::atomic::Ordering::Relaxed); match res { Ok((object_type, object_size)) => match object_type { ObjectType::Chunk => { - chunk_progress.inc(1); - bytes_progress.inc(object_size); + progress_listener.completed(ObjectType::Chunk, object_size).await; } ObjectType::Manifest => { - manifest_progress.inc(1); - bytes_progress.inc(object_size); + progress_listener.completed(ObjectType::Manifest, object_size).await; + } + ObjectType::TransactionLog => { + progress_listener + .completed(ObjectType::TransactionLog, object_size) + .await; } - ObjectType::TransactionLog => todo!(), ObjectType::Snapshot => { - snapshot_progress.inc(1); - bytes_progress.inc(object_size); + progress_listener.completed(ObjectType::Snapshot, object_size).await; } }, - Err(err) => panic!("{}", err.to_string()), + Err(err) => return Err(err), } } + Ok(results.into_inner()) } pub async fn export( source: &Repository, destination: Arc, versions: &VersionSelection, + progress_listener: Arc, + max_concurrent_operations: usize, ) -> Result<(), Box> { - // FIXME: fail for v1 repos - let multi = indicatif::MultiProgress::new(); - let bytes_sty = indicatif::ProgressStyle::with_template( - "{prefix:15.green} {binary_bytes} [{binary_bytes_per_sec}]", - ) - .unwrap(); - let chunks_sty = indicatif::ProgressStyle::with_template( - "{prefix:15.green} {bar:60} {human_pos:>}/{human_len} [{eta}]", - ) - .unwrap(); - let others_sty = indicatif::ProgressStyle::with_template( - "{prefix:15.green} {bar:60} {human_pos:>}/{human_len}", - ) - .unwrap(); - let bytes_progress = indicatif::ProgressBar::new(0); - let snapshots_progress = indicatif::ProgressBar::new(0); - let manifests_progress = indicatif::ProgressBar::new(0); - let chunks_progress = indicatif::ProgressBar::new(0); - multi - .add(chunks_progress.clone()) - .with_style(chunks_sty.clone()) - .with_prefix("🧊 Chunks:"); - multi - .add(manifests_progress.clone()) - .with_style(others_sty.clone()) - .with_prefix("📜 Manifests:"); - multi - .add(snapshots_progress.clone()) - .with_style(others_sty.clone()) - .with_prefix("📸 Snapshots:"); - multi.add(bytes_progress.clone()).with_style(bytes_sty).with_prefix("💾 Copied:"); + if source.spec_version() < SpecVersionBin::V2dot0 { + return Err("export cannot run on Icechunk version 1 repositories.".into()); + } let (source_repo_info, _) = source.asset_manager().fetch_repo_info().await?; + let requested_snaps = select_snapshots(&source_repo_info, versions)?; let (missing_snapshots, mut dest_manifests, mut dest_chunks) = - calculate_diff(&source_repo_info, versions, destination.clone()).await?; - snapshots_progress.set_length(missing_snapshots.len() as u64); + calculate_diff(&requested_snaps, destination.clone()).await?; + progress_listener + .discovered(ObjectType::Snapshot, missing_snapshots.len() as u64) + .await; + progress_listener + .discovered(ObjectType::TransactionLog, missing_snapshots.len() as u64) + .await; - let (op_result_sender, op_result_receiver) = mpsc::channel(100_000_000); // FIXME: - let (op_execute_sender, op_execute_receiver) = mpsc::channel(100_000_000); // FIXME: + // TODO: should we limit these channels? + let (op_result_sender, op_result_receiver) = mpsc::unbounded_channel(); + let (op_execute_sender, op_execute_receiver) = mpsc::unbounded_channel(); + + let operations_tracker = TaskTracker::new(); let op_exec_handle = tokio::spawn(execute_operations( op_execute_receiver, op_result_sender, source.storage().clone(), destination.clone(), - 512, // FIXME: + max_concurrent_operations, + operations_tracker.clone(), )); let op_result_handle = tokio::spawn(receive_operation_result( op_result_receiver, - chunks_progress.clone(), - manifests_progress.clone(), - snapshots_progress.clone(), - bytes_progress.clone(), + Arc::clone(&progress_listener), )); for snap_id in missing_snapshots { let snap = source.asset_manager().fetch_snapshot(&snap_id).await?; for mfile in snap.manifest_files() { if !dest_manifests.contains(&mfile.id) { - manifests_progress.inc_length(1); + progress_listener.discovered(ObjectType::Manifest, 1).await; dest_manifests.insert(mfile.id.clone()); let manifest = source .asset_manager() @@ -536,53 +483,145 @@ pub async fn export( } ChunkPayload::Ref(chunk_ref) => { dest_chunks.insert(chunk_ref.id.clone()); - chunks_progress.inc_length(1); - let from_path = AssetManager::chunk_path(&chunk_ref.id); - let to_path = from_path.clone(); + progress_listener.discovered(ObjectType::Chunk, 1).await; + let path = AssetManager::chunk_path(&chunk_ref.id); let op = Operation::Copy { - from: from_path, - to: to_path, + path, object_type: ObjectType::Chunk, object_size: Some(chunk_ref.length), }; - op_execute_sender.send(op).await?; + op_execute_sender.send(op)?; } _ => { - panic!("Unknown payload type"); + return Err( + "bug in export, unknown chunk payload type".into() + ); } } } //copy manifest - let from_path = AssetManager::manifest_path(&manifest.id()); - let to_path = from_path.clone(); + let path = AssetManager::manifest_path(&manifest.id()); let op = Operation::Copy { - from: from_path, - to: to_path, + path, object_type: ObjectType::Manifest, object_size: Some(mfile.size_bytes), }; - op_execute_sender.send(op).await?; + op_execute_sender.send(op)?; } } + // copy tx log + let path = AssetManager::transaction_path(&snap.id()); + let op = Operation::Copy { + path, + object_size: None, + object_type: ObjectType::TransactionLog, + }; + op_execute_sender.send(op)?; + // copy snapshot - let from_path = AssetManager::snapshot_path(&snap.id()); - let to_path = from_path.clone(); + let path = AssetManager::snapshot_path(&snap.id()); let op = Operation::Copy { - from: from_path, - to: to_path, + path, object_size: None, object_type: ObjectType::Snapshot, }; - op_execute_sender.send(op).await?; + op_execute_sender.send(op)?; } drop(op_execute_sender); - op_exec_handle.await?.unwrap(); - op_result_handle.await?; + let spawned_ops = op_exec_handle.await?; + let received_results = op_result_handle.await??; + operations_tracker.close(); + operations_tracker.wait().await; + + let new_repo_info = + update_repo_info(source_repo_info.as_ref(), &requested_snaps, versions); + + let x = destination.; + + progress_listener.done().await; + + dbg!(spawned_ops); + dbg!(received_results); + println!("---------------- {}", spawned_ops); + println!(" ///////// {received_results}"); Ok(()) } +fn update_repo_info( + original: &RepoInfo, + requested_snaps: &HashSet, + selection: &VersionSelection, +) -> IcechunkResult { + match selection { + VersionSelection::SingleSnapshot(snapshot_id) => { + let tags = original.tags()?.filter(|(_, sid)| requested_snaps.contains(sid)); + let branches = original.branches()?.filter_map(|(name, sid)| { + if name == Ref::DEFAULT_BRANCH { + Some((Ref::DEFAULT_BRANCH, snapshot_id.clone())) + } else if requested_snaps.contains(&sid) { + Some((name, sid)) + } else { + None + } + }); + let initial_snap = original.find_snapshot(&Snapshot::INITIAL_SNAPSHOT_ID)?; + let mut target_snap = original.find_snapshot(snapshot_id)?; + target_snap.parent_id = Some(initial_snap.id.clone()); + + RepoInfo::new( + tags, + branches, + std::iter::empty(), + [initial_snap, target_snap], + &UpdateType::RepoInitializedUpdate, + None, + ) + } + VersionSelection::AllHistory => { + let snapshots: Vec<_> = original.all_snapshots()?.try_collect()?; + RepoInfo::new( + original.tags()?, + original.branches()?, + std::iter::empty(), + snapshots, + &UpdateType::RepoInitializedUpdate, + None, + ) + } + VersionSelection::RefsHistory { main_points_to, branches, .. } => { + assert!( + branches.contains(&Ref::DEFAULT_BRANCH.to_string()) + || main_points_to.is_some() + ); + let tags = original.tags()?.filter(|(_, sid)| requested_snaps.contains(sid)); + let branches = original.branches()?.filter_map(|(name, sid)| { + // at the beginning of export is checked that main_points_to points to an exported + // snapshot + if name == Ref::DEFAULT_BRANCH + && let Some(sid) = main_points_to + { + Some((Ref::DEFAULT_BRANCH, sid.clone())) + } else if requested_snaps.contains(&sid) { + Some((name, sid)) + } else { + None + } + }); + let snapshots: Vec<_> = original.all_snapshots()?.try_collect()?; + RepoInfo::new( + tags, + branches, + std::iter::empty(), + snapshots, + &UpdateType::RepoInitializedUpdate, + None, + ) + } + } +} + #[cfg(test)] mod tests {} diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index 65adf32a6..ac53a4863 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -42,7 +42,7 @@ use crate::{ format_option_to_string, }, errors::PyIcechunkStoreError, - export::{PyAllHistory, PyRefsHistory, PySingleSnapshot, PyVersionSelection}, + export::{PyAllHistory, PyRefsHistory, PySingleSnapshot}, impl_pickle, session::PySession, streams::PyAsyncGenerator, @@ -774,7 +774,11 @@ impl PyRepository { pyo3_async_runtimes::tokio::get_runtime().block_on(async move { let repo = self.0.write().await; let destination = destination.0; - icechunk_export::export(&repo, destination, &versions).await.unwrap(); //FIXME:unwrap + let progress = Arc::new(icechunk_export::ProgressBars::default()); + // FIXME: configuration + icechunk_export::export(&repo, destination, &versions, progress, 100) + .await + .unwrap(); //FIXME:unwrap Ok(()) }) }) diff --git a/icechunk/src/asset_manager.rs b/icechunk/src/asset_manager.rs index c5d5bf62f..a71ffa536 100644 --- a/icechunk/src/asset_manager.rs +++ b/icechunk/src/asset_manager.rs @@ -464,6 +464,10 @@ impl AssetManager { format!("{SNAPSHOTS_FILE_PATH}/{id}") } + pub fn transaction_path(id: &SnapshotId) -> String { + format!("{TRANSACTION_LOGS_FILE_PATH}/{id}") + } + #[instrument(skip(self, bytes))] pub async fn write_chunk( &self, @@ -1003,7 +1007,7 @@ async fn write_new_tx_log( .await??; debug!(%transaction_id, size_bytes=buffer.len(), "Writing transaction log"); - let path = format!("{TRANSACTION_LOGS_FILE_PATH}/{transaction_id}"); + let path = AssetManager::transaction_path(&transaction_id); let settings = storage::Settings { storage_class: storage_settings.metadata_storage_class().cloned(), ..storage_settings.clone() @@ -1025,7 +1029,7 @@ async fn fetch_transaction_log( semaphore: &Semaphore, ) -> RepositoryResult> { debug!(%transaction_id, "Downloading transaction log"); - let path = format!("{TRANSACTION_LOGS_FILE_PATH}/{transaction_id}"); + let path = AssetManager::transaction_path(transaction_id); let _permit = semaphore.acquire().await?; let (read, _) = storage.get_object(storage_settings, path.as_str(), None).await?; diff --git a/icechunk/src/format/mod.rs b/icechunk/src/format/mod.rs index 0a9759eac..12dcaa5ba 100644 --- a/icechunk/src/format/mod.rs +++ b/icechunk/src/format/mod.rs @@ -360,7 +360,9 @@ pub mod format_constants { } #[repr(u8)] - #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] + #[derive( + Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord, + )] pub enum SpecVersionBin { V1dot0 = 1u8, V2dot0 = 2u8, diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/s3.rs index c4ac04d30..217d8ae64 100644 --- a/icechunk/src/storage/s3.rs +++ b/icechunk/src/storage/s3.rs @@ -302,7 +302,7 @@ impl S3Storage { } if let Some(klass) = settings.storage_class() { - let klass = klass.as_str().into(); + let klass = klass.clone(); req = req.storage_class(klass); }