Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 141 additions & 2 deletions docs/docs/icechunk-python/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ For very large arrays (millions of chunks), these files can get quite large.
By default, Icechunk stores all chunk references in a single manifest file per array.
Requesting even a single chunk requires downloading the entire manifest.
In some cases, this can result in a slow time-to-first-byte or large memory usage.
Similarly, appending a small amount of data to a large array requires
downloading and rewriting the entire manifest.

!!! note

Note that the chunk sizes in the following examples are tiny for demonstration purposes.

To avoid that, Icechunk lets you split the manifest files by specifying a ``ManifestSplittingConfig``.

### Configuring splitting

To solve this issue, Icechunk lets you __split_ the manifest files by specifying a ``ManifestSplittingConfig``.
```python exec="on" session="perf" source="material-block"
import icechunk as ic
from icechunk import ManifestSplitCondition, ManifestSplittingConfig, ManifestSplitDimCondition
Expand All @@ -38,14 +42,20 @@ split_config = ManifestSplittingConfig.from_dict(
}
}
)
repo_config = ic.RepositoryConfig(manifest=ic.ManifestConfig(splitting=split_config))
repo_config = ic.RepositoryConfig(
manifest=ic.ManifestConfig(splitting=split_config),
)
```

Then pass the config to `Repository.open` or `Repository.create`
```python
repo = ic.Repository.open(..., config=repo_config)
```

!!! important

Once you find a splitting configuration you like, remember to persist it on-disk using `repo.save_config`.

This particular example splits manifests so that each manifest contains `365 * 24` chunks along the time dimension, and every chunk along every other dimension in a single file.

Options for specifying the arrays whose manifest you want to split are:
Expand Down Expand Up @@ -92,3 +102,132 @@ will result in splitting manifests so that each manifest contains (3 longitude c
!!! note

Python dictionaries preserve insertion order, so the first condition encountered takes priority.



### Splitting behaviour

By default, Icechunk minimizes the number of chunk refs that are written in a single commit.

Consider this simple example: a 1D array with split size 1 along axis 0.
```python exec="on" session="perf" source="material-block"
import random

import icechunk as ic
from icechunk import (
ManifestSplitCondition,
ManifestSplitDimCondition,
ManifestSplittingConfig,
)

split_config = ManifestSplittingConfig.from_dict(
{ManifestSplitCondition.AnyArray(): {ManifestSplitDimCondition.Any(): 1}}
)
repo_config = ic.RepositoryConfig(manifest=ic.ManifestConfig(splitting=split_config))

storage = ic.local_filesystem_storage(
f"/tmp/splitting-test/{random.randint(100, 20000)}"
)
# Note any config passed to Repository.create is persisted to disk.
repo = ic.Repository.create(storage, config=repo_config)
```

Create an array
```python exec="on" session="perf" source="material-block"
import zarr

session = repo.writable_session("main")
root = zarr.group(session.store)
name = "array"
array = root.create_array(name=name, shape=(10,), dtype=int, chunks=(1,))
```

Now lets write 5 chunk references
```python exec="on" session="perf" source="material-block"
import numpy as np

array[:5] = np.arange(10, 15)
print(session.status())
```

And commit
```python exec="on" session="perf" source="material-block"
snap = session.commit("Add 5 chunks")
```

Use [`repo.lookup_snapshot`](./reference.md#icechunk.Repository.lookup_snapshot) to examine the manifests associated with a Snapshot
```python exec="on" session="perf" source="material-block"
print(repo.lookup_snapshot(snap).manifests)
```

Let's open the Repository again with a different splitting config --- where 5 chunk references are in a single manifest.
```python exec="on" session="perf" source="material-block"
split_config = ManifestSplittingConfig.from_dict(
{ManifestSplitCondition.AnyArray(): {ManifestSplitDimCondition.Any(): 5}}
)
repo_config = ic.RepositoryConfig(manifest=ic.ManifestConfig(splitting=split_config))
new_repo = ic.Repository.open(storage, config=repo_config)
print(new_repo.config.manifest)
```

Now let's append data.
```python exec="on" session="perf" source="material-block"
session = new_repo.writable_session("main")
array = zarr.open_array(session.store, path=name, mode="a")
array[6:9] = [1, 2, 3]
print(session.status())
```

```python exec="on" session="perf" source="material-block"
snap2 = session.commit("appended data")
repo.lookup_snapshot(snap2).manifests
```

Look carefully, only one new manifest with the 3 new chunk refs has been written.

Why?

Icechunk minimizes how many chunk references are rewritten at each commit (to save time and memory). The previous splitting configuration (split size of 1) results in manifests that are _compatible_ with the current configuration (split size of 5) because the bounding box of every existing manifest `slice(0, 1)`, `slice(1, 2)`, etc. is fully contained in the bounding boxes implied by the new configuration `[slice(0, 5), slice(5, 10)]`.

Now for a more complex example: let's rewrite the references in `slice(3,7)` i.e. spanning the break in manifests

```python exec="on" session="perf" source="material-block"
session = new_repo.writable_session("main")
array = zarr.open_array(session.store, path=name, mode="a")
array[3:7] = [1, 2, 3, 4]
print(session.status())
```

```python exec="on" session="perf" source="material-block"
snap3 = session.commit("rewrite [3,7)")
print(repo.lookup_snapshot(snap3).manifests)
```
This ends up rewriting all refs to two new manifests.

### Rewriting manifests

To force Icechunk to rewrite all chunk refs to the current splitting configuration use [`rewrite_manifests`](./reference.md#icechunk.Repository.rewrite_manifests) --- for the current example this will consolidate to two manifests.

To illustrate, we will use a split size of 3.
```python exec="on" session="perf" source="material-block"
split_config = ManifestSplittingConfig.from_dict(
{ManifestSplitCondition.AnyArray(): {ManifestSplitDimCondition.Any(): 3}}
)
repo_config = ic.RepositoryConfig(
manifest=ic.ManifestConfig(splitting=split_config),
)
new_repo = ic.Repository.open(storage, config=repo_config)

snap4 = new_repo.rewrite_manifests(
f"rewrite_manifests with new config {str(split_config.to_dict())!r}", branch="main"
)
```

`rewrite_snapshots` will create a new commit on `branch` with the provided `message`.
```python exec="on" session="perf" source="material-block"
print(repo.lookup_snapshot(snap4).manifests)
```

!!! important

Once you find a splitting configuration you like, remember to persist it on-disk using `repo.save_config`.
8 changes: 8 additions & 0 deletions icechunk-python/python/icechunk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ def from_dict(split_sizes: SplitSizesDict) -> ManifestSplittingConfig:
return ManifestSplittingConfig(unwrapped)


def to_dict(config: ManifestSplittingConfig) -> SplitSizesDict:
return {
split_condition: dict(dim_conditions)
for split_condition, dim_conditions in config.split_sizes
}


ManifestSplittingConfig.from_dict = from_dict # type: ignore[attr-defined]
ManifestSplittingConfig.to_dict = to_dict # type: ignore[attr-defined]

initialize_logs()
3 changes: 3 additions & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,9 @@ class PyRepository:
def garbage_collect(
self, delete_object_older_than: datetime.datetime
) -> GCSummary: ...
def rewrite_manifests(
self, message: str, *, branch: str, metadata: dict[str, Any] | None = None
) -> str: ...
def total_chunks_storage(self) -> int: ...

class PySession:
Expand Down
25 changes: 25 additions & 0 deletions icechunk-python/python/icechunk/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,31 @@ def garbage_collect(self, delete_object_older_than: datetime.datetime) -> GCSumm

return self._repository.garbage_collect(delete_object_older_than)

def rewrite_manifests(
self, message: str, *, branch: str, metadata: dict[str, Any] | None = None
) -> str:
"""
Rewrite manifests for all arrays and commit to the specified branch.

Parameters
----------
message : str
The message to write with the commit.
branch: str
The branch to commit to.
metadata : dict[str, Any] | None, optional
Additional metadata to store with the commit snapshot.

Returns
-------
str
The snapshot ID of the new commit.

"""
return self._repository.rewrite_manifests(
message, branch=branch, metadata=metadata
)

def total_chunks_storage(self) -> int:
"""Calculate the total storage used for chunks, in bytes .

Expand Down
43 changes: 43 additions & 0 deletions icechunk-python/python/icechunk/testing/strategies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from collections.abc import Iterable
from typing import cast

import hypothesis.strategies as st

import icechunk as ic
import zarr
from zarr.core.metadata import ArrayV3Metadata


@st.composite
def splitting_configs(
draw: st.DrawFn, *, arrays: Iterable[zarr.Array]
) -> ic.ManifestSplittingConfig:
config_dict = {}
for array in arrays:
if draw(st.booleans()):
array_condition = ic.ManifestSplitCondition.name_matches(
array.path.split("/")[-1]
)
else:
array_condition = ic.ManifestSplitCondition.path_matches(array.path)
dimnames = (
cast(ArrayV3Metadata, array.metadata).dimension_names or (None,) * array.ndim
)
dimsize_axis_names = draw(
st.lists(
st.sampled_from(
tuple(zip(array.shape, range(array.ndim), dimnames, strict=False))
),
min_size=1,
unique=True,
)
)
for size, axis, dimname in dimsize_axis_names:
if dimname is None or draw(st.booleans()):
key = ic.ManifestSplitDimCondition.Axis(axis)
else:
key = ic.ManifestSplitDimCondition.DimensionName(dimname) # type: ignore[assignment]
config_dict[array_condition] = {
key: draw(st.integers(min_value=1, max_value=size + 10))
}
return ic.ManifestSplittingConfig.from_dict(config_dict) # type: ignore[attr-defined, no-any-return]
2 changes: 1 addition & 1 deletion icechunk-python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1204,7 +1204,7 @@ impl PyManifestSplitDimCondition {
match self {
Axis(axis) => format!("Axis({})", axis),
DimensionName(name) => format!(r#"DimensionName("{}")"#, name),
Any() => "Rest".to_string(),
Any() => "Any".to_string(),
}
}

Expand Down
4 changes: 3 additions & 1 deletion icechunk-python/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use icechunk::{
StorageError,
format::IcechunkFormatError,
ops::gc::GCError,
ops::{gc::GCError, manifests::ManifestOpsError},
repository::RepositoryError,
session::{SessionError, SessionErrorKind},
store::{StoreError, StoreErrorKind},
Expand Down Expand Up @@ -38,6 +38,8 @@ pub(crate) enum PyIcechunkStoreError {
IcechunkFormatError(#[from] IcechunkFormatError),
#[error(transparent)]
GCError(#[from] GCError),
#[error(transparent)]
ManifestOpsError(#[from] ManifestOpsError),
#[error("{0}")]
PyKeyError(String),
#[error("{0}")]
Expand Down
24 changes: 24 additions & 0 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
ops::Deref,
sync::Arc,
};

Expand All @@ -18,6 +19,7 @@ use icechunk::{
},
ops::{
gc::{ExpiredRefAction, GCConfig, GCSummary, expire, garbage_collect},
manifests::rewrite_manifests,
stats::repo_chunks_storage,
},
repository::{RepositoryErrorKind, VersionInfo},
Expand Down Expand Up @@ -925,6 +927,28 @@ impl PyRepository {
})
}

#[pyo3(signature = (message, branch, metadata=None))]
pub fn rewrite_manifests(
&self,
py: Python<'_>,
message: &str,
branch: &str,
metadata: Option<PySnapshotProperties>,
) -> PyResult<String> {
// This function calls block_on, so we need to allow other thread python to make progress
py.allow_threads(move || {
let metadata = metadata.map(|m| m.into());
let result =
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let lock = self.0.read().await;
rewrite_manifests(lock.deref(), branch, message, metadata)
.await
.map_err(PyIcechunkStoreError::ManifestOpsError)
})?;
Ok(result.to_string())
})
}

#[pyo3(signature = (older_than, *, delete_expired_branches = false, delete_expired_tags = false))]
pub fn expire_snapshots(
&self,
Expand Down
15 changes: 15 additions & 0 deletions icechunk-python/tests/test_manifest_splitting.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,33 @@

import numpy as np
import pytest
from hypothesis import given
from hypothesis import strategies as st

import icechunk as ic
import xarray as xr
import zarr
from icechunk import ManifestSplitCondition, ManifestSplitDimCondition
from icechunk.testing.strategies import splitting_configs
from icechunk.xarray import to_icechunk
from zarr.testing.strategies import arrays as zarr_arrays

SHAPE = (3, 4, 17)
CHUNKS = (1, 1, 1)
DIMS = ("time", "latitude", "longitude")


@given(data=st.data())
def test_splitting_config_dict_roundtrip(data):
arrays = data.draw(
st.lists(
zarr_arrays(compressors=st.none(), attrs=st.none(), zarr_formats=st.just(3))
)
)
config = data.draw(splitting_configs(arrays=arrays))
assert ic.ManifestSplittingConfig.from_dict(config.to_dict()) == config


def test_manifest_splitting_appends():
array_condition = ManifestSplitCondition.or_conditions(
[
Expand Down
Loading