Skip to content

Commit b1f34c5

Browse files
authored
Merge branch 'main' into icechunk-js
2 parents a7175b0 + 57b4e7a commit b1f34c5

File tree

14 files changed

+384
-90
lines changed

14 files changed

+384
-90
lines changed

icechunk-python/python/icechunk/_icechunk_python.pyi

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,7 @@ class RepositoryConfig:
12231223
virtual_chunk_containers: dict[str, VirtualChunkContainer] | None = None,
12241224
manifest: ManifestConfig | None = None,
12251225
repo_update_retries: RepoUpdateRetryConfig | None = None,
1226+
num_updates_per_repo_info_file: int | None = None,
12261227
) -> RepositoryConfig:
12271228
"""
12281229
Create a new `RepositoryConfig` object
@@ -1439,6 +1440,12 @@ class RepositoryConfig:
14391440
Clear all virtual chunk containers from the repository.
14401441
"""
14411442
...
1443+
@property
1444+
def num_updates_per_repo_info_file(self) -> int | None:
1445+
"""The number of updates per repo info file."""
1446+
...
1447+
@num_updates_per_repo_info_file.setter
1448+
def num_updates_per_repo_info_file(self, value: int | None) -> None: ...
14421449
def merge(self, other: RepositoryConfig) -> RepositoryConfig:
14431450
"""
14441451
Merge another RepositoryConfig with this one.

icechunk-python/python/icechunk/testing/strategies.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,24 @@ def splitting_configs(
5454
key: draw(st.integers(min_value=1, max_value=size + 10))
5555
}
5656
return ic.ManifestSplittingConfig.from_dict(config_dict)
57+
58+
59+
@st.composite
60+
def repository_configs(
61+
draw: st.DrawFn,
62+
num_updates_per_repo_info_file: st.SearchStrategy[int] = st.integers( # noqa: B008
63+
min_value=1, max_value=50
64+
),
65+
inline_chunk_threshold_bytes: st.SearchStrategy[int] | None = None,
66+
splitting: st.SearchStrategy[ic.ManifestSplittingConfig] | None = None,
67+
) -> ic.RepositoryConfig:
68+
manifest = None
69+
if splitting is not None:
70+
manifest = ic.ManifestConfig(splitting=draw(splitting))
71+
return ic.RepositoryConfig(
72+
num_updates_per_repo_info_file=draw(num_updates_per_repo_info_file),
73+
inline_chunk_threshold_bytes=draw(inline_chunk_threshold_bytes)
74+
if inline_chunk_threshold_bytes is not None
75+
else None,
76+
manifest=manifest,
77+
)

icechunk-python/src/config.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1529,6 +1529,8 @@ pub struct PyRepositoryConfig {
15291529
pub previous_file: Option<String>,
15301530
#[pyo3(get, set)]
15311531
pub repo_update_retries: Option<Py<PyRepoUpdateRetryConfig>>,
1532+
#[pyo3(get, set)]
1533+
pub num_updates_per_repo_info_file: Option<u16>,
15321534
}
15331535

15341536
impl PartialEq for PyRepositoryConfig {
@@ -1567,6 +1569,7 @@ impl TryFrom<&PyRepositoryConfig> for RepositoryConfig {
15671569
.repo_update_retries
15681570
.as_ref()
15691571
.map(|r| (&*r.borrow(py)).into()),
1572+
num_updates_per_repo_info_file: value.num_updates_per_repo_info_file,
15701573
})
15711574
})
15721575
}
@@ -1604,6 +1607,7 @@ impl From<RepositoryConfig> for PyRepositoryConfig {
16041607
Py::new(py, Into::<PyRepoUpdateRetryConfig>::into(r))
16051608
.expect("Cannot create instance of RepoUpdateRetryConfig")
16061609
}),
1610+
num_updates_per_repo_info_file: value.num_updates_per_repo_info_file,
16071611
})
16081612
}
16091613
}
@@ -1617,7 +1621,7 @@ impl PyRepositoryConfig {
16171621
}
16181622

16191623
#[new]
1620-
#[pyo3(signature = (inline_chunk_threshold_bytes = None, get_partial_values_concurrency = None, compression = None, max_concurrent_requests = None, caching = None, storage = None, virtual_chunk_containers = None, manifest = None, repo_update_retries = None))]
1624+
#[pyo3(signature = (inline_chunk_threshold_bytes = None, get_partial_values_concurrency = None, compression = None, max_concurrent_requests = None, caching = None, storage = None, virtual_chunk_containers = None, manifest = None, repo_update_retries = None, num_updates_per_repo_info_file = None))]
16211625
#[allow(clippy::too_many_arguments)]
16221626
pub fn new(
16231627
inline_chunk_threshold_bytes: Option<u16>,
@@ -1629,6 +1633,7 @@ impl PyRepositoryConfig {
16291633
virtual_chunk_containers: Option<HashMap<String, PyVirtualChunkContainer>>,
16301634
manifest: Option<Py<PyManifestConfig>>,
16311635
repo_update_retries: Option<Py<PyRepoUpdateRetryConfig>>,
1636+
num_updates_per_repo_info_file: Option<u16>,
16321637
) -> Self {
16331638
Self {
16341639
inline_chunk_threshold_bytes,
@@ -1641,6 +1646,7 @@ impl PyRepositoryConfig {
16411646
manifest,
16421647
previous_file: None,
16431648
repo_update_retries,
1649+
num_updates_per_repo_info_file,
16441650
}
16451651
}
16461652

icechunk-python/src/repository.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2131,9 +2131,10 @@ impl PyRepository {
21312131
py.detach(move || {
21322132
let result =
21332133
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
2134-
let asset_manager = {
2134+
let (asset_manager, num_updates) = {
21352135
let lock = self.0.read().await;
2136-
Arc::clone(lock.asset_manager())
2136+
let num_updates = lock.config().num_updates_per_repo_info_file();
2137+
(Arc::clone(lock.asset_manager()), num_updates)
21372138
};
21382139

21392140
let result = expire(
@@ -2150,6 +2151,7 @@ impl PyRepository {
21502151
ExpiredRefAction::Ignore
21512152
},
21522153
None,
2154+
num_updates,
21532155
)
21542156
.await
21552157
.map_err(PyIcechunkStoreError::GCError)?;
@@ -2176,9 +2178,10 @@ impl PyRepository {
21762178
) -> PyResult<Bound<'py, PyAny>> {
21772179
let repository = self.0.clone();
21782180
pyo3_async_runtimes::tokio::future_into_py::<_, HashSet<String>>(py, async move {
2179-
let asset_manager = {
2181+
let (asset_manager, num_updates) = {
21802182
let lock = repository.read().await;
2181-
Arc::clone(lock.asset_manager())
2183+
let num_updates = lock.config().num_updates_per_repo_info_file();
2184+
(Arc::clone(lock.asset_manager()), num_updates)
21822185
};
21832186

21842187
let result = expire(
@@ -2195,6 +2198,7 @@ impl PyRepository {
21952198
ExpiredRefAction::Ignore
21962199
},
21972200
None,
2201+
num_updates,
21982202
)
21992203
.await
22002204
.map_err(PyIcechunkStoreError::GCError)?;
@@ -2224,13 +2228,15 @@ impl PyRepository {
22242228
max_concurrent_manifest_fetches,
22252229
dry_run,
22262230
);
2227-
let asset_manager = {
2231+
let (asset_manager, num_updates) = {
22282232
let lock = self.0.read().await;
2229-
Arc::clone(lock.asset_manager())
2233+
let num_updates = lock.config().num_updates_per_repo_info_file();
2234+
(Arc::clone(lock.asset_manager()), num_updates)
22302235
};
2231-
let result = garbage_collect(asset_manager, &gc_config, None)
2232-
.await
2233-
.map_err(PyIcechunkStoreError::GCError)?;
2236+
let result =
2237+
garbage_collect(asset_manager, &gc_config, None, num_updates)
2238+
.await
2239+
.map_err(PyIcechunkStoreError::GCError)?;
22342240
Ok::<_, PyIcechunkStoreError>(result.into())
22352241
})?;
22362242

@@ -2258,11 +2264,12 @@ impl PyRepository {
22582264
max_concurrent_manifest_fetches,
22592265
dry_run,
22602266
);
2261-
let asset_manager = {
2267+
let (asset_manager, num_updates) = {
22622268
let lock = repository.read().await;
2263-
Arc::clone(lock.asset_manager())
2269+
let num_updates = lock.config().num_updates_per_repo_info_file();
2270+
(Arc::clone(lock.asset_manager()), num_updates)
22642271
};
2265-
let result = garbage_collect(asset_manager, &gc_config, None)
2272+
let result = garbage_collect(asset_manager, &gc_config, None, num_updates)
22662273
.await
22672274
.map_err(PyIcechunkStoreError::GCError)?;
22682275
Ok(result.into())

icechunk-python/tests/test_stateful_repo_ops.py

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@
3939
from icechunk import (
4040
IcechunkError,
4141
Repository,
42+
RepositoryConfig,
4243
SnapshotInfo,
4344
Storage,
4445
in_memory_storage,
4546
)
47+
from icechunk.testing.strategies import repository_configs
4648
from zarr.testing.stateful import SyncStoreWrapper
4749

4850
# JSON file contents, keep it simple
@@ -110,7 +112,10 @@ class TagModel:
110112

111113
class Model:
112114
def __init__(self, **kwargs: Any) -> None:
113-
self.store: dict[str, Any] = {} #
115+
self.store: dict[str, Any] = {}
116+
117+
self.spec_version = 1 # will be overwritten on `@initialize`
118+
self.num_updates: int = 0
114119

115120
self.initial_snapshot_id: str | None = None
116121
self.changes_made: bool = False
@@ -151,6 +156,9 @@ def __setitem__(self, key: str, value: Buffer) -> None:
151156
def __getitem__(self, key: str) -> Buffer:
152157
return cast(Buffer, self.store[key])
153158

159+
def upgrade(self) -> None:
160+
self.num_updates += 1
161+
154162
@property
155163
def has_commits(self) -> bool:
156164
return bool(self.commits)
@@ -171,10 +179,11 @@ def commit(self, snap: SnapshotInfo) -> None:
171179

172180
assert self.branch is not None
173181
self.branch_heads[self.branch] = ref
182+
self.num_updates += 1
174183

175184
def amend(self, snap: SnapshotInfo) -> None:
176185
"""Amend the HEAD commit."""
177-
# this is simpe because we aren't modeling the branch as a list of commits
186+
# this is simple because we aren't modeling the branch as a list of commits
178187
self.commit(snap)
179188

180189
def checkout_commit(self, ref: str) -> None:
@@ -189,6 +198,7 @@ def checkout_commit(self, ref: str) -> None:
189198
def create_branch(self, name: str, commit: str) -> None:
190199
assert commit in self.commits
191200
self.branch_heads[name] = commit
201+
self.num_updates += 1
192202

193203
def checkout_branch(self, ref: str) -> None:
194204
self.checkout_commit(self.branch_heads[ref])
@@ -198,17 +208,27 @@ def checkout_branch(self, ref: str) -> None:
198208
def reset_branch(self, branch: str, commit: str) -> None:
199209
assert commit in self.commits
200210
self.branch_heads[branch] = commit
211+
self.num_updates += 1
201212

202213
def delete_branch(self, branch_name: str) -> None:
214+
self._delete_branch(branch_name)
215+
self.num_updates += 1
216+
217+
def _delete_branch(self, branch_name: str) -> None:
203218
del self.branch_heads[branch_name]
204219

205220
def delete_tag(self, tag: str) -> None:
221+
self._delete_tag(tag)
222+
self.num_updates += 1
223+
224+
def _delete_tag(self, tag: str) -> None:
206225
del self.tags[tag]
207226

208227
def create_tag(self, tag_name: str, commit_id: str) -> None:
209228
assert commit_id in self.commits
210229
self.tags[tag_name] = TagModel(commit_id=str(commit_id))
211230
self.created_tags.add(tag_name)
231+
self.num_updates += 1
212232

213233
def checkout_tag(self, ref: str) -> None:
214234
self.checkout_commit(self.tags[str(ref)].commit_id)
@@ -261,7 +281,7 @@ def expire_snapshots(
261281
}
262282
note(f"deleting tags {tags_to_delete=!r}")
263283
for tag in tags_to_delete:
264-
self.delete_tag(tag)
284+
self._delete_tag(tag)
265285
else:
266286
tags_to_delete = set()
267287

@@ -274,10 +294,12 @@ def expire_snapshots(
274294
note(f"deleting branches {branches_to_delete=!r}")
275295
for branch in branches_to_delete:
276296
note(f"deleting {branch=!r}, {self.branch_heads[branch]=!r}")
277-
self.delete_branch(branch)
297+
self._delete_branch(branch)
278298
else:
279299
branches_to_delete = set()
280300

301+
self.num_updates += 1
302+
281303
return ExpireInfo(
282304
expired_snapshots=expired_snaps,
283305
deleted_branches=branches_to_delete,
@@ -301,6 +323,7 @@ def garbage_collect(self, older_than: datetime.datetime) -> set[str]:
301323
self.ondisk_snaps.pop(k, None)
302324
deleted.add(k)
303325
note(f"Deleted snapshots in model: {deleted!r}")
326+
self.num_updates += 1
304327
return deleted
305328

306329

@@ -325,7 +348,14 @@ def __init__(self) -> None:
325348
@initialize(data=st.data(), target=branches, spec_version=st.sampled_from([1, 2]))
326349
def initialize(self, data: st.DataObject, spec_version: Literal[1, 2]) -> str:
327350
self.storage = in_memory_storage()
328-
self.repo = Repository.create(self.storage, spec_version=spec_version)
351+
config = data.draw(repository_configs())
352+
self.model.spec_version = spec_version
353+
354+
self.repo = Repository.create(
355+
self.storage,
356+
spec_version=spec_version,
357+
config=config,
358+
)
329359
self.session = self.repo.writable_session(DEFAULT_BRANCH)
330360

331361
snap = next(iter(self.repo.ancestry(branch=DEFAULT_BRANCH)))
@@ -338,6 +368,9 @@ def initialize(self, data: st.DataObject, spec_version: Literal[1, 2]) -> str:
338368
self.model.HEAD = HEAD
339369
self.model.create_branch(DEFAULT_BRANCH, HEAD)
340370
self.model.checkout_branch(DEFAULT_BRANCH)
371+
# RepoInitializedUpdate includes the initial branch creation,
372+
# so reset to 1 after create_branch incremented it.
373+
self.model.num_updates = 1
341374

342375
# initialize with some data always
343376
# TODO: always setting array metadata, since we cannot overwrite an existing group's zarr.json
@@ -371,17 +404,22 @@ def upgrade_spec_version(self) -> None:
371404
# don't test simple cases of catching errors when upgrading a v2 spec
372405
# that should be covered in unit tests
373406
icechunk.upgrade_icechunk_repository(self.repo)
407+
self.model.upgrade()
374408
# TODO: remove the reopen after https://github.com/earth-mover/icechunk/issues/1521
375-
self.reopen_repository()
409+
self._reopen_repository()
376410

377-
@rule()
378-
def reopen_repository(self) -> None:
411+
@rule(data=st.data())
412+
def reopen_repository(self, data: st.DataObject) -> None:
413+
config = data.draw(repository_configs())
414+
self._reopen_repository(config)
415+
416+
def _reopen_repository(self, config: RepositoryConfig | None = None) -> None:
379417
"""Reopen the repository from storage to get fresh state.
380418
381419
This discards any uncommitted changes.
382420
"""
383421
assert self.storage is not None, "storage must be initialized"
384-
self.repo = Repository.open(self.storage)
422+
self.repo = Repository.open(self.storage, config=config)
385423
note(f"Reopened repository (spec_version={self.repo.spec_version})")
386424

387425
# Reopening discards uncommitted changes - reset model to last committed state
@@ -688,6 +726,17 @@ def check_commit(self, commit: str) -> None:
688726
# even after expiration, written_at is unmodified
689727
assert actual.written_at == expected.written_at
690728

729+
def check_ops_log(self) -> None:
730+
if self.model.spec_version == 1:
731+
return
732+
actual_ops = list(self.repo.ops_log())
733+
assert len(actual_ops) == self.model.num_updates, (
734+
actual_ops,
735+
self.model.num_updates,
736+
actual_ops,
737+
)
738+
assert isinstance(actual_ops[-1], icechunk.RepoInitializedUpdate)
739+
691740
@invariant()
692741
def checks(self) -> None:
693742
# this method only exists to reduce verbosity of hypothesis output
@@ -697,6 +746,7 @@ def checks(self) -> None:
697746
self.check_tags()
698747
self.check_branches()
699748
self.check_ancestry()
749+
self.check_ops_log()
700750

701751
def check_list_prefix_from_root(self) -> None:
702752
model_list = self.model.list_prefix("")

icechunk-python/tests/test_zarr/test_stateful.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,13 @@ def reopen_with_config(self, data: st.DataObject) -> None:
109109
st.lists(st.sampled_from(sorted(self.all_arrays)), max_size=3, unique=True)
110110
)
111111
arrays = tuple(zarr.open_array(self.model, path=path) for path in array_paths)
112-
sconfig = data.draw(icst.splitting_configs(arrays=arrays))
113-
config = ic.RepositoryConfig(
114-
inline_chunk_threshold_bytes=0, manifest=ic.ManifestConfig(splitting=sconfig)
112+
config = data.draw(
113+
icst.repository_configs(
114+
inline_chunk_threshold_bytes=st.just(0),
115+
splitting=icst.splitting_configs(arrays=arrays),
116+
)
115117
)
116-
note(f"reopening with splitting config {sconfig=!r}")
118+
note(f"reopening with config {config!r}")
117119
self.repo = Repository.open(self.storage, config=config)
118120
if data.draw(st.booleans()):
119121
self.repo.save_config()

0 commit comments

Comments
 (0)