Skip to content

Commit d5398aa

Browse files
committed
[WIP] Optimize split manifest writes.
1 parent 9b78cf6 commit d5398aa

File tree

3 files changed

+381
-184
lines changed

3 files changed

+381
-184
lines changed

icechunk/src/change_set.rs

Lines changed: 188 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ use itertools::{Either, Itertools as _};
99
use serde::{Deserialize, Serialize};
1010

1111
use crate::{
12+
config::ManifestSplittingConfig,
1213
format::{
1314
ChunkIndices, NodeId, Path,
14-
manifest::{ChunkInfo, ChunkPayload},
15+
manifest::{ChunkInfo, ChunkPayload, ManifestExtents, ManifestSplits},
1516
snapshot::{ArrayShape, DimensionName, NodeData, NodeSnapshot},
1617
},
1718
session::SessionResult,
@@ -24,20 +25,81 @@ pub struct ArrayData {
2425
pub user_data: Bytes,
2526
}
2627

28+
impl ManifestSplits {
29+
pub fn which_extent(&self, coord: &ChunkIndices) -> SessionResult<&ManifestExtents> {
30+
Ok(self.0.get(self.which(coord)?).expect("logic bug"))
31+
}
32+
}
33+
34+
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
35+
pub struct SplitManifest {
36+
from: Vec<u32>,
37+
to: Vec<u32>,
38+
// It's important we keep these sorted, we use this fact in TransactionLog creation
39+
chunks: BTreeMap<ChunkIndices, Option<ChunkPayload>>,
40+
}
41+
42+
impl SplitManifest {
43+
pub fn update(&mut self, coord: ChunkIndices, data: Option<ChunkPayload>) {
44+
if self.from.is_empty() {
45+
debug_assert!(self.to.is_empty());
46+
debug_assert!(self.chunks.is_empty());
47+
// important to remember that `to` is not inclusive, so we need +1
48+
let mut coord0 = coord.0.clone();
49+
self.to.extend(coord0.iter().map(|n| *n + 1));
50+
self.from.append(&mut coord0);
51+
} else {
52+
for (existing, coord0) in self.from.iter_mut().zip(coord.0.iter()) {
53+
if coord0 < existing {
54+
*existing = *coord0
55+
}
56+
}
57+
for (existing, coord0) in self.to.iter_mut().zip(coord.0.iter()) {
58+
// important to remember that `to` is not inclusive, so we need +1
59+
let range_value = coord0 + 1;
60+
if range_value > *existing {
61+
*existing = range_value
62+
}
63+
}
64+
}
65+
self.chunks.insert(coord, data);
66+
}
67+
68+
pub fn retain(&mut self, predicate: impl Fn(&ChunkIndices) -> bool) {
69+
self.chunks.retain(|coord, _| {
70+
if !predicate(coord) {
71+
// FIXME: handle from, to updating
72+
todo!();
73+
} else {
74+
false
75+
}
76+
})
77+
}
78+
}
79+
2780
#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
2881
pub struct ChangeSet {
82+
splitting: ManifestSplittingConfig,
83+
// This is an optimization so that we needn't figure out the split sizes on every set.
84+
// FIXME: this is needed to catch the case where the splitting config changes in a session
85+
// after chunks are written
86+
splits: HashMap<NodeId, ManifestSplits>,
2987
new_groups: HashMap<Path, (NodeId, Bytes)>,
3088
new_arrays: HashMap<Path, (NodeId, ArrayData)>,
3189
updated_arrays: HashMap<NodeId, ArrayData>,
32-
updated_groups: HashMap<NodeId, Bytes>,
33-
// It's important we keep these sorted, we use this fact in TransactionLog creation
34-
// TODO: could track ManifestExtents
35-
set_chunks: BTreeMap<NodeId, BTreeMap<ChunkIndices, Option<ChunkPayload>>>,
90+
updated3_groups: HashMap<NodeId, Bytes>,
91+
// FIXME: It's important we keep these sorted, we use this fact in TransactionLog creation
92+
// Change HashMap -> BTreeMap, need to check Ord on ManifestExtents
93+
set_chunks: BTreeMap<NodeId, HashMap<ManifestExtents, SplitManifest>>,
3694
deleted_groups: HashSet<(Path, NodeId)>,
3795
deleted_arrays: HashSet<(Path, NodeId)>,
3896
}
3997

4098
impl ChangeSet {
99+
pub fn new(splitting: ManifestSplittingConfig) -> Self {
100+
Self { splitting, ..Default::default() }
101+
}
102+
41103
pub fn deleted_arrays(&self) -> impl Iterator<Item = &(Path, NodeId)> {
42104
self.deleted_arrays.iter()
43105
}
@@ -58,10 +120,13 @@ impl ChangeSet {
58120
self.deleted_arrays.contains(path_and_id)
59121
}
60122

123+
pub fn splits(&self, id: &NodeId) -> Option<&ManifestSplits> {
124+
self.splits.get(id)
125+
}
126+
61127
pub fn chunk_changes(
62128
&self,
63-
) -> impl Iterator<Item = (&NodeId, &BTreeMap<ChunkIndices, Option<ChunkPayload>>)>
64-
{
129+
) -> impl Iterator<Item = (&NodeId, &HashMap<ManifestExtents, SplitManifest>)> {
65130
self.set_chunks.iter()
66131
}
67132

@@ -100,11 +165,51 @@ impl ChangeSet {
100165
}
101166
}
102167

103-
pub fn add_array(&mut self, path: Path, node_id: NodeId, array_data: ArrayData) {
168+
fn update_cached_splits(
169+
&mut self,
170+
node_id: &NodeId,
171+
path: &Path,
172+
shape: &ArrayShape,
173+
dimension_names: &Option<Vec<DimensionName>>,
174+
) -> SessionResult<()> {
175+
// FIXME: What happens if we set a chunk, then change a dimension name, so
176+
// that the split changes.
177+
let splits = self.splitting.get_split_sizes(path, shape, dimension_names)?;
178+
// FIXME: Check that splits matches any existing split used? This is for the case when
179+
// we write a chunk, populate `self.splits`, then say a dimension name was changed,
180+
// which changes the splits, so things are now inconsistent.
181+
self.splits.insert(node_id.clone(), splits);
182+
Ok(())
183+
}
184+
185+
pub fn add_array(
186+
&mut self,
187+
path: Path,
188+
node_id: NodeId,
189+
array_data: ArrayData,
190+
) -> SessionResult<()> {
191+
self.update_cached_splits(
192+
&node_id,
193+
&path,
194+
&array_data.shape,
195+
&array_data.dimension_names,
196+
);
104197
self.new_arrays.insert(path, (node_id, array_data));
198+
Ok(())
105199
}
106200

107-
pub fn update_array(&mut self, node_id: &NodeId, path: &Path, array_data: ArrayData) {
201+
pub fn update_array(
202+
&mut self,
203+
node_id: &NodeId,
204+
path: &Path,
205+
array_data: ArrayData,
206+
) -> SessionResult<()> {
207+
self.update_cached_splits(
208+
&node_id,
209+
&path,
210+
&array_data.shape,
211+
&array_data.dimension_names,
212+
);
108213
match self.new_arrays.get(path) {
109214
Some((id, _)) => {
110215
debug_assert!(!self.updated_arrays.contains_key(id));
@@ -114,6 +219,7 @@ impl ChangeSet {
114219
self.updated_arrays.insert(node_id.clone(), array_data);
115220
}
116221
}
222+
Ok(())
117223
}
118224

119225
pub fn update_group(&mut self, node_id: &NodeId, path: &Path, definition: Bytes) {
@@ -139,6 +245,7 @@ impl ChangeSet {
139245

140246
self.updated_arrays.remove(node_id);
141247
self.set_chunks.remove(node_id);
248+
self.splits.remove(node_id);
142249
if !is_new_array {
143250
self.deleted_arrays.insert((path, node_id.clone()));
144251
}
@@ -166,34 +273,60 @@ impl ChangeSet {
166273
node_id: NodeId,
167274
coord: ChunkIndices,
168275
data: Option<ChunkPayload>,
169-
) {
276+
) -> SessionResult<()> {
277+
let cached_splits = self.splits.get(&node_id).expect("logic bug");
278+
279+
let extent = cached_splits.which_extent(&coord)?;
170280
// this implementation makes delete idempotent
171281
// it allows deleting a deleted chunk by repeatedly setting None.
172282
self.set_chunks
173283
.entry(node_id)
174284
.and_modify(|h| {
175-
h.insert(coord.clone(), data.clone());
285+
h.entry(extent.clone()).or_default().update(coord.clone(), data.clone());
176286
})
177-
.or_insert(BTreeMap::from([(coord, data)]));
287+
.or_insert_with(|| {
288+
let mut h = HashMap::<ManifestExtents, SplitManifest>::with_capacity(
289+
cached_splits.len(),
290+
);
291+
h.entry(extent.clone())
292+
// TODO: this is duplicative. I can't use `or_default` because it's
293+
// nice to create the HashMap using `with_capacity`
294+
.or_default()
295+
.update(coord, data);
296+
h
297+
});
298+
Ok(())
178299
}
179300

180301
pub fn get_chunk_ref(
181302
&self,
182303
node_id: &NodeId,
183304
coords: &ChunkIndices,
184305
) -> Option<&Option<ChunkPayload>> {
185-
self.set_chunks.get(node_id).and_then(|h| h.get(coords))
306+
self.splits
307+
.get(node_id)
308+
.and_then(|splits| {
309+
splits.which_extent(coords).ok().map(|extent| {
310+
self.set_chunks
311+
.get(node_id)
312+
.and_then(|h| h.get(&extent))
313+
.and_then(|s| s.chunks.get(coords))
314+
})
315+
})
316+
.flatten()
186317
}
187318

188319
/// Drop the updated chunk references for the node.
189320
/// This will only drop the references for which `predicate` returns true
190321
pub fn drop_chunk_changes(
191322
&mut self,
192323
node_id: &NodeId,
193-
predicate: impl Fn(&ChunkIndices) -> bool,
324+
predicate: impl Fn(&ChunkIndices) -> bool + Clone,
194325
) {
195326
if let Some(changes) = self.set_chunks.get_mut(node_id) {
196-
changes.retain(|coord, _| !predicate(coord));
327+
for split in changes.values_mut() {
328+
split.retain(predicate.clone());
329+
}
197330
}
198331
}
199332

@@ -207,7 +340,7 @@ impl ChangeSet {
207340
}
208341
match self.set_chunks.get(node_id) {
209342
None => Either::Left(iter::empty()),
210-
Some(h) => Either::Right(h.iter()),
343+
Some(h) => Either::Right(h.values().flat_map(|x| x.chunks.iter())),
211344
}
212345
}
213346

@@ -235,6 +368,44 @@ impl ChangeSet {
235368
)
236369
}
237370

371+
pub fn array_manifests_iterator(
372+
&self,
373+
node_id: &NodeId,
374+
node_path: &Path,
375+
) -> impl Iterator<Item = (&ManifestExtents, &SplitManifest)> + use<'_> {
376+
if self.is_deleted(node_path, node_id) {
377+
return Either::Left(iter::empty());
378+
}
379+
match self.set_chunks.get(node_id) {
380+
None => Either::Left(iter::empty()),
381+
Some(h) => Either::Right(h.iter()),
382+
}
383+
}
384+
385+
pub fn array_manifest(
386+
&self,
387+
node_id: &NodeId,
388+
extent: &ManifestExtents,
389+
) -> Option<&SplitManifest> {
390+
self.set_chunks.get(node_id).and_then(|x| x.get(extent))
391+
}
392+
393+
// pub fn new_array_manifest_iterator<'a>(
394+
// &'a self,
395+
// node_id: &'a NodeId,
396+
// node_path: &Path,
397+
// ) -> impl Iterator<Item = ChunkInfo> + use<'a> {
398+
// self.array_manifests_iterator(node_id, node_path).filter_map(
399+
// move |(coords, payload)| {
400+
// payload.as_ref().map(|p| ChunkInfo {
401+
// node: node_id.clone(),
402+
// coord: coords.clone(),
403+
// payload: p.clone(),
404+
// })
405+
// },
406+
// )
407+
// }
408+
238409
pub fn new_nodes(&self) -> impl Iterator<Item = (&Path, &NodeId)> {
239410
self.new_groups().chain(self.new_arrays())
240411
}
@@ -274,6 +445,7 @@ impl ChangeSet {
274445
self.updated_arrays.extend(other.updated_arrays);
275446
self.deleted_groups.extend(other.deleted_groups);
276447
self.deleted_arrays.extend(other.deleted_arrays);
448+
// FIXME: handle splits
277449

278450
for (node, other_chunks) in other.set_chunks.into_iter() {
279451
match self.set_chunks.remove(&node) {

icechunk/src/format/manifest.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use super::{
1818
ChunkId, ChunkIndices, ChunkLength, ChunkOffset, IcechunkResult, ManifestId, NodeId,
1919
};
2020

21-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21+
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
2222
pub struct ManifestExtents(Vec<Range<u32>>);
2323

2424
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -54,8 +54,8 @@ impl ManifestExtents {
5454
}
5555
}
5656

57-
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
58-
pub struct ManifestSplits(Vec<ManifestExtents>);
57+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58+
pub struct ManifestSplits(pub Vec<ManifestExtents>);
5959

6060
impl ManifestSplits {
6161
/// Used at read-time

0 commit comments

Comments
 (0)