Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
4b30b9f
[WIP] Optimize split manifest writes.
dcherian May 13, 2025
3a19559
Refactor to track splitting on Session
dcherian Jun 6, 2025
09cd445
Revert to aggregate_extents
dcherian Jun 6, 2025
8924a39
Tests pass!
dcherian Jun 6, 2025
ca2d171
sqw iterator
dcherian Jun 6, 2025
5e843be
cleanup
dcherian Jun 6, 2025
03d62ac
lint
dcherian Jun 6, 2025
ef37e58
Fix bug
dcherian Jun 7, 2025
9fdd0d3
Minor changes
dcherian Jun 9, 2025
c93ea84
Fix distributed writes.
dcherian Jun 9, 2025
46638de
Handle appends
dcherian Jun 10, 2025
ac1d24b
Refactor overlaps to overlap_with
dcherian Jun 10, 2025
dae0d60
Refactor overlaps to overlap_with
dcherian Jun 10, 2025
f841e3c
Cleanup
dcherian Jun 10, 2025
839fdc3
cleanup more
dcherian Jun 10, 2025
35934a9
more tests
dcherian Jun 10, 2025
4b3cde3
more cleanup
dcherian Jun 11, 2025
b9d3ec8
note
dcherian Jun 11, 2025
75d264a
Track _all_ deleted chunks separately
dcherian Jun 11, 2025
f88c0f5
Revert "Track _all_ deleted chunks separately"
dcherian Jun 11, 2025
d6d1a90
Track deleted chunks _outside_ array shape only
dcherian Jun 11, 2025
9b9712e
Update stateful tests
dcherian Jun 11, 2025
eaee66a
Fix bug & update tests.
dcherian Jun 11, 2025
5f4e93a
Stricter GC test
dcherian Jun 12, 2025
a15a8ac
Fix stateful test
dcherian Jun 12, 2025
0fc8c2a
Cleanup
dcherian Jun 12, 2025
2a6d65d
More complex test + fixes
dcherian Jun 13, 2025
27ad4bc
more test
dcherian Jun 13, 2025
14d70df
Fix merging
dcherian Jun 13, 2025
27c7c4b
reorg tests
dcherian Jun 13, 2025
9e1c8ad
edits
dcherian Jun 13, 2025
e974f48
Add conflicting commits test
dcherian Jun 16, 2025
76e58fd
Add test for splits changing in a session
dcherian Jun 16, 2025
a306abf
lint
dcherian Jun 16, 2025
e719c7f
Review comments
dcherian Jun 17, 2025
b7fb385
Use ManifestExtents::ALL sentinel
dcherian Jun 17, 2025
4a2e455
Update stateful tests
dcherian Jun 18, 2025
b179dbc
update benchmarks dep group
dcherian Jun 20, 2025
e184e29
small edits
dcherian Jun 20, 2025
3b52259
Parallel writes
dcherian Jun 21, 2025
4e68231
more benchmarks
dcherian Jun 21, 2025
d0c42be
Revert "Parallel writes"
dcherian Jun 23, 2025
fcdc2d6
lint
dcherian Jun 23, 2025
ed648a8
Merge branch 'main' into manifest-split-write
dcherian Jun 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion icechunk-python/benchmarks/test_benchmark_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ def write():


@pytest.mark.benchmark(group="refs-write")
def test_write_split_manifest_refs(benchmark, splitting, large_write_dataset) -> None:
def test_write_split_manifest_refs_full_rewrite(
benchmark, splitting, large_write_dataset
) -> None:
dataset = large_write_dataset
config = repo_config_with(splitting=splitting)
assert config is not None
Expand Down Expand Up @@ -219,3 +221,53 @@ def commit(session_from_setup):
session_from_setup.commit("wrote refs")

benchmark.pedantic(commit, setup=write_refs, iterations=1, rounds=10)


@pytest.mark.benchmark(group="refs-write")
def test_write_split_manifest_refs_append(
    benchmark, splitting, large_write_dataset
) -> None:
    # Benchmark committing *appends* of virtual refs: each round stages a fresh
    # batch of refs for the same array, so later commits must merge with the
    # manifests written by earlier commits (the manifest-splitting append path).
    dataset = large_write_dataset
    config = repo_config_with(splitting=splitting)
    assert config is not None
    if hasattr(config.manifest, "splitting"):
        assert config.manifest.splitting == splitting
    repo = dataset.create(config=config)
    session = repo.writable_session(branch="main")
    store = session.store
    group = zarr.open_group(store, zarr_format=3)
    group.create_array(
        "array",
        shape=dataset.shape,
        chunks=dataset.chunks,
        dtype="int8",
        fill_value=0,
        compressors=None,
    )
    session.commit("initialize")

    # yuck, but I'm abusing `rounds` to do a loop and time _only_ the commit.
    # `counter` is module-level state shared with `write_refs` below; it tracks
    # which batch of refs the next round should write.
    global counter
    counter = 0
    rounds = 10
    num_chunks = dataset.shape[0] // dataset.chunks[0]
    batch_size = num_chunks // rounds

    def write_refs() -> tuple[tuple[Session], dict]:
        # Setup for one benchmark round: stage `batch_size` virtual refs in a
        # fresh writable session, leaving only the commit to be timed.
        global counter
        session = repo.writable_session(branch="main")
        chunks = [
            VirtualChunkSpec(
                index=[i], location=f"s3://foo/bar/{i}.nc", offset=0, length=1
            )
            for i in range(counter * batch_size, counter * batch_size + batch_size)
        ]
        counter += 1
        session.store.set_virtual_refs("array", chunks)
        # benchmark.pedantic's `setup` callable must return (args, kwargs)
        # for the timed function.
        return ((session,), {})

    def commit(session_from_setup):
        # The timed operation: just the commit.
        session_from_setup.commit("wrote refs")

    benchmark.pedantic(commit, setup=write_refs, iterations=1, rounds=rounds)
1 change: 1 addition & 0 deletions icechunk-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ benchmark = [
"humanize",
"platformdirs",
"ipdb",
"coiled",
]
docs = [
"scipy",
Expand Down
6 changes: 3 additions & 3 deletions icechunk-python/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ impl PySession {
pub fn merge(&self, other: &PySession, py: Python<'_>) -> PyResult<()> {
// This is blocking function, we need to release the Gil
py.allow_threads(move || {
// TODO: Bad clone
let changes = other.0.blocking_read().deref().changes().clone();
// TODO: bad clone
let other = other.0.blocking_read().deref().clone();

pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
self.0
.write()
.await
.merge(changes)
.merge(other)
.await
.map_err(PyIcechunkStoreError::SessionError)?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion icechunk-python/tests/test_manifest_splitting.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def test_manifest_splitting_appends():
nchunks += math.prod(NEWSHAPE) * 2
# the lon size goes from 17 -> 19 so one extra manifest,
# compared to previous writes
nmanifests += 7 * 2
nmanifests += 2 * 2

assert len(os.listdir(f"{tmpdir}/chunks")) == nchunks
assert len(os.listdir(f"{tmpdir}/manifests")) == nmanifests
Expand Down
2 changes: 1 addition & 1 deletion icechunk-python/tests/test_stateful_repo_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ def check_list_prefix_from_root(self) -> None:
# need to load to dict to compare since ordering of entries might differ
expected = json.loads(self.model[k].to_bytes())
value = self.sync_store.get(k, default_buffer_prototype())
assert value is not None
assert value is not None, k
actual = json.loads(value.to_bytes())
actual_fv = actual.pop("fill_value")
expected_fv = expected.pop("fill_value")
Expand Down
206 changes: 182 additions & 24 deletions icechunk-python/tests/test_zarr/test_stateful.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
from collections.abc import Iterable
from typing import Any

import hypothesis.extra.numpy as npst
import hypothesis.strategies as st
import numpy as np
import pytest
Expand All @@ -13,18 +15,74 @@
run_state_machine_as_test,
)

import icechunk as ic
import zarr
from icechunk import Repository, in_memory_storage
from icechunk import Repository, Storage, in_memory_storage
from zarr.core.buffer import default_buffer_prototype
from zarr.testing.stateful import ZarrHierarchyStateMachine
from zarr.testing.strategies import (
basic_indices,
node_names,
np_array_and_chunks,
numpy_arrays,
orthogonal_indices,
)

PROTOTYPE = default_buffer_prototype()

# pytestmark = [
# pytest.mark.filterwarnings(
# "ignore::zarr.core.dtype.common.UnstableSpecificationWarning"
# ),
# ]


import functools


def with_frequency(frequency):
    """
    Decorator to control how frequently a rule runs in Hypothesis stateful tests.

    Args:
        frequency: Float between 0 and 1, where 1.0 means always run,
            0.1 means run ~10% of the time, etc.

    Usage:
        @rule()
        @with_frequency(0.1)  # Run ~10% of the time
        def rare_operation(self):
            pass
    """

    def decorator(func):
        # Per-function counter attribute stored on the state-machine instance.
        attr = f"__{func.__name__}_counter"

        @functools.wraps(func)
        def inner(self, *args, **kwargs):
            return func(self, *args, **kwargs)

        @precondition
        def should_run(self):
            # Count every invocation attempt on this instance.
            count = getattr(self, attr, 0) + 1
            setattr(self, attr, count)
            # Deterministic gate that fires roughly `frequency` of the time
            # over many calls.
            return (count * frequency) % 1.0 >= 1.0 - frequency

        # `should_run` is now a precondition decorator; gate the wrapped rule.
        return should_run(inner)

    return decorator


@st.composite
def chunk_paths(
Expand All @@ -39,14 +97,66 @@ def chunk_paths(
return "/".join(map(str, blockidx[subset_slicer]))


@st.composite
def splitting_configs(
    draw: st.DrawFn, *, arrays: Iterable[zarr.Array]
) -> ic.ManifestSplittingConfig:
    """Draw a random manifest-splitting config covering the given arrays.

    For each array we draw a matching condition (by terminal name or by full
    path) and, for a random non-empty subset of its dimensions, a split size
    keyed either by axis number or by dimension name.
    """
    config_dict = {}
    for array in arrays:
        # Match the array either by its terminal name or by its full path.
        if draw(st.booleans()):
            array_condition = ic.ManifestSplitCondition.name_matches(
                array.path.split("/")[-1]
            )
        else:
            array_condition = ic.ManifestSplitCondition.path_matches(array.path)
        dimnames = array.metadata.dimension_names or (None,) * array.ndim
        dimsize_axis_names = draw(
            st.lists(
                st.sampled_from(
                    tuple(zip(array.shape, range(array.ndim), dimnames, strict=False))
                ),
                min_size=1,
                unique=True,
            )
        )
        # Accumulate one entry per drawn dimension. (Previously the whole dict
        # was reassigned inside the loop, so only the last dimension survived.)
        dim_conditions = {}
        for size, axis, dimname in dimsize_axis_names:
            if dimname is None or draw(st.booleans()):
                key = ic.ManifestSplitDimCondition.Axis(axis)
            else:
                key = ic.ManifestSplitDimCondition.DimensionName(dimname)
            # Split sizes may exceed the dimension length (size + 10) to also
            # exercise the single-manifest case.
            dim_conditions[key] = draw(st.integers(min_value=1, max_value=size + 10))
        config_dict[array_condition] = dim_conditions
    return ic.ManifestSplittingConfig.from_dict(config_dict)


# TODO: more before/after commit invariants?
# TODO: add "/" to self.all_groups, deleting "/" seems to be problematic
class ModifiedZarrHierarchyStateMachine(ZarrHierarchyStateMachine):
def __init__(self, repo: Repository) -> None:
self.repo = repo
store = repo.writable_session("main").store
def __init__(self, storage: Storage) -> None:
self.storage = storage
self.repo = Repository.create(self.storage)
store = self.repo.writable_session("main").store
super().__init__(store)

@precondition(
    lambda self: not self.store.session.has_uncommitted_changes
    and bool(self.all_arrays)
)
@rule(data=st.data())
def reopen_with_config(self, data):
    """Reopen the repository with a freshly drawn manifest-splitting config.

    Only runs when the current session is clean, since reopening replaces
    `self.store` with a new writable session. Exercises splitting
    configuration changing between sessions of the same repository.
    """
    # Build splitting conditions for up to 3 of the existing arrays.
    array_paths = data.draw(
        st.lists(st.sampled_from(sorted(self.all_arrays)), max_size=3, unique=True)
    )
    arrays = tuple(zarr.open_array(self.model, path=path) for path in array_paths)
    sconfig = data.draw(splitting_configs(arrays=arrays))
    # inline_chunk_threshold_bytes=0 keeps chunks out of inline storage,
    # presumably so manifest splitting is actually exercised — TODO confirm.
    config = ic.RepositoryConfig(
        inline_chunk_threshold_bytes=0, manifest=ic.ManifestConfig(splitting=sconfig)
    )
    note(f"reopening with splitting config {sconfig=!r}")
    self.repo = Repository.open(self.storage, config=config)
    self.store = self.repo.writable_session("main").store

@precondition(lambda self: self.store.session.has_uncommitted_changes)
@rule(data=st.data())
def commit_with_check(self, data) -> None:
Expand Down Expand Up @@ -108,8 +218,49 @@ def add_array(
assume(array.size > 0)
super().add_array(data, name, array_and_chunks)

@precondition(lambda self: bool(self.all_groups))
@rule(data=st.data())
def check_list_dir(self, data: st.DataObject) -> None:
    """Compare list_dir output between the model store and Icechunk.

    A difference of exactly {"c"} is tolerated (see comment below); any
    other difference reaches the assert, which restates the comparison
    purely to produce an informative failure message.
    """
    path = self.draw_directory(data)
    note(f"list_dir for {path=!r}")
    model_ls = sorted(self._sync_iter(self.model.list_dir(path)))
    store_ls = sorted(self._sync_iter(self.store.list_dir(path)))
    if model_ls != store_ls and set(model_ls).symmetric_difference(set(store_ls)) != {
        "c"
    }:
        # Consider .list_dir("path/to/array") for an array with a single chunk.
        # The MemoryStore model will return `"c", "zarr.json"` only if the chunk exists
        # If that chunk was deleted, then `"c"` is not returned.
        # LocalStore will not have this behaviour :/
        # In Icechunk, we always return the `c` so ignore this inconsistency.
        assert model_ls == store_ls, (model_ls, store_ls)

##### TODO: port everything below to zarr
@precondition(lambda self: bool(self.all_arrays))
@rule(data=st.data())
def check_array(self, data: st.DataObject) -> None:
    """Assert a randomly chosen array holds identical data in store and model."""
    chosen = data.draw(st.sampled_from(sorted(self.all_arrays)))
    from_store = zarr.open_array(self.store, path=chosen)[:]
    from_model = zarr.open_array(self.model, path=chosen)[:]
    np.testing.assert_equal(from_store, from_model)

@precondition(lambda self: bool(self.all_arrays))
@rule(data=st.data())
def overwrite_array_orthogonal_indexing(self, data: st.DataObject) -> None:
    """Overwrite a random orthogonal selection identically in model and store."""
    path = data.draw(st.sampled_from(sorted(self.all_arrays)))
    in_model = zarr.open_array(path=path, store=self.model)
    in_store = zarr.open_array(path=path, store=self.store)
    indexer, _ = data.draw(orthogonal_indices(shape=in_model.shape))
    note(f"overwriting array orthogonal {indexer=}")
    # The selection's shape dictates the shape of the replacement data.
    replacement = data.draw(
        npst.arrays(shape=in_model.oindex[indexer].shape, dtype=in_model.dtype)
    )
    in_model.oindex[indexer] = replacement
    in_store.oindex[indexer] = replacement

##### TODO: delete after next Zarr release (Jun 18, 2025)
@rule()
@with_frequency(0.25)
def clear(self) -> None:
note("clearing")
import zarr
Expand Down Expand Up @@ -152,23 +303,6 @@ def draw_directory(self, data) -> str:
path = array_or_group
return path

@precondition(lambda self: bool(self.all_groups))
@rule(data=st.data())
def check_list_dir(self, data) -> None:
path = self.draw_directory(data)
note(f"list_dir for {path=!r}")
model_ls = sorted(self._sync_iter(self.model.list_dir(path)))
store_ls = sorted(self._sync_iter(self.store.list_dir(path)))
if model_ls != store_ls and set(model_ls).symmetric_difference(set(store_ls)) != {
"c"
}:
# Consider .list_dir("path/to/array") for an array with a single chunk.
# The MemoryStore model will return `"c", "zarr.json"` only if the chunk exists
# If that chunk was deleted, then `"c"` is not returned.
# LocalStore will not have this behaviour :/
# In Icechunk, we always return the `c` so ignore this inconsistency.
assert model_ls == store_ls, (model_ls, store_ls)

@precondition(lambda self: bool(self.all_arrays))
@rule(data=st.data())
def delete_chunk(self, data) -> None:
Expand All @@ -182,6 +316,32 @@ def delete_chunk(self, data) -> None:
self._sync(self.model.delete(path))
self._sync(self.store.delete(path))

@precondition(lambda self: bool(self.all_arrays))
@rule(data=st.data())
def overwrite_array_basic_indexing(self, data) -> None:
    """Overwrite a random basic (slice) selection identically in model and store."""
    path = data.draw(st.sampled_from(sorted(self.all_arrays)))
    in_model = zarr.open_array(path=path, store=self.model)
    in_store = zarr.open_array(path=path, store=self.store)
    slicer = data.draw(basic_indices(shape=in_model.shape))
    note(f"overwriting array basic {slicer=}")
    # The selection's shape dictates the shape of the replacement data.
    replacement = data.draw(
        npst.arrays(shape=in_model[slicer].shape, dtype=in_model.dtype)
    )
    in_model[slicer] = replacement
    in_store[slicer] = replacement

@precondition(lambda self: bool(self.all_arrays))
@rule(data=st.data())
def resize_array(self, data) -> None:
    """Resize a random array to a new shape of the same rank, in both stores."""
    path = data.draw(st.sampled_from(sorted(self.all_arrays)))
    model_array = zarr.open_array(path=path, store=self.model)
    store_array = zarr.open_array(path=path, store=self.store)
    rank = model_array.ndim
    # Same rank, every side at least 1 — zarr resize cannot change ndim.
    new_shape = data.draw(npst.array_shapes(max_dims=rank, min_dims=rank, min_side=1))
    note(f"resizing array from {model_array.shape} to {new_shape}")
    model_array.resize(new_shape)
    store_array.resize(new_shape)

@precondition(lambda self: bool(self.all_arrays) or bool(self.all_groups))
@rule(data=st.data())
def delete_dir(self, data) -> None:
Expand Down Expand Up @@ -219,10 +379,8 @@ def check_list_prefix_from_root(self) -> None:


def test_zarr_hierarchy() -> None:
repo = Repository.create(in_memory_storage())

def mk_test_instance_sync() -> ModifiedZarrHierarchyStateMachine:
return ModifiedZarrHierarchyStateMachine(repo)
return ModifiedZarrHierarchyStateMachine(in_memory_storage())

run_state_machine_as_test(
mk_test_instance_sync, settings=Settings(report_multiple_bugs=False)
Expand Down
Loading