diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index 56e8d3837..c350938e0 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -2118,6 +2118,7 @@ impl PyRepository { let message = message.to_owned(); let branch = branch.to_owned(); let metadata = metadata.map(|m| m.into()); + pyo3_async_runtimes::tokio::future_into_py::<_, String>(py, async move { let repository = repository.read().await; // TODO: make commit method selectable diff --git a/icechunk/src/change_set.rs b/icechunk/src/change_set.rs index eab1b2e82..1c3f13dd1 100644 --- a/icechunk/src/change_set.rs +++ b/icechunk/src/change_set.rs @@ -19,10 +19,10 @@ use tracing::warn; use crate::{ format::{ ChunkIndices, NodeId, Path, - manifest::{ChunkInfo, ChunkPayload, ManifestExtents, ManifestSplits, Overlap}, + manifest::{ChunkInfo, ChunkPayload}, snapshot::{ArrayShape, DimensionName, NodeData, NodeSnapshot}, }, - session::{SessionErrorKind, SessionResult, find_coord}, + session::{SessionErrorKind, SessionResult}, }; // We have limitations on how many chunks we can save on a single commit. @@ -39,7 +39,7 @@ pub struct ArrayData { pub user_data: Bytes, } -type SplitManifest = BTreeMap>; +pub(crate) type ChunkTable = BTreeMap>; #[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] pub struct EditChanges { @@ -47,7 +47,7 @@ pub struct EditChanges { new_arrays: HashMap, updated_arrays: HashMap, updated_groups: HashMap, - set_chunks: BTreeMap>, + set_chunks: BTreeMap, // Number of chunks added to this change set num_chunks: u64, @@ -81,15 +81,13 @@ impl EditChanges { // FIXME: do we even test this? self.deleted_chunks_outside_bounds.extend(other.deleted_chunks_outside_bounds); - other.set_chunks.into_iter().for_each(|(node, other_splits)| { - let manifests = self.set_chunks.entry(node).or_insert_with(|| { - HashMap::::with_capacity( - other_splits.len(), - ) - }); - other_splits.into_iter().for_each(|(extent, their_manifest)| { - manifests.entry(extent).or_default().extend(their_manifest) - }) + other.set_chunks.into_iter().for_each(|(node, other_manifest)| { + match self.set_chunks.get_mut(&node) { + Some(manifest) => manifest.extend(other_manifest), + None => { + self.set_chunks.insert(node, other_manifest); + } + } }); } } @@ -251,9 +249,20 @@ impl ChangeSet { pub fn changed_chunks( &self, ) -> impl Iterator)> { - self.edits().set_chunks.iter().map(|(node_id, split_map)| { - (node_id, split_map.values().flat_map(|x| x.keys())) - }) + self.edits() + .set_chunks + .iter() + .map(|(node_id, manifest)| (node_id, manifest.keys())) + } + + pub fn changed_node_chunks( + &self, + node_id: &NodeId, + ) -> impl Iterator { + match self.edits().set_chunks.get(node_id) { + Some(chunks) => Either::Left(chunks.keys()), + None => Either::Right(iter::empty()), + } } pub fn is_updated_array(&self, node: &NodeId) -> bool { @@ -334,7 +343,6 @@ impl ChangeSet { node_id: &NodeId, path: &Path, array_data: ArrayData, - new_splits: &ManifestSplits, ) -> SessionResult<()> { let edits = self.edits_mut()?; match edits.new_arrays.get(path) { @@ -347,65 +355,6 @@ impl ChangeSet { } } - // update existing splits - let mut to_remove = HashSet::::new(); - if let Some(manifests) = edits.set_chunks.remove(node_id) { - let mut new_deleted_chunks = HashSet::::new(); - let mut new_manifests = - HashMap::::with_capacity( - new_splits.len(), - ); - for (old_extents, mut chunks) in manifests.into_iter() { - for new_extents in new_splits.iter() { - if old_extents.overlap_with(new_extents) == Overlap::None { - continue; - } - - // TODO: replace with `BTreeMap.drain_filter` after it is stable. - let mut extracted = - BTreeMap::>::new(); - chunks.retain(|coord, payload| { - let cond = new_extents.contains(coord.0.as_slice()); - if cond { - extracted.insert(coord.clone(), payload.clone()); - } - !cond - }); - new_manifests - .entry(new_extents.clone()) - .or_default() - .extend(extracted); - } - new_deleted_chunks.extend( - chunks.into_iter().filter_map(|(coord, payload)| { - payload.is_none().then_some(coord) - }), - ); - } - - // bring back any previously tracked deletes - if let Some(deletes) = edits.deleted_chunks_outside_bounds.get_mut(node_id) { - for coord in deletes.iter() { - if let Some(extents) = new_splits.find(coord) { - new_manifests - .entry(extents.clone()) - .or_default() - .insert(coord.clone(), None); - to_remove.insert(coord.clone()); - }; - } - deletes.retain(|item| !to_remove.contains(item)); - to_remove.drain(); - }; - edits.set_chunks.insert(node_id.clone(), new_manifests); - - // keep track of any deletes not inserted in to set_chunks - edits - .deleted_chunks_outside_bounds - .entry(node_id.clone()) - .or_default() - .extend(new_deleted_chunks); - } Ok(()) } @@ -469,26 +418,11 @@ impl ChangeSet { node_id: NodeId, coord: ChunkIndices, data: Option, - splits: &ManifestSplits, ) -> SessionResult<()> { - #[allow(clippy::expect_used)] - let extent = splits.find(&coord).expect("logic bug. Trying to set chunk ref but can't find the appropriate split manifest."); // this implementation makes delete idempotent // it allows deleting a deleted chunk by repeatedly setting None. let edits = self.edits_mut()?; - - let old = edits - .set_chunks - .entry(node_id) - .or_insert_with(|| { - HashMap::< - ManifestExtents, - BTreeMap>, - >::with_capacity(splits.len()) - }) - .entry(extent.clone()) - .or_default() - .insert(coord, data); + let old = edits.set_chunks.entry(node_id).or_default().insert(coord, data); if old.is_none() { edits.num_chunks += 1; @@ -510,11 +444,10 @@ impl ChangeSet { node_id: &NodeId, coords: &ChunkIndices, ) -> Option<&Option> { - self.edits().set_chunks.get(node_id).and_then(|node_chunks| { - find_coord(node_chunks.keys(), coords).and_then(|(_, extent)| { - node_chunks.get(extent).and_then(|s| s.get(coords)) - }) - }) + self.edits() + .set_chunks + .get(node_id) + .and_then(|node_chunks| node_chunks.get(coords)) } /// Drop the updated chunk references for the node. @@ -525,9 +458,7 @@ impl ChangeSet { predicate: impl Fn(&ChunkIndices) -> bool, ) -> SessionResult<()> { if let Some(changes) = self.edits_mut()?.set_chunks.get_mut(node_id) { - for split in changes.values_mut() { - split.retain(|coord, _| !predicate(coord)); - } + changes.retain(|coord, _| !predicate(coord)); } Ok(()) } @@ -546,18 +477,13 @@ impl ChangeSet { &self, node_id: &NodeId, node_path: &Path, - extent: ManifestExtents, ) -> impl Iterator)> + use<'_> { if self.is_deleted(node_path, node_id) { return Either::Left(iter::empty()); } match self.edits().set_chunks.get(node_id) { None => Either::Left(iter::empty()), - Some(h) => Either::Right( - h.iter() - .filter(move |(manifest_extent, _)| extent.matches(manifest_extent)) - .flat_map(|(_, manifest)| manifest.iter()), - ), + Some(manifest) => Either::Right(manifest.iter()), } } @@ -565,8 +491,7 @@ impl ChangeSet { &self, ) -> impl Iterator + use<'_> { self.edits().new_arrays.iter().flat_map(|(path, (node_id, _))| { - self.new_array_chunk_iterator(node_id, path, ManifestExtents::ALL) - .map(|ci| (path.clone(), ci)) + self.new_array_chunk_iterator(node_id, path).map(|ci| (path.clone(), ci)) }) } @@ -574,9 +499,8 @@ impl ChangeSet { &'a self, node_id: &'a NodeId, node_path: &Path, - extent: ManifestExtents, ) -> impl Iterator + use<'a> { - self.array_chunks_iterator(node_id, node_path, extent).filter_map( + self.array_chunks_iterator(node_id, node_path).filter_map( move |(coords, payload)| { payload.as_ref().map(|p| ChunkInfo { node: node_id.clone(), @@ -587,38 +511,23 @@ impl ChangeSet { ) } - pub fn modified_manifest_extents_iterator( - &self, - node_id: &NodeId, - node_path: &Path, - ) -> impl Iterator + use<'_> { - if self.is_deleted(node_path, node_id) { - return Either::Left(iter::empty()); - } - match self.edits().set_chunks.get(node_id) { - None => Either::Left(iter::empty()), - Some(h) => Either::Right(h.keys()), - } - } - - pub fn array_manifest( - &self, - node_id: &NodeId, - extent: &ManifestExtents, - ) -> Option<&SplitManifest> { - self.edits().set_chunks.get(node_id).and_then(|x| x.get(extent)) + pub fn array_manifest(&self, node_id: &NodeId) -> Option<&ChunkTable> { + self.edits().set_chunks.get(node_id) } pub fn new_nodes(&self) -> impl Iterator { - self.new_groups().chain(self.new_arrays()) + self.new_groups().chain(self.new_arrays().map(|(path, id, _)| (path, id))) } pub fn new_groups(&self) -> impl Iterator { self.edits().new_groups.iter().map(|(path, (node_id, _))| (path, node_id)) } - pub fn new_arrays(&self) -> impl Iterator { - self.edits().new_arrays.iter().map(|(path, (node_id, _))| (path, node_id)) + pub fn new_arrays(&self) -> impl Iterator { + self.edits() + .new_arrays + .iter() + .map(|(path, (node_id, node_data))| (path, node_id, node_data)) } /// Merge this ChangeSet with `other`. @@ -760,7 +669,7 @@ mod tests { change_set::{ArrayData, EditChanges, MoveTracker}, format::{ ChunkIndices, NodeId, Path, - manifest::{ChunkInfo, ChunkPayload, ManifestSplits}, + manifest::{ChunkInfo, ChunkPayload}, snapshot::ArrayShape, }, roundtrip_serialization_tests, @@ -793,41 +702,29 @@ mod tests { )?; assert_eq!(None, change_set.new_arrays_chunk_iterator().next()); - let splits1 = ManifestSplits::from_edges(vec![vec![0, 10], vec![0, 10]]); - - change_set.set_chunk_ref( - node_id1.clone(), - ChunkIndices(vec![0, 1]), - None, - &splits1, - )?; + change_set.set_chunk_ref(node_id1.clone(), ChunkIndices(vec![0, 1]), None)?; assert_eq!(None, change_set.new_arrays_chunk_iterator().next()); change_set.set_chunk_ref( node_id1.clone(), ChunkIndices(vec![1, 0]), Some(ChunkPayload::Inline("bar1".into())), - &splits1, )?; change_set.set_chunk_ref( node_id1.clone(), ChunkIndices(vec![1, 1]), Some(ChunkPayload::Inline("bar2".into())), - &splits1, )?; - let splits2 = ManifestSplits::from_edges(vec![vec![0, 10]]); change_set.set_chunk_ref( node_id2.clone(), ChunkIndices(vec![0]), Some(ChunkPayload::Inline("baz1".into())), - &splits2, )?; change_set.set_chunk_ref( node_id2.clone(), ChunkIndices(vec![1]), Some(ChunkPayload::Inline("baz2".into())), - &splits2, )?; { @@ -1009,8 +906,7 @@ mod tests { } use crate::strategies::{ - array_data, bytes, gen_move, large_chunk_indices, manifest_extents, node_id, - path, split_manifest, + array_data, bytes, gen_move, large_chunk_indices, node_id, path, split_manifest, }; use proptest::collection::{btree_map, hash_map, hash_set, vec}; use proptest::prelude::*; @@ -1021,9 +917,7 @@ mod tests { new_arrays in hash_map(path(),(node_id(), array_data()), 0..3), updated_arrays in hash_map(node_id(), array_data(), 0..3), updated_groups in hash_map(node_id(), bytes(), 0..3), - set_chunks in btree_map(node_id(), - hash_map(manifest_extents(num_of_dims), split_manifest(), 0..3), - 0..3), + set_chunks in btree_map(node_id(), split_manifest(), 0..3), deleted_chunks_outside_bounds in btree_map(node_id(), hash_set(large_chunk_indices(num_of_dims), 0..3), 0..3), deleted_groups in hash_set((path(), node_id()), 0..3), deleted_arrays in hash_set((path(), node_id()), 0..3) diff --git a/icechunk/src/format/manifest.rs b/icechunk/src/format/manifest.rs index 70e5bef9b..4b69c61b2 100644 --- a/icechunk/src/format/manifest.rs +++ b/icechunk/src/format/manifest.rs @@ -507,7 +507,7 @@ fn lookup_ref<'a>( }) } -struct PayloadIterator { +pub struct PayloadIterator { manifest: Arc, node_id: NodeId, last_ref_index: usize, diff --git a/icechunk/src/format/transaction_log.rs b/icechunk/src/format/transaction_log.rs index 9a610384d..fa9f1672c 100644 --- a/icechunk/src/format/transaction_log.rs +++ b/icechunk/src/format/transaction_log.rs @@ -31,7 +31,7 @@ impl TransactionLog { let mut new_groups: Vec<_> = cs.new_groups().map(|(_, id)| generated::ObjectId8::new(&id.0)).collect(); let mut new_arrays: Vec<_> = - cs.new_arrays().map(|(_, id)| generated::ObjectId8::new(&id.0)).collect(); + cs.new_arrays().map(|(_, id, _)| generated::ObjectId8::new(&id.0)).collect(); let mut deleted_groups: Vec<_> = cs.deleted_groups().map(|(_, id)| generated::ObjectId8::new(&id.0)).collect(); let mut deleted_arrays: Vec<_> = @@ -626,10 +626,8 @@ mod tests { use crate::{ change_set::{ArrayData, ChangeSet}, format::{ - ChunkIndices, NodeId, SnapshotId, - manifest::{ChunkPayload, ManifestExtents, ManifestSplits}, - snapshot::ArrayShape, - transaction_log::TransactionLog, + ChunkIndices, NodeId, SnapshotId, manifest::ChunkPayload, + snapshot::ArrayShape, transaction_log::TransactionLog, }, }; @@ -659,7 +657,6 @@ mod tests { chunk_added.clone(), ChunkIndices(vec![0]), Some(ChunkPayload::Inline(Bytes::new())), - &ManifestSplits::from_extents(vec![ManifestExtents::new(&[0], &[100])]), )?; let t1 = TransactionLog::new(&SnapshotId::random(), &cs1); @@ -694,29 +691,17 @@ mod tests { )?; cs2.delete_array("/a2".try_into().unwrap(), &deleted_array2)?; cs2.update_group(&updated_group2, &"/g3".try_into().unwrap(), Bytes::new())?; - cs2.set_chunk_ref( - chunk_added.clone(), - ChunkIndices(vec![0]), - None, - &ManifestSplits::from_extents(vec![ManifestExtents::new(&[0], &[100])]), - )?; + cs2.set_chunk_ref(chunk_added.clone(), ChunkIndices(vec![0]), None)?; cs2.set_chunk_ref( chunk_added.clone(), ChunkIndices(vec![1]), Some(ChunkPayload::Inline(Bytes::new())), - &ManifestSplits::from_extents(vec![ManifestExtents::new(&[0], &[100])]), - )?; - cs2.set_chunk_ref( - chunk_added.clone(), - ChunkIndices(vec![42]), - None, - &ManifestSplits::from_extents(vec![ManifestExtents::new(&[0], &[100])]), )?; + cs2.set_chunk_ref(chunk_added.clone(), ChunkIndices(vec![42]), None)?; cs2.set_chunk_ref( chunk_added2.clone(), ChunkIndices(vec![7]), Some(ChunkPayload::Inline(Bytes::new())), - &ManifestSplits::from_extents(vec![ManifestExtents::new(&[0], &[100])]), )?; let t3 = TransactionLog::new(&SnapshotId::random(), &cs2); diff --git a/icechunk/src/refs.rs b/icechunk/src/refs.rs index b0c1eb0a8..438d7344e 100644 --- a/icechunk/src/refs.rs +++ b/icechunk/src/refs.rs @@ -656,18 +656,12 @@ mod tests { delete_tag(storage.as_ref(), &storage_settings, "tag1").await?; // cannot delete twice - assert!(delete_tag(storage.as_ref(), &storage_settings, "tag1") - .await - .is_err()); + assert!(delete_tag(storage.as_ref(), &storage_settings, "tag1").await.is_err()); // we cannot delete non-existent tag - assert!(delete_tag( - storage.as_ref(), - &storage_settings, - "doesnt_exist", - ) - .await - .is_err()); + assert!( + delete_tag(storage.as_ref(), &storage_settings, "doesnt_exist",).await.is_err() + ); // cannot recreate same tag matches!(create_tag( diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index e818ad70c..858cf0412 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -1712,20 +1712,20 @@ impl Repository { let manifest_id_c = manifest_id.clone(); let path = node.path.clone(); futures.push(async move { - trace!("Preloading manifest {} for array {}", &manifest_id_c, path); - if let Err(err) = asset_manager - .fetch_manifest( - &manifest_id_c, - size_bytes, - ) - .await - { - error!( - "Failure pre-loading manifest {}: {}", - &manifest_id_c, err - ); - } - }); + trace!( + "Preloading manifest {} for array {}", + &manifest_id_c, path + ); + if let Err(err) = asset_manager + .fetch_manifest(&manifest_id_c, size_bytes) + .await + { + error!( + "Failure pre-loading manifest {}: {}", + &manifest_id_c, err + ); + } + }); loaded_manifests.insert(manifest_id); loaded_refs += manifest_info.num_chunk_refs; } @@ -1879,10 +1879,10 @@ mod tests { asset_manager: &Arc, total_manifests: usize, ) { - let expected = asset_manager.list_manifests().await.unwrap().count().await; + let actual = asset_manager.list_manifests().await.unwrap().count().await; assert_eq!( - total_manifests, expected, - "Mismatch in manifest count: expected {expected}, but got {total_manifests}", + total_manifests, actual, + "Mismatch in manifest count: expected {total_manifests}, but got {actual}", ); } @@ -2271,17 +2271,6 @@ mod tests { } verify_data(&session, 0).await; - let node = session.get_node(&array_path).await?; - let orig_splits = session.lookup_splits(&node.id).cloned(); - assert_eq!( - orig_splits, - Some(ManifestSplits::from_edges(vec![ - vec![0, 3, 6, 9, 12, 13], - vec![0, 2], - vec![0, 1] - ])) - ); - // this should update the splits session .update_array( @@ -2292,15 +2281,6 @@ mod tests { ) .await?; verify_data(&session, 0).await; - let new_splits = session.lookup_splits(&node.id).cloned(); - assert_eq!( - new_splits, - Some(ManifestSplits::from_edges(vec![ - vec![0, 4, 8, 12, 13], - vec![0, 2], - vec![0, 1] - ])) - ); // update data for i in 0..12 { @@ -2989,7 +2969,6 @@ mod tests { [vec![0, 0, 1, 0], vec![0, 0, 0, 0], vec![0, 2, 0, 0], vec![0, 2, 0, 1]]; let mut session1 = repository.writable_session("main").await?; - let node_id = session1.get_node(&temp_path).await?.id; session1 .set_chunk_ref( temp_path.clone(), @@ -3037,7 +3016,7 @@ mod tests { ) .await?; - assert!(session1.merge(session2).await.is_err()); + assert!(session1.merge(session2).await.is_ok()); } // now with the same split sizes @@ -3060,12 +3039,6 @@ mod tests { ) .await?; - // Session.splits should be _complete_ so it should be identical for the same node - // on any two sessions with compatible splits - let splits = session1.lookup_splits(&node_id).unwrap().clone(); - assert_eq!(session1.lookup_splits(&node_id), session2.lookup_splits(&node_id)); - session1.merge(session2).await?; - assert_eq!(session1.lookup_splits(&node_id), Some(&splits)); for (val, idx) in enumerate(indices.iter()) { let actual = get_chunk( session1 diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index c43d3c7d4..981323d57 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -35,7 +35,7 @@ use tracing::{Instrument, debug, info, instrument, trace, warn}; use crate::{ RepositoryConfig, Storage, StorageError, asset_manager::AssetManager, - change_set::{ArrayData, ChangeSet}, + change_set::{ArrayData, ChangeSet, ChunkTable}, config::{ManifestSplitDim, ManifestSplitDimCondition, ManifestSplittingConfig}, conflicts::{Conflict, ConflictResolution, ConflictSolver}, error::ICError, @@ -148,11 +148,6 @@ pub enum SessionErrorKind { InvalidIndex { coords: ChunkIndices, path: Path }, #[error("invalid chunk index for splitting manifests: {coords:?}")] InvalidIndexForSplitManifests { coords: ChunkIndices }, - #[error("incompatible manifest splitting config when merging two sessions")] - IncompatibleSplittingConfig { - ours: ManifestSplittingConfig, - theirs: ManifestSplittingConfig, - }, #[error("`to` snapshot ancestry doesn't include `from`")] BadSnapshotChainForDiff, #[error( @@ -272,8 +267,6 @@ pub struct Session { snapshot_id: SnapshotId, change_set: ChangeSet, default_commit_metadata: SnapshotProperties, - // This is an optimization so that we needn't figure out the split sizes on every set. - splits: HashMap, } impl Session { @@ -295,9 +288,6 @@ impl Session { snapshot_id, change_set: ChangeSet::for_edits(), default_commit_metadata: SnapshotProperties::default(), - // Splits are populated for a node during - // `add_array`, `update_array`, and `set_chunk_ref` - splits: Default::default(), } } @@ -322,7 +312,6 @@ impl Session { snapshot_id, change_set: ChangeSet::for_edits(), default_commit_metadata, - splits: Default::default(), } } @@ -347,7 +336,6 @@ impl Session { snapshot_id, change_set: ChangeSet::for_rearranging(), default_commit_metadata, - splits: Default::default(), } } @@ -496,7 +484,6 @@ impl Session { match self.get_node(&path).await { Err(SessionError { kind: SessionErrorKind::NodeNotFound { .. }, .. }) => { let id = NodeId::random(); - self.cache_splits(&id, &path, &shape, &dimension_names); self.change_set_mut()?.add_array( path, id, @@ -526,8 +513,6 @@ impl Session { ) -> SessionResult<()> { match self.get_array(path).await { Ok(node) => { - // needed to handle a resize for example. - self.cache_splits(&node.id, path, &shape, &dimension_names); { // we need to play this trick because we need to borrow from self twice // once to get the mutable change set, and other to compute @@ -538,14 +523,10 @@ impl Session { let _ = self.change_set_mut()?; } let change_set = &mut self.change_set; - #[allow(clippy::expect_used)] - let splits = - self.splits.get(&node.id).expect("getting splits should not fail."); change_set.update_array( &node.id, path, ArrayData { shape, dimension_names, user_data }, - splits, )?; Ok(()) } @@ -608,12 +589,8 @@ impl Session { { let node = self.get_array(array_path).await?; #[allow(clippy::panic)] - let (shape, splits) = if let NodeData::Array { shape, dimension_names, .. } = - node.node_data - { - let splits = - self.get_splits(&node.id, &node.path, &shape, &dimension_names).clone(); - (shape, splits) + let shape = if let NodeData::Array { shape, .. } = node.node_data { + shape } else { // we know it's an array because get_array succeeded panic!("bug in reindex") @@ -631,7 +608,6 @@ impl Session { node.id.clone(), new_chunk_index, new_payload, - &splits, )?; } else { return Err(SessionErrorKind::InvalidIndex { @@ -684,10 +660,6 @@ impl Session { self.set_node_chunk_ref(node_snapshot, coord, data).await } - pub fn lookup_splits(&self, node_id: &NodeId) -> Option<&ManifestSplits> { - self.splits.get(node_id) - } - fn change_set(&self) -> &ChangeSet { &self.change_set } @@ -700,37 +672,13 @@ impl Session { } } - /// This method is directly called in add_array & update_array - /// where we know we must update the splits HashMap - fn cache_splits( - &mut self, - node_id: &NodeId, - path: &Path, - shape: &ArrayShape, - dimension_names: &Option>, - ) { - // Q: What happens if we set a chunk, then change a dimension name, so - // that the split changes. - // A: we reorg the existing chunk refs in the changeset to the new splits. - let splitting = self.config.manifest().splitting(); - let splits = splitting.get_split_sizes(path, shape, dimension_names); - self.splits.insert(node_id.clone(), splits); - } - fn get_splits( &mut self, - node_id: &NodeId, path: &Path, shape: &ArrayShape, dimension_names: &Option>, - ) -> &ManifestSplits { - self.splits.entry(node_id.clone()).or_insert_with(|| { - self.config.manifest().splitting().get_split_sizes( - path, - shape, - dimension_names, - ) - }) + ) -> ManifestSplits { + self.config.manifest().splitting().get_split_sizes(path, shape, dimension_names) } // Helper function that accepts a NodeSnapshot instead of a path, @@ -742,12 +690,9 @@ impl Session { coord: ChunkIndices, data: Option, ) -> SessionResult<()> { - if let NodeData::Array { shape, dimension_names, .. } = node.node_data { + if let NodeData::Array { shape, .. } = node.node_data { if shape.valid_chunk_coord(&coord) { - let splits = self - .get_splits(&node.id, &node.path, &shape, &dimension_names) - .clone(); - self.change_set_mut()?.set_chunk_ref(node.id, coord, data, &splits)?; + self.change_set_mut()?.set_chunk_ref(node.id, coord, data)?; Ok(()) } else { Err(SessionErrorKind::InvalidIndex { @@ -821,7 +766,6 @@ impl Session { self.change_set(), &self.snapshot_id, path, - ManifestExtents::ALL, ) .await } @@ -1066,7 +1010,6 @@ impl Session { &self.change_set, &self.snapshot_id, node.clone(), - ManifestExtents::ALL, ) .await } @@ -1083,7 +1026,6 @@ impl Session { self.change_set(), &self.snapshot_id, node.clone(), - ManifestExtents::ALL, ) .await .map_ok(|(_path, chunk_info)| chunk_info.coord); @@ -1091,7 +1033,7 @@ impl Session { let res = try_stream! { let new_chunks = stream::iter( self.change_set() - .new_array_chunk_iterator(&node.id, array_path, ManifestExtents::ALL) + .new_array_chunk_iterator(&node.id, array_path) .map(|chunk_info| Ok::(chunk_info.coord)), ); @@ -1132,25 +1074,8 @@ impl Session { if self.read_only() { return Err(SessionErrorKind::ReadOnlySession.into()); } - let Session { splits: other_splits, change_set, config: other_config, .. } = - other; - - if self.splits.iter().any(|(node, our_splits)| { - other_splits - .get(node) - .is_some_and(|their_splits| !our_splits.compatible_with(their_splits)) - }) { - let ours = self.config().manifest().splitting().clone(); - let theirs = other_config.manifest().splitting().clone(); - return Err( - SessionErrorKind::IncompatibleSplittingConfig { ours, theirs }.into() - ); - } + let Session { change_set, .. } = other; - // Session.splits is _complete_ in that it will include every possible split. - // So a simple `extend` is fine, if the same node appears in two sessions, - // it must have the same splits and overwriting is fine. - self.splits.extend(other_splits); self.change_set.merge(change_set)?; Ok(()) } @@ -1243,11 +1168,16 @@ impl Session { Arc::clone(&self.asset_manager), &self.change_set, self.snapshot_id(), - &self.splits, ); - let new_snap = - do_flush(flush_data, message, properties, false, CommitMethod::NewCommit) - .await?; + let new_snap = do_flush( + flush_data, + message, + properties, + false, + CommitMethod::NewCommit, + self.config.manifest().splitting(), + ) + .await?; match self.spec_version() { SpecVersionBin::V1dot0 => self.flush_v1(Arc::clone(&new_snap)).await, @@ -1282,13 +1212,12 @@ impl Session { // In the normal chunk setting workflow, that would've been done by `set_chunk_ref` for node in nodes.into_iter().flatten() { if let NodeSnapshot { - id, path, node_data: NodeData::Array { shape, dimension_names, .. }, .. } = node { - self.get_splits(&id, &path, &shape, &dimension_names); + self.get_splits(&path, &shape, &dimension_names); } } @@ -1348,9 +1277,9 @@ impl Session { change_set, message, Some(properties), - &self.splits, rewrite_manifests, commit_method, + self.config.manifest().splitting(), allow_empty, self.config.repo_update_retries().retries(), num_updates, @@ -1630,14 +1559,8 @@ async fn updated_chunk_iterator<'a>( // We have not applied any changeset updates. At the moment, the downstream code only // use node.id so there is no need to update yet. - Ok(updated_node_chunks_iterator( - asset_manager, - change_set, - snapshot_id, - node, - ManifestExtents::ALL, - ) - .await) + Ok(updated_node_chunks_iterator(asset_manager, change_set, snapshot_id, node) + .await) }); Ok(res.try_flatten()) } @@ -1647,7 +1570,6 @@ async fn updated_node_chunks_iterator<'a>( change_set: &'a ChangeSet, snapshot_id: &'a SnapshotId, node: NodeSnapshot, - extent: ManifestExtents, ) -> impl Stream> + 'a { // This iterator should yield chunks for existing arrays + any updates. // we check for deletion here in the case that `path` exists in the snapshot, @@ -1658,15 +1580,9 @@ async fn updated_node_chunks_iterator<'a>( let path = node.path.clone(); Either::Right( // TODO: avoid clone - verified_node_chunk_iterator( - asset_manager, - snapshot_id, - change_set, - node, - extent, - ) - .await - .map_ok(move |ci| (path.clone(), ci)), + verified_node_chunk_iterator(asset_manager, snapshot_id, change_set, node) + .await + .map_ok(move |ci| (path.clone(), ci)), ) } } @@ -1677,18 +1593,11 @@ async fn node_chunk_iterator<'a>( change_set: &'a ChangeSet, snapshot_id: &'a SnapshotId, path: &Path, - extent: ManifestExtents, ) -> impl Stream> + 'a + use<'a> { match get_node(asset_manager, change_set, snapshot_id, path).await { Ok(node) => futures::future::Either::Left( - verified_node_chunk_iterator( - asset_manager, - snapshot_id, - change_set, - node, - extent, - ) - .await, + verified_node_chunk_iterator(asset_manager, snapshot_id, change_set, node) + .await, ), Err(_) => futures::future::Either::Right(futures::stream::empty()), } @@ -1700,14 +1609,13 @@ async fn verified_node_chunk_iterator<'a>( snapshot_id: &'a SnapshotId, change_set: &'a ChangeSet, node: NodeSnapshot, - extent: ManifestExtents, ) -> impl Stream> + 'a { match node.node_data { NodeData::Group => futures::future::Either::Left(futures::stream::empty()), NodeData::Array { manifests, .. } => { let new_chunk_indices: Box> = Box::new( change_set - .array_chunks_iterator(&node.id, &node.path, extent.clone()) + .array_chunks_iterator(&node.id, &node.path) .map(|(idx, _)| idx) // by chaining here, we make sure we don't pull from the manifest // any chunks that were deleted prior to resizing in this session @@ -1716,9 +1624,8 @@ async fn verified_node_chunk_iterator<'a>( ); let node_id_c = node.id.clone(); - let extent_c = extent.clone(); let new_chunks = change_set - .array_chunks_iterator(&node.id, &node.path, extent.clone()) + .array_chunks_iterator(&node.id, &node.path) .filter_map(move |(idx, payload)| { payload.as_ref().map(|payload| { Ok(ChunkInfo { @@ -1732,18 +1639,11 @@ async fn verified_node_chunk_iterator<'a>( futures::future::Either::Right( futures::stream::iter(new_chunks).chain( futures::stream::iter(manifests) - .filter(move |manifest_ref| { - futures::future::ready( - extent.overlap_with(&manifest_ref.extents) - != Overlap::None, - ) - }) .then(move |manifest_ref| { let new_chunk_indices = new_chunk_indices.clone(); let node_id_c = node.id.clone(); let node_id_c2 = node.id.clone(); let node_id_c3 = node.id.clone(); - let extent_c2 = extent_c.clone(); async move { let manifest = fetch_manifest( &manifest_ref.object_id, @@ -1757,9 +1657,6 @@ async fn verified_node_chunk_iterator<'a>( .iter(node_id_c.clone()) .filter_ok(move |(coord, _)| { !new_chunk_indices.contains(coord) - // If the manifest we are parsing partially overlaps with `extent`, - // we need to filter all coordinates - && extent_c2.contains(coord.0.as_slice()) }) .map_ok(move |(coord, payload)| ChunkInfo { node: node_id_c2.clone(), @@ -2034,7 +1931,6 @@ struct FlushProcess<'a> { asset_manager: Arc, change_set: &'a ChangeSet, parent_id: &'a SnapshotId, - splits: &'a HashMap, manifest_refs: HashMap>, manifest_files: HashSet, } @@ -2044,36 +1940,16 @@ impl<'a> FlushProcess<'a> { asset_manager: Arc, change_set: &'a ChangeSet, parent_id: &'a SnapshotId, - splits: &'a HashMap, ) -> Self { Self { asset_manager, change_set, parent_id, - splits, manifest_refs: Default::default(), manifest_files: Default::default(), } } - async fn write_manifest_for_updated_chunks( - &mut self, - node: &NodeSnapshot, - extent: &ManifestExtents, - ) -> SessionResult> { - let asset_manager = Arc::clone(&self.asset_manager); - let updated_chunks = updated_node_chunks_iterator( - asset_manager.as_ref(), - self.change_set, - self.parent_id, - node.clone(), - extent.clone(), - ) - .await - .map_ok(|(_path, chunk_info)| chunk_info); - self.write_manifest_from_iterator(updated_chunks).await - } - async fn write_manifest_from_iterator( &mut self, chunks: impl Stream>, @@ -2110,16 +1986,17 @@ impl<'a> FlushProcess<'a> { &mut self, node_id: &NodeId, node_path: &Path, + splits: &ManifestSplits, ) -> SessionResult<()> { #[allow(clippy::expect_used)] - let splits = - self.splits.get(node_id).expect("getting split for node unexpectedly failed"); - for extent in splits.iter() { - if self.change_set.array_manifest(node_id, extent).is_some() { + if self.change_set.array_manifest(node_id).is_some() { let chunks = stream::iter( self.change_set - .new_array_chunk_iterator(node_id, node_path, extent.clone()) + .new_array_chunk_iterator(node_id, node_path) + // FIXME: do we need to optimize this so we don't need multiple passes over all chunks calling + // contains? + .filter(|chunk| extent.contains(&chunk.coord.0)) .map(Ok), ); let new_ref = self.write_manifest_from_iterator(chunks).await?; @@ -2132,88 +2009,178 @@ impl<'a> FlushProcess<'a> { Ok(()) } + /// Creates a new manifest for the node, by obtaining all previous chunks coming from + /// `previous_manifests`, filtering those that are in the `extent`, and overriding them + /// with any changes in `modified_chunks` + async fn write_manifest_with_changes( + &mut self, + previous_manifests: impl Iterator, + modified_chunks: &ChunkTable, + extent: &ManifestExtents, + node_id: &NodeId, + old_snapshot: &SnapshotId, + ) -> SessionResult> { + // Collect unmodified chunks from all intersecting manifests + let mut all_chunks_vec: Vec> = Vec::new(); + + // add chunks from previous manifests that are not modified + for mref in previous_manifests { + let manifest = + fetch_manifest(&mref.object_id, old_snapshot, &self.asset_manager) + .await?; + + for item in manifest.iter(node_id.clone()) { + match item { + Ok((idx, payload)) => { + if !modified_chunks.contains_key(&idx) && extent.contains(&idx.0) + { + all_chunks_vec.push(Ok(ChunkInfo { + node: node_id.clone(), + coord: idx.clone(), + payload: payload.clone(), + })); + } + } + Err(e) => all_chunks_vec.push(Err(e.into())), + } + } + } + + // add modified chunks + for (idx, maybe_payload) in modified_chunks.iter() { + if let Some(payload) = maybe_payload.as_ref() { + all_chunks_vec.push(Ok(ChunkInfo { + node: node_id.clone(), + coord: idx.clone(), + payload: payload.clone(), + })); + } + } + + self.write_manifest_from_iterator(stream::iter(all_chunks_vec).err_into()).await + } + /// Write a manifest for a node that was modified in this session /// It needs to update the chunks according to the change set /// and record the new manifest async fn write_manifest_for_existing_node( &mut self, node: &NodeSnapshot, - existing_manifests: Vec, + existing_manifests: &[ManifestRef], old_snapshot: &Snapshot, rewrite_manifests: bool, + splits: &ManifestSplits, ) -> SessionResult<()> { - #[allow(clippy::expect_used)] - let splits = - self.splits.get(&node.id).expect("splits should exist for this node."); - let mut refs = - HashMap::>::with_capacity(splits.len()); - - let on_disk_extents = - existing_manifests.iter().map(|m| m.extents.clone()).collect::>(); - - let modified_splits = self + // Some points to take into account to understand this algorithm: + // * The `splits` could have changed, so the `existing_manifests` not necessarily were + // created with the same splits, they could be widely different + // * In general we don't want to rewrite past manifests if we don't have to, we just + // try to reuse them, but if user says `rewrite_manifests=true` we'll rewrite everything + // * This function needs to work in the scenario where there are multiple past manifests + // for the node, and there are also session changes to chunks. These changes can be + // modifying, adding or deleting existing chunks. + // * We want this function to take time and space proportional to the size of the split, + // and not to the total size of the array. + // + // The algorithm: + // + // * Analyze all chunks in the changeset to understand what splits have been changed, this + // is a full pass through the changeset. Results are put in `update_chunks_by_extent`. + // new snapshot + // * For each (current) extent `extent` in the array splits: + // * find intersecting_manifests, all manifests in existing_manifests that have non-empty + // intersection with `extent` + // * if there are changes in this session to `extent` or we wants to rewrite manifests: + // * create a new manifest with all chunks in `extent` from all + // `intersecting_manifests`, overriding chunks with those coming from the change + // set, add the new manifest to the list of refs + // * else (no changes in this session to `extent`) + // * for each intersecting manifest: + // * if it's fully contained in the extent, we can reuse it, just add it to th + // elist of refs + // * else create a new manifest filtering out the chunks that are outside of + // the extent + let updated_chunks_by_extent: HashMap = self .change_set - .modified_manifest_extents_iterator(&node.id, &node.path) - .collect::>(); - - // ``modified_splits`` (i.e. splits used in this session) - // must be a subset of ``splits`` (the splits set in the config) - debug_assert!(modified_splits.is_subset(&splits.iter().collect::>())); + .array_chunks_iterator(&node.id, &node.path) + .fold(HashMap::new(), |mut res, (idx, payload)| { + if let Some(extents) = splits.find(idx) { + let entry = res.entry(extents.clone()).or_default(); + entry.insert(idx.clone(), payload.clone()); + } + res + }); + let snapshot_id = &old_snapshot.id(); for extent in splits.iter() { - if rewrite_manifests || modified_splits.contains(extent) { - // this split was modified in this session, rewrite it completely - self.write_manifest_for_updated_chunks(node, extent) + let intersecting_manifests: Vec<(&ManifestRef, Overlap)> = existing_manifests + .iter() + .filter_map(|mr| { + // order is critical here, `overlap_with` is not symmetric + match mr.extents.overlap_with(extent) { + Overlap::None => None, + ov => Some((mr, ov)), + } + }) + .collect(); + + let no_changes = Default::default(); + let modified_chunks = + updated_chunks_by_extent.get(extent).unwrap_or(&no_changes); + + if !modified_chunks.is_empty() || rewrite_manifests { + // if we were ask to rewrite manifests, or there are modified chunks in this split + // we need to create a new manifest for the split, previous manifests are of no use + if let Some(new_ref) = self + .write_manifest_with_changes( + intersecting_manifests.iter().map(|(mr, _)| *mr), + modified_chunks, + extent, + &node.id, + snapshot_id, + ) .await? - .map(|new_ref| refs.insert(extent.clone(), vec![new_ref])); + { + self.manifest_refs.entry(node.id.clone()).or_default().push(new_ref); + } } else { - // intersection of the current split with extents on disk - let on_disk_bbox = on_disk_extents - .iter() - .filter_map(|e| e.intersection(extent)) - .reduce(|a, b| a.union(&b)); - - // split was unmodified in this session. Let's look at the current manifests - // and see what we need to do with them - for old_ref in existing_manifests.iter() { - // Remember that the extents written to disk are the `from`:`to` ranges - // of populated chunks - match old_ref.extents.overlap_with(extent) { - Overlap::Complete => { - debug_assert!(on_disk_bbox.is_some()); - // Just propagate this ref again, no rewriting necessary - refs.entry(extent.clone()).or_default().push(old_ref.clone()); - // OK to unwrap here since this manifest file must exist in the old snapshot - #[allow(clippy::expect_used)] + // the session made no changes to this split, so we may have opportunity to reuse + // the previous manifests + for (mref, overlap) in intersecting_manifests { + if overlap == Overlap::Complete { + // only if the full manifest overlaps with the current split we can reuse + // it, otherwise it could have "extra stuff" we don't want. Remember splits + // can be different now than when the manifest was first written + self.manifest_refs + .entry(node.id.clone()) + .or_default() + .push(mref.clone()); + // OK to unwrap here since this manifest file must exist in the old snapshot + #[allow(clippy::expect_used)] self.manifest_files.insert( - old_snapshot.manifest_info(&old_ref.object_id).expect("logic bug. creating manifest file info for an existing manifest failed."), + old_snapshot.manifest_info(&mref.object_id).expect("logic bug. creating manifest file info for an existing manifest failed."), ); - } - Overlap::Partial => { - // the splits have changed, but no refs in this split have been written in this session - // same as `if` block above - debug_assert!(on_disk_bbox.is_some()); - if let Some(new_ref) = self - .write_manifest_for_updated_chunks(node, extent) - .await? - { - refs.entry(extent.clone()).or_default().push(new_ref); - } - } - Overlap::None => { - // Nothing to do - } - }; + } else if let Some(new_ref) = self + // if the existing manifest only partially overlaps, we need to write a new + // one that contains only the chunks we want + .write_manifest_with_changes( + std::iter::once(mref), + &no_changes, + extent, + &node.id, + snapshot_id, + ) + .await? + { + self.manifest_refs + .entry(node.id.clone()) + .or_default() + .push(new_ref); + } } } } - // FIXME: Assert that bboxes in refs don't overlap - - self.manifest_refs - .entry(node.id.clone()) - .or_default() - .extend(refs.into_values().flatten()); Ok(()) } @@ -2336,6 +2303,7 @@ async fn do_flush( properties: SnapshotProperties, rewrite_manifests: bool, commit_method: CommitMethod, + split_config: &ManifestSplittingConfig, ) -> SessionResult> { let old_snapshot = flush_data.asset_manager.fetch_snapshot(flush_data.parent_id).await?; @@ -2390,13 +2358,22 @@ async fn do_flush( &node.path, ) .await?; - if let NodeData::Array { manifests, .. } = new_node.node_data { + + if let NodeData::Array { manifests, shape, dimension_names } = + new_node.node_data + { + let splits = split_config.get_split_sizes( + &new_node.path, + &shape, + &dimension_names, + ); flush_data .write_manifest_for_existing_node( &node, - manifests, + manifests.as_slice(), old_snapshot.as_ref(), rewrite_manifests, + &splits, ) .await?; } @@ -2409,9 +2386,15 @@ async fn do_flush( // Now we need to go through all the new arrays, and generate manifests for them - for (node_path, node_id) in flush_data.change_set.new_arrays() { + for (node_path, node_id, array_data) in flush_data.change_set.new_arrays() { + let splits = split_config.get_split_sizes( + node_path, + &array_data.shape, + &array_data.dimension_names, + ); + trace!(path=%node_path, "New node, writing a manifest"); - flush_data.write_manifest_for_new_node(node_id, node_path).await?; + flush_data.write_manifest_for_new_node(node_id, node_path, &splits).await?; } // manifest_files & manifest_refs _must_ be consistent @@ -2562,9 +2545,9 @@ async fn do_commit( change_set: &ChangeSet, message: &str, properties: Option, - splits: &HashMap, rewrite_manifests: bool, commit_method: CommitMethod, + split_config: &ManifestSplittingConfig, allow_empty: bool, retry_settings: &storage::RetriesSettings, num_updates_per_repo_info_file: u16, @@ -2582,10 +2565,16 @@ async fn do_commit( let properties = properties.unwrap_or_default(); let flush_data = - FlushProcess::new(Arc::clone(&asset_manager), change_set, snapshot_id, splits); - let new_snapshot = - do_flush(flush_data, message, properties, rewrite_manifests, commit_method) - .await?; + FlushProcess::new(Arc::clone(&asset_manager), change_set, snapshot_id); + let new_snapshot = do_flush( + flush_data, + message, + properties, + rewrite_manifests, + commit_method, + split_config, + ) + .await?; let new_snapshot_id = new_snapshot.id(); let res = match asset_manager.spec_version() { @@ -2831,6 +2820,7 @@ mod tests { }; use super::*; + use async_trait::async_trait; use icechunk_macros::tokio_test; use itertools::{Itertools, assert_equal}; @@ -4641,6 +4631,7 @@ mod tests { let barrier = Arc::new(Barrier::new(2)); let barrier_c = Arc::clone(&barrier); let barrier_cc = Arc::clone(&barrier); + let handle1 = tokio::spawn(async move { let _ = barrier_c.wait().await; ds1.commit("from 1", None).await @@ -5561,6 +5552,101 @@ mod tests { Ok(()) } + #[icechunk_macros::tokio_test] + // Test rebase over a commit with a resize. + // + // Error-triggering flow: + // 1. Session A starts from snapshot S1 (array shape [5, 1]) + // 2. Splits are cached for the array with extent [0..5) + // 3. Session A writes chunk [3] + // 4. Meanwhile, another session resizes array to [20, 1] and writes chunk [10] + // 5. Session A rebases to the new snapshot + // 6. BUG: Session A's splits are NOT updated (still [0..5)) + // 7. Session A commits - during flush, verified_node_chunk_iterator filters + // parent chunks by extent, dropping chunk [10] because it's outside [0..5) + // 8. Result: chunk [10] is lost + // + // With IC1, doing this correctly requires updating Session.splits during rebase to match the new + // parent snapshot's array shapes. + async fn test_rebase_over_resize() -> Result<(), Box> { + struct YoloSolver; + #[async_trait] + impl ConflictSolver for YoloSolver { + async fn solve( + &self, + _previous_change: &TransactionLog, + _previous_repo: &Session, + current_changes: ChangeSet, + _sccurrent_repo: &Session, + ) -> SessionResult { + Ok(ConflictResolution::Patched(current_changes)) + } + } + + let repo = get_repo_for_conflict().await?; + + let mut ds1 = repo.writable_session("main").await?; + let mut ds2 = repo.writable_session("main").await?; + + let path: Path = "/foo/bar/some-array".try_into().unwrap(); + ds1.set_chunk_ref( + path.clone(), + ChunkIndices(vec![1]), + Some(ChunkPayload::Inline("repo 1".into())), + ) + .await?; + ds1.commit("writer 1 updated non-conflict chunk", None).await?; + + let mut ds1 = repo.writable_session("main").await?; + ds1.update_array( + &path, + ArrayShape::new(vec![(20, 1)]).unwrap(), + None, + Bytes::new(), + ) + .await?; + // Write a chunk beyond the original extent [0..5) to trigger the bug + ds1.set_chunk_ref( + path.clone(), + ChunkIndices(vec![10]), + Some(ChunkPayload::Inline("repo 1 chunk 10".into())), + ) + .await?; + ds1.commit("writer 1 updates array size and adds chunk 10", None).await?; + + // now set a chunk ref that is valid with both old and new shape. + ds2.set_chunk_ref( + path.clone(), + ChunkIndices(vec![3]), + Some(ChunkPayload::Inline("repo 2".into())), + ) + .await?; + ds2.commit_rebasing( + &YoloSolver, + 1u16, + "writer 2 writes chunk 0", + None, + async |_| {}, + async |_| {}, + ) + .await?; + + let ds3 = repo.writable_session("main").await?; + // All three chunks should be present: [1] and [10] from ds1, [3] from ds2 + for i in [1u32, 3, 10] { + assert!( + get_chunk( + ds3.get_chunk_reader(&path, &ChunkIndices(vec![i]), &ByteRange::ALL,) + .await? + ) + .await? + .is_some(), + "chunk [{i}] should be present" + ); + } + Ok(()) + } + #[tokio_test] /// Tests `commit_rebasing` retries the proper number of times when there are conflicts async fn test_commit_rebasing_attempts() -> Result<(), Box> { diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs index 7f2e40a72..4704cad9e 100644 --- a/icechunk/src/storage/mod.rs +++ b/icechunk/src/storage/mod.rs @@ -693,7 +693,10 @@ pub fn new_s3_storage( && (endpoint.contains("fly.storage.tigris.dev") || endpoint.contains("t3.storage.dev")) { - return Err(StorageError::from(StorageErrorKind::Other("Tigris Storage is not S3 compatible, use the Tigris specific constructor instead".to_string()))); + return Err(StorageError::from(StorageErrorKind::Other( + "Tigris Storage is not S3 compatible, use the Tigris specific constructor instead" + .to_string(), + ))); } let st = S3Storage::new( @@ -855,7 +858,10 @@ pub async fn new_s3_object_store_storage( && (endpoint.contains("fly.storage.tigris.dev") || endpoint.contains("t3.storage.dev")) { - return Err(StorageError::from(StorageErrorKind::Other("Tigris Storage is not S3 compatible, use the Tigris specific constructor instead".to_string()))); + return Err(StorageError::from(StorageErrorKind::Other( + "Tigris Storage is not S3 compatible, use the Tigris specific constructor instead" + .to_string(), + ))); } let storage = ObjectStorage::new_s3(bucket, prefix, credentials, Some(config)).await?; diff --git a/icechunk/src/storage/redirect.rs b/icechunk/src/storage/redirect.rs index 64cedaf37..79f0a7b96 100644 --- a/icechunk/src/storage/redirect.rs +++ b/icechunk/src/storage/redirect.rs @@ -90,7 +90,8 @@ impl RedirectStorage { })?; let res = client.execute(req).await.map_err(|e| { StorageError::from(StorageErrorKind::BadRedirect(format!( - "Request to redirect url ({}) failed, cannot find target Storage instance: {e}", &self.url + "Request to redirect url ({}) failed, cannot find target Storage instance: {e}", + &self.url ))) })?; let storage_url = res.headers().get("location").ok_or_else(|| { diff --git a/icechunk/src/store.rs b/icechunk/src/store.rs index 3ad882641..cca8a8783 100644 --- a/icechunk/src/store.rs +++ b/icechunk/src/store.rs @@ -1486,11 +1486,11 @@ mod tests { store.set("a/b/zarr.json", Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"group","attributes":{"spam":"ham","eggs":42}}"#)).await?; assert_eq!( - store.get("a/b/zarr.json", &ByteRange::ALL).await.unwrap(), - Bytes::copy_from_slice( - br#"{"zarr_format":3,"node_type":"group","attributes":{"spam":"ham","eggs":42}}"# - ) - ); + store.get("a/b/zarr.json", &ByteRange::ALL).await.unwrap(), + Bytes::copy_from_slice( + br#"{"zarr_format":3,"node_type":"group","attributes":{"spam":"ham","eggs":42}}"# + ) + ); let zarr_meta = Bytes::copy_from_slice(br#"{"zarr_format":3,"node_type":"array","attributes":{"foo":42},"shape":[2,2,2],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[1,1,1]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"mycodec","configuration":{"foo":42}}],"storage_transformers":[{"name":"mytransformer","configuration":{"bar":43}}],"dimension_names":["x","y","t"]}"#); store.set("a/b/array/zarr.json", zarr_meta.clone()).await?; @@ -1507,7 +1507,8 @@ mod tests { let repo = create_memory_store_repository().await; let ds = repo.writable_session("main").await?; let store = Store::from_session(Arc::new(RwLock::new(ds))).await; - let group_data = br#"{"zarr_format":3, "node_type":"group", "attributes": {"spam":"ham", "eggs":42}}"#; + let group_data = + br#"{"zarr_format":3, "node_type":"group", "attributes": {"spam":"ham", "eggs":42}}"#; store .set(