Skip to content

Commit 46d13b6

Browse files
committed
Use ManifestExtents::ALL sentinel
1 parent 0b1f73f commit 46d13b6

File tree

3 files changed

+39
-28
lines changed

3 files changed

+39
-28
lines changed

icechunk/src/change_set.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ impl ChangeSet {
297297
&self,
298298
node_id: &NodeId,
299299
node_path: &Path,
300-
extent: Option<ManifestExtents>,
300+
extent: ManifestExtents,
301301
) -> impl Iterator<Item = (&ChunkIndices, &Option<ChunkPayload>)> + use<'_> {
302302
if self.is_deleted(node_path, node_id) {
303303
return Either::Left(iter::empty());
@@ -306,9 +306,7 @@ impl ChangeSet {
306306
None => Either::Left(iter::empty()),
307307
Some(h) => Either::Right(
308308
h.iter()
309-
.filter(move |(manifest_extent, _)| {
310-
extent.as_ref().is_none_or(|e| e == *manifest_extent)
311-
})
309+
.filter(move |(manifest_extent, _)| extent.matches(manifest_extent))
312310
.flat_map(|(_, manifest)| manifest.iter()),
313311
),
314312
}
@@ -318,7 +316,7 @@ impl ChangeSet {
318316
&self,
319317
) -> impl Iterator<Item = (Path, ChunkInfo)> + use<'_> {
320318
self.new_arrays.iter().flat_map(|(path, (node_id, _))| {
321-
self.new_array_chunk_iterator(node_id, path, None)
319+
self.new_array_chunk_iterator(node_id, path, ManifestExtents::ALL)
322320
.map(|ci| (path.clone(), ci))
323321
})
324322
}
@@ -327,7 +325,7 @@ impl ChangeSet {
327325
&'a self,
328326
node_id: &'a NodeId,
329327
node_path: &Path,
330-
extent: Option<ManifestExtents>,
328+
extent: ManifestExtents,
331329
) -> impl Iterator<Item = ChunkInfo> + use<'a> {
332330
self.array_chunks_iterator(node_id, node_path, extent).filter_map(
333331
move |(coords, payload)| {

icechunk/src/format/manifest.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ pub enum Overlap {
3636
pub struct ManifestExtents(Vec<Range<u32>>);
3737

3838
impl ManifestExtents {
39+
// sentinel for a "universal set"
40+
pub const ALL: Self = Self(Vec::new());
41+
3942
pub fn new(from: &[u32], to: &[u32]) -> Self {
4043
let v = from
4144
.iter()
@@ -66,6 +69,10 @@ impl ManifestExtents {
6669
}
6770

6871
pub fn intersection(&self, other: &Self) -> Option<Self> {
72+
if self == &Self::ALL {
73+
return Some(other.clone());
74+
}
75+
6976
debug_assert_eq!(self.len(), other.len());
7077
let ranges = zip(self.iter(), other.iter())
7178
.map(|(a, b)| max(a.start, b.start)..min(a.end, b.end))
@@ -74,6 +81,9 @@ impl ManifestExtents {
7481
}
7582

7683
pub fn union(&self, other: &Self) -> Self {
84+
if self == &Self::ALL {
85+
return Self::ALL;
86+
}
7787
debug_assert_eq!(self.len(), other.len());
7888
Self::from_ranges_iter(
7989
zip(self.iter(), other.iter())
@@ -83,13 +93,17 @@ impl ManifestExtents {
8393

8494
pub fn overlap_with(&self, other: &Self) -> Overlap {
8595
// Important: this is not symmetric.
96+
if *other == Self::ALL {
97+
return Overlap::Complete;
98+
} else if *self == Self::ALL {
99+
return Overlap::Partial;
100+
}
86101
debug_assert!(
87102
self.len() == other.len(),
88103
"Length mismatch: self = {:?}, other = {:?}",
89104
&self,
90105
&other
91106
);
92-
93107
let mut overlap = Overlap::Complete;
94108
for (a, b) in zip(other.iter(), self.iter()) {
95109
debug_assert!(a.start <= a.end, "Invalid range: {:?}", a.clone());
@@ -102,6 +116,12 @@ impl ManifestExtents {
102116
}
103117
overlap
104118
}
119+
120+
pub fn matches(&self, other: &ManifestExtents) -> bool {
121+
// used in `.filter`
122+
// ALL always matches any other extents
123+
if *self == Self::ALL { true } else { self == other }
124+
}
105125
}
106126

107127
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]

icechunk/src/session.rs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,7 @@ impl Session {
625625
&self.change_set,
626626
&self.snapshot_id,
627627
path,
628-
None,
628+
ManifestExtents::ALL,
629629
)
630630
.await
631631
}
@@ -867,15 +867,15 @@ impl Session {
867867
&self.change_set,
868868
&self.snapshot_id,
869869
node.clone(),
870-
None,
870+
ManifestExtents::ALL,
871871
)
872872
.await
873873
.map_ok(|(_path, chunk_info)| chunk_info.coord);
874874

875875
let res = try_stream! {
876876
let new_chunks = stream::iter(
877877
self.change_set
878-
.new_array_chunk_iterator(&node.id, array_path, None)
878+
.new_array_chunk_iterator(&node.id, array_path, ManifestExtents::ALL)
879879
.map(|chunk_info| Ok::<ChunkIndices, SessionError>(chunk_info.coord)),
880880
);
881881

@@ -1200,7 +1200,7 @@ async fn updated_chunk_iterator<'a>(
12001200
change_set,
12011201
snapshot_id,
12021202
node,
1203-
None,
1203+
ManifestExtents::ALL,
12041204
)
12051205
.await)
12061206
});
@@ -1212,7 +1212,7 @@ async fn updated_node_chunks_iterator<'a>(
12121212
change_set: &'a ChangeSet,
12131213
snapshot_id: &'a SnapshotId,
12141214
node: NodeSnapshot,
1215-
extent: Option<ManifestExtents>,
1215+
extent: ManifestExtents,
12161216
) -> impl Stream<Item = SessionResult<(Path, ChunkInfo)>> + 'a {
12171217
// This iterator should yield chunks for existing arrays + any updates.
12181218
// we check for deletion here in the case that `path` exists in the snapshot,
@@ -1242,7 +1242,7 @@ async fn node_chunk_iterator<'a>(
12421242
change_set: &'a ChangeSet,
12431243
snapshot_id: &'a SnapshotId,
12441244
path: &Path,
1245-
extent: Option<ManifestExtents>,
1245+
extent: ManifestExtents,
12461246
) -> impl Stream<Item = SessionResult<ChunkInfo>> + 'a + use<'a> {
12471247
match get_node(asset_manager, change_set, snapshot_id, path).await {
12481248
Ok(node) => futures::future::Either::Left(
@@ -1265,7 +1265,7 @@ async fn verified_node_chunk_iterator<'a>(
12651265
snapshot_id: &'a SnapshotId,
12661266
change_set: &'a ChangeSet,
12671267
node: NodeSnapshot,
1268-
extent: Option<ManifestExtents>,
1268+
extent: ManifestExtents,
12691269
) -> impl Stream<Item = SessionResult<ChunkInfo>> + 'a {
12701270
match node.node_data {
12711271
NodeData::Group => futures::future::Either::Left(futures::stream::empty()),
@@ -1298,9 +1298,10 @@ async fn verified_node_chunk_iterator<'a>(
12981298
futures::stream::iter(new_chunks).chain(
12991299
futures::stream::iter(manifests)
13001300
.filter(move |manifest_ref| {
1301-
futures::future::ready(extent.as_ref().is_none_or(|e| {
1302-
e.overlap_with(&manifest_ref.extents) != Overlap::None
1303-
}))
1301+
futures::future::ready(
1302+
extent.overlap_with(&manifest_ref.extents)
1303+
!= Overlap::None,
1304+
)
13041305
})
13051306
.then(move |manifest_ref| {
13061307
let new_chunk_indices = new_chunk_indices.clone();
@@ -1323,11 +1324,7 @@ async fn verified_node_chunk_iterator<'a>(
13231324
!new_chunk_indices.contains(coord)
13241325
// If the manifest we are parsing partially overlaps with `extent`,
13251326
// we need to filter all coordinates
1326-
&& extent_c2.as_ref().is_none_or(
1327-
move |e| {
1328-
e.contains(coord.0.as_slice())
1329-
},
1330-
)
1327+
&& extent_c2.contains(coord.0.as_slice())
13311328
})
13321329
.map_ok(move |(coord, payload)| ChunkInfo {
13331330
node: node_id_c2.clone(),
@@ -1603,7 +1600,7 @@ impl<'a> FlushProcess<'a> {
16031600
self.change_set,
16041601
self.parent_id,
16051602
node.clone(),
1606-
Some(extent.clone()),
1603+
extent.clone(),
16071604
)
16081605
.await
16091606
.map_ok(|(_path, chunk_info)| chunk_info);
@@ -1656,11 +1653,7 @@ impl<'a> FlushProcess<'a> {
16561653
if self.change_set.array_manifest(node_id, extent).is_some() {
16571654
let chunks = stream::iter(
16581655
self.change_set
1659-
.new_array_chunk_iterator(
1660-
node_id,
1661-
node_path,
1662-
Some(extent.clone()),
1663-
)
1656+
.new_array_chunk_iterator(node_id, node_path, extent.clone())
16641657
.map(Ok),
16651658
);
16661659
#[allow(clippy::expect_used)]

0 commit comments

Comments
 (0)