Skip to content

Commit 0f24418

Browse files
authored
Finish manifest splitting (#948)
1 parent 0a2b941 commit 0f24418

File tree

16 files changed

+2175
-432
lines changed

16 files changed

+2175
-432
lines changed

icechunk-python/benchmarks/test_benchmark_writes.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@ def write():
181181

182182

183183
@pytest.mark.benchmark(group="refs-write")
184-
def test_write_split_manifest_refs(benchmark, splitting, large_write_dataset) -> None:
184+
def test_write_split_manifest_refs_full_rewrite(
185+
benchmark, splitting, large_write_dataset
186+
) -> None:
185187
dataset = large_write_dataset
186188
config = repo_config_with(splitting=splitting)
187189
assert config is not None
@@ -219,3 +221,53 @@ def commit(session_from_setup):
219221
session_from_setup.commit("wrote refs")
220222

221223
benchmark.pedantic(commit, setup=write_refs, iterations=1, rounds=10)
224+
225+
226+
@pytest.mark.benchmark(group="refs-write")
def test_write_split_manifest_refs_append(
    benchmark, splitting, large_write_dataset
) -> None:
    """Benchmark committing *appended* virtual refs with manifest splitting.

    Unlike the full-rewrite benchmark, every round writes a fresh batch of
    virtual chunk refs, so each commit extends the existing manifests
    instead of rewriting them from scratch.
    """
    dataset = large_write_dataset
    config = repo_config_with(splitting=splitting)
    assert config is not None
    if hasattr(config.manifest, "splitting"):
        assert config.manifest.splitting == splitting
    repo = dataset.create(config=config)
    session = repo.writable_session(branch="main")
    store = session.store
    group = zarr.open_group(store, zarr_format=3)
    group.create_array(
        "array",
        shape=dataset.shape,
        chunks=dataset.chunks,
        dtype="int8",
        fill_value=0,
        compressors=None,
    )
    session.commit("initialize")

    # yuck, but I'm abusing `rounds` to do a loop and time _only_ the commit.
    # The batch counter lives in this enclosing scope (not a module-level
    # `global`, which would leak state between tests and parallel runs).
    counter = 0
    rounds = 10
    num_chunks = dataset.shape[0] // dataset.chunks[0]
    batch_size = num_chunks // rounds

    def write_refs() -> tuple[tuple, dict]:
        # Setup callback for benchmark.pedantic: write one batch of refs in
        # a fresh session and hand that dirty session to `commit`.
        nonlocal counter
        session = repo.writable_session(branch="main")
        chunks = [
            VirtualChunkSpec(
                index=[i], location=f"s3://foo/bar/{i}.nc", offset=0, length=1
            )
            for i in range(counter * batch_size, (counter + 1) * batch_size)
        ]
        counter += 1
        session.store.set_virtual_refs("array", chunks)
        # (args, kwargs) — the shape pytest-benchmark expects from `setup=`
        return ((session,), {})

    def commit(session_from_setup):
        session_from_setup.commit("wrote refs")

    benchmark.pedantic(commit, setup=write_refs, iterations=1, rounds=rounds)

icechunk-python/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ benchmark = [
5151
"humanize",
5252
"platformdirs",
5353
"ipdb",
54+
"coiled",
5455
]
5556
docs = [
5657
"scipy",

icechunk-python/src/session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,14 @@ impl PySession {
165165
pub fn merge(&self, other: &PySession, py: Python<'_>) -> PyResult<()> {
166166
// This is blocking function, we need to release the Gil
167167
py.allow_threads(move || {
168-
// TODO: Bad clone
169-
let changes = other.0.blocking_read().deref().changes().clone();
168+
// TODO: bad clone
169+
let other = other.0.blocking_read().deref().clone();
170170

171171
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
172172
self.0
173173
.write()
174174
.await
175-
.merge(changes)
175+
.merge(other)
176176
.await
177177
.map_err(PyIcechunkStoreError::SessionError)?;
178178
Ok(())

icechunk-python/tests/test_manifest_splitting.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def test_manifest_splitting_appends():
102102
nchunks += math.prod(NEWSHAPE) * 2
103103
# the lon size goes from 17 -> 19 so one extra manifest,
104104
# compared to previous writes
105-
nmanifests += 7 * 2
105+
nmanifests += 2 * 2
106106

107107
assert len(os.listdir(f"{tmpdir}/chunks")) == nchunks
108108
assert len(os.listdir(f"{tmpdir}/manifests")) == nmanifests

icechunk-python/tests/test_stateful_repo_ops.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ def check_list_prefix_from_root(self) -> None:
570570
# need to load to dict to compare since ordering of entries might differ
571571
expected = json.loads(self.model[k].to_bytes())
572572
value = self.sync_store.get(k, default_buffer_prototype())
573-
assert value is not None
573+
assert value is not None, k
574574
actual = json.loads(value.to_bytes())
575575
actual_fv = actual.pop("fill_value")
576576
expected_fv = expected.pop("fill_value")

icechunk-python/tests/test_zarr/test_stateful.py

Lines changed: 182 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import json
2+
from collections.abc import Iterable
23
from typing import Any
34

5+
import hypothesis.extra.numpy as npst
46
import hypothesis.strategies as st
57
import numpy as np
68
import pytest
@@ -13,18 +15,74 @@
1315
run_state_machine_as_test,
1416
)
1517

18+
import icechunk as ic
1619
import zarr
17-
from icechunk import Repository, in_memory_storage
20+
from icechunk import Repository, Storage, in_memory_storage
1821
from zarr.core.buffer import default_buffer_prototype
1922
from zarr.testing.stateful import ZarrHierarchyStateMachine
2023
from zarr.testing.strategies import (
24+
basic_indices,
2125
node_names,
2226
np_array_and_chunks,
2327
numpy_arrays,
28+
orthogonal_indices,
2429
)
2530

2631
PROTOTYPE = default_buffer_prototype()
2732

33+
# pytestmark = [
34+
# pytest.mark.filterwarnings(
35+
# "ignore::zarr.core.dtype.common.UnstableSpecificationWarning"
36+
# ),
37+
# ]
38+
39+
40+
import functools
41+
42+
43+
def with_frequency(frequency):
    """
    Decorator that throttles how often a Hypothesis stateful rule fires.

    Args:
        frequency: Float between 0 and 1, where 1.0 means always run,
            0.1 means run ~10% of the time, etc.

    Usage:
        @rule()
        @with_frequency(0.1)  # Run ~10% of the time
        def rare_operation(self):
            pass
    """

    def decorator(func):
        # Per-function counter attribute stored on the state machine instance.
        counter_attr = f"__{func.__name__}_counter"

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            return func(self, *args, **kwargs)

        @precondition
        def frequency_check(self):
            # Bump the call counter (defaulting to 0 on first use) and gate
            # the rule so that, over many attempts, it fires at roughly the
            # requested rate.
            count = getattr(self, counter_attr, 0) + 1
            setattr(self, counter_attr, count)
            return (count * frequency) % 1.0 >= (1.0 - frequency)

        # Attach the frequency gate as a precondition on the wrapped rule.
        return frequency_check(wrapper)

    return decorator
85+
2886

2987
@st.composite
3088
def chunk_paths(
@@ -39,14 +97,66 @@ def chunk_paths(
3997
return "/".join(map(str, blockidx[subset_slicer]))
4098

4199

100+
@st.composite
def splitting_configs(
    draw: st.DrawFn, *, arrays: Iterable[zarr.Array]
) -> ic.ManifestSplittingConfig:
    """Strategy: draw a random manifest-splitting config covering *arrays*.

    For each array we pick a matching condition (terminal name or full
    path) and a non-empty subset of its dimensions, each mapped to a random
    split size in ``[1, size + 10]``.
    """
    config_dict: dict = {}
    for array in arrays:
        # Match this array either by its terminal name or by its full path.
        if draw(st.booleans()):
            array_condition = ic.ManifestSplitCondition.name_matches(
                array.path.split("/")[-1]
            )
        else:
            array_condition = ic.ManifestSplitCondition.path_matches(array.path)
        dimnames = array.metadata.dimension_names or (None,) * array.ndim
        dimsize_axis_names = draw(
            st.lists(
                st.sampled_from(
                    tuple(zip(array.shape, range(array.ndim), dimnames, strict=False))
                ),
                min_size=1,
                unique=True,
            )
        )
        for size, axis, dimname in dimsize_axis_names:
            # Address the dimension by axis index or (when available) name.
            if dimname is None or draw(st.booleans()):
                key = ic.ManifestSplitDimCondition.Axis(axis)
            else:
                key = ic.ManifestSplitDimCondition.DimensionName(dimname)
            # Accumulate one entry per drawn dimension. (A plain assignment
            # here would overwrite the dict each iteration and silently keep
            # only the last drawn dimension.)
            config_dict.setdefault(array_condition, {})[key] = draw(
                st.integers(min_value=1, max_value=size + 10)
            )
    return ic.ManifestSplittingConfig.from_dict(config_dict)
131+
132+
42133
# TODO: more before/after commit invariants?
43134
# TODO: add "/" to self.all_groups, deleting "/" seems to be problematic
44135
class ModifiedZarrHierarchyStateMachine(ZarrHierarchyStateMachine):
45-
def __init__(self, repo: Repository) -> None:
46-
self.repo = repo
47-
store = repo.writable_session("main").store
136+
    def __init__(self, storage: Storage) -> None:
        # Keep the storage handle so rules (e.g. reopen_with_config) can
        # re-open the repository later with a different config.
        self.storage = storage
        self.repo = Repository.create(self.storage)
        # Seed the state machine with a writable session's store on "main".
        store = self.repo.writable_session("main").store
        super().__init__(store)
49141

142+
    @precondition(
        lambda self: not self.store.session.has_uncommitted_changes
        and bool(self.all_arrays)
    )
    @rule(data=st.data())
    def reopen_with_config(self, data):
        # Re-open the repository with a freshly drawn manifest-splitting
        # config. Gated on a clean session: the writable session (and any
        # uncommitted changes in it) is replaced below.
        array_paths = data.draw(
            st.lists(st.sampled_from(sorted(self.all_arrays)), max_size=3, unique=True)
        )
        # Draw the splitting config against the model's arrays, which carry
        # the shape/dimension metadata the strategy samples from.
        arrays = tuple(zarr.open_array(self.model, path=path) for path in array_paths)
        sconfig = data.draw(splitting_configs(arrays=arrays))
        config = ic.RepositoryConfig(
            inline_chunk_threshold_bytes=0, manifest=ic.ManifestConfig(splitting=sconfig)
        )
        note(f"reopening with splitting config {sconfig=!r}")
        self.repo = Repository.open(self.storage, config=config)
        self.store = self.repo.writable_session("main").store
159+
50160
@precondition(lambda self: self.store.session.has_uncommitted_changes)
51161
@rule(data=st.data())
52162
def commit_with_check(self, data) -> None:
@@ -108,8 +218,49 @@ def add_array(
108218
assume(array.size > 0)
109219
super().add_array(data, name, array_and_chunks)
110220

221+
    @precondition(lambda self: bool(self.all_groups))
    @rule(data=st.data())
    def check_list_dir(self, data: st.DataObject) -> None:
        # Invariant rule: list_dir on a random directory must agree between
        # the Icechunk store and the in-memory model store, modulo the "c"
        # prefix inconsistency explained below.
        path = self.draw_directory(data)
        note(f"list_dir for {path=!r}")
        model_ls = sorted(self._sync_iter(self.model.list_dir(path)))
        store_ls = sorted(self._sync_iter(self.store.list_dir(path)))
        if model_ls != store_ls and set(model_ls).symmetric_difference(set(store_ls)) != {
            "c"
        }:
            # Consider .list_dir("path/to/array") for an array with a single chunk.
            # The MemoryStore model will return `"c", "zarr.json"` only if the chunk exists
            # If that chunk was deleted, then `"c"` is not returned.
            # LocalStore will not have this behaviour :/
            # In Icechunk, we always return the `c` so ignore this inconsistency.
            assert model_ls == store_ls, (model_ls, store_ls)
237+
111238
##### TODO: port everything below to zarr
239+
    @precondition(lambda self: bool(self.all_arrays))
    @rule(data=st.data())
    def check_array(self, data: st.DataObject) -> None:
        # Invariant rule: a randomly chosen array must read back identically
        # from the Icechunk store and from the in-memory model store.
        path = data.draw(st.sampled_from(sorted(self.all_arrays)))
        actual = zarr.open_array(self.store, path=path)[:]
        expected = zarr.open_array(self.model, path=path)[:]
        np.testing.assert_equal(actual, expected)
246+
247+
    @precondition(lambda self: bool(self.all_arrays))
    @rule(data=st.data())
    def overwrite_array_orthogonal_indexing(self, data: st.DataObject) -> None:
        # Mutation rule: apply the same random orthogonal-index write to both
        # the model store and the Icechunk store so they stay in lockstep.
        array = data.draw(st.sampled_from(sorted(self.all_arrays)))
        model_array = zarr.open_array(path=array, store=self.model)
        store_array = zarr.open_array(path=array, store=self.store)
        indexer, _ = data.draw(orthogonal_indices(shape=model_array.shape))
        note(f"overwriting array orthogonal {indexer=}")
        # Draw replacement data shaped to exactly fill the selected region.
        new_data = data.draw(
            npst.arrays(shape=model_array.oindex[indexer].shape, dtype=model_array.dtype)
        )
        model_array.oindex[indexer] = new_data
        store_array.oindex[indexer] = new_data
260+
261+
##### TODO: delete after next Zarr release (Jun 18, 2025)
112262
@rule()
263+
@with_frequency(0.25)
113264
def clear(self) -> None:
114265
note("clearing")
115266
import zarr
@@ -152,23 +303,6 @@ def draw_directory(self, data) -> str:
152303
path = array_or_group
153304
return path
154305

155-
@precondition(lambda self: bool(self.all_groups))
156-
@rule(data=st.data())
157-
def check_list_dir(self, data) -> None:
158-
path = self.draw_directory(data)
159-
note(f"list_dir for {path=!r}")
160-
model_ls = sorted(self._sync_iter(self.model.list_dir(path)))
161-
store_ls = sorted(self._sync_iter(self.store.list_dir(path)))
162-
if model_ls != store_ls and set(model_ls).symmetric_difference(set(store_ls)) != {
163-
"c"
164-
}:
165-
# Consider .list_dir("path/to/array") for an array with a single chunk.
166-
# The MemoryStore model will return `"c", "zarr.json"` only if the chunk exists
167-
# If that chunk was deleted, then `"c"` is not returned.
168-
# LocalStore will not have this behaviour :/
169-
# In Icechunk, we always return the `c` so ignore this inconsistency.
170-
assert model_ls == store_ls, (model_ls, store_ls)
171-
172306
@precondition(lambda self: bool(self.all_arrays))
173307
@rule(data=st.data())
174308
def delete_chunk(self, data) -> None:
@@ -182,6 +316,32 @@ def delete_chunk(self, data) -> None:
182316
self._sync(self.model.delete(path))
183317
self._sync(self.store.delete(path))
184318

319+
    @precondition(lambda self: bool(self.all_arrays))
    @rule(data=st.data())
    def overwrite_array_basic_indexing(self, data) -> None:
        # Mutation rule: apply the same random basic-index (slice) write to
        # both the model store and the Icechunk store.
        array = data.draw(st.sampled_from(sorted(self.all_arrays)))
        model_array = zarr.open_array(path=array, store=self.model)
        store_array = zarr.open_array(path=array, store=self.store)
        slicer = data.draw(basic_indices(shape=model_array.shape))
        note(f"overwriting array basic {slicer=}")
        # Draw replacement data shaped to exactly fill the selected region.
        new_data = data.draw(
            npst.arrays(shape=model_array[slicer].shape, dtype=model_array.dtype)
        )
        model_array[slicer] = new_data
        store_array[slicer] = new_data
332+
333+
    @precondition(lambda self: bool(self.all_arrays))
    @rule(data=st.data())
    def resize_array(self, data) -> None:
        # Mutation rule: resize a random array identically in the model and
        # the Icechunk store (may grow or shrink any dimension).
        array = data.draw(st.sampled_from(sorted(self.all_arrays)))
        model_array = zarr.open_array(path=array, store=self.model)
        store_array = zarr.open_array(path=array, store=self.store)
        ndim = model_array.ndim
        # Keep the rank fixed; only the per-dimension sizes change.
        new_shape = data.draw(npst.array_shapes(max_dims=ndim, min_dims=ndim, min_side=1))
        note(f"resizing array from {model_array.shape} to {new_shape}")
        model_array.resize(new_shape)
        store_array.resize(new_shape)
344+
185345
@precondition(lambda self: bool(self.all_arrays) or bool(self.all_groups))
186346
@rule(data=st.data())
187347
def delete_dir(self, data) -> None:
@@ -219,10 +379,8 @@ def check_list_prefix_from_root(self) -> None:
219379

220380

221381
def test_zarr_hierarchy() -> None:
222-
repo = Repository.create(in_memory_storage())
223-
224382
def mk_test_instance_sync() -> ModifiedZarrHierarchyStateMachine:
225-
return ModifiedZarrHierarchyStateMachine(repo)
383+
return ModifiedZarrHierarchyStateMachine(in_memory_storage())
226384

227385
run_state_machine_as_test(
228386
mk_test_instance_sync, settings=Settings(report_multiple_bugs=False)

0 commit comments

Comments
 (0)