diff --git a/icechunk-python/benchmarks/test_benchmark_writes.py b/icechunk-python/benchmarks/test_benchmark_writes.py index 8c82fa4c9..efe77e8cc 100644 --- a/icechunk-python/benchmarks/test_benchmark_writes.py +++ b/icechunk-python/benchmarks/test_benchmark_writes.py @@ -181,7 +181,9 @@ def write(): @pytest.mark.benchmark(group="refs-write") -def test_write_split_manifest_refs(benchmark, splitting, large_write_dataset) -> None: +def test_write_split_manifest_refs_full_rewrite( + benchmark, splitting, large_write_dataset +) -> None: dataset = large_write_dataset config = repo_config_with(splitting=splitting) assert config is not None @@ -219,3 +221,53 @@ def commit(session_from_setup): session_from_setup.commit("wrote refs") benchmark.pedantic(commit, setup=write_refs, iterations=1, rounds=10) + + +@pytest.mark.benchmark(group="refs-write") +def test_write_split_manifest_refs_append( + benchmark, splitting, large_write_dataset +) -> None: + dataset = large_write_dataset + config = repo_config_with(splitting=splitting) + assert config is not None + if hasattr(config.manifest, "splitting"): + assert config.manifest.splitting == splitting + repo = dataset.create(config=config) + session = repo.writable_session(branch="main") + store = session.store + group = zarr.open_group(store, zarr_format=3) + group.create_array( + "array", + shape=dataset.shape, + chunks=dataset.chunks, + dtype="int8", + fill_value=0, + compressors=None, + ) + session.commit("initialize") + + # yuck, but I'm abusing `rounds` to do a loop and time _only_ the commit. 
+ global counter + counter = 0 + rounds = 10 + num_chunks = dataset.shape[0] // dataset.chunks[0] + batch_size = num_chunks // rounds + + def write_refs() -> Session: + global counter + session = repo.writable_session(branch="main") + chunks = [ + VirtualChunkSpec( + index=[i], location=f"s3://foo/bar/{i}.nc", offset=0, length=1 + ) + for i in range(counter * batch_size, counter * batch_size + batch_size) + ] + counter += 1 + session.store.set_virtual_refs("array", chunks) + # (args, kwargs) + return ((session,), {}) + + def commit(session_from_setup): + session_from_setup.commit("wrote refs") + + benchmark.pedantic(commit, setup=write_refs, iterations=1, rounds=rounds) diff --git a/icechunk-python/pyproject.toml b/icechunk-python/pyproject.toml index 478bf3a0e..ee2db15b6 100644 --- a/icechunk-python/pyproject.toml +++ b/icechunk-python/pyproject.toml @@ -51,6 +51,7 @@ benchmark = [ "humanize", "platformdirs", "ipdb", + "coiled", ] docs = [ "scipy", diff --git a/icechunk-python/src/session.rs b/icechunk-python/src/session.rs index cf5306dcc..e5606c76a 100644 --- a/icechunk-python/src/session.rs +++ b/icechunk-python/src/session.rs @@ -165,14 +165,14 @@ impl PySession { pub fn merge(&self, other: &PySession, py: Python<'_>) -> PyResult<()> { // This is blocking function, we need to release the Gil py.allow_threads(move || { - // TODO: Bad clone - let changes = other.0.blocking_read().deref().changes().clone(); + // TODO: bad clone + let other = other.0.blocking_read().deref().clone(); pyo3_async_runtimes::tokio::get_runtime().block_on(async move { self.0 .write() .await - .merge(changes) + .merge(other) .await .map_err(PyIcechunkStoreError::SessionError)?; Ok(()) diff --git a/icechunk-python/tests/test_manifest_splitting.py b/icechunk-python/tests/test_manifest_splitting.py index b29fb7b18..479cecd33 100644 --- a/icechunk-python/tests/test_manifest_splitting.py +++ b/icechunk-python/tests/test_manifest_splitting.py @@ -102,7 +102,7 @@ def 
test_manifest_splitting_appends(): nchunks += math.prod(NEWSHAPE) * 2 # the lon size goes from 17 -> 19 so one extra manifest, # compared to previous writes - nmanifests += 7 * 2 + nmanifests += 2 * 2 assert len(os.listdir(f"{tmpdir}/chunks")) == nchunks assert len(os.listdir(f"{tmpdir}/manifests")) == nmanifests diff --git a/icechunk-python/tests/test_stateful_repo_ops.py b/icechunk-python/tests/test_stateful_repo_ops.py index dfdc6a9f9..47dbd4809 100644 --- a/icechunk-python/tests/test_stateful_repo_ops.py +++ b/icechunk-python/tests/test_stateful_repo_ops.py @@ -570,7 +570,7 @@ def check_list_prefix_from_root(self) -> None: # need to load to dict to compare since ordering of entries might differ expected = json.loads(self.model[k].to_bytes()) value = self.sync_store.get(k, default_buffer_prototype()) - assert value is not None + assert value is not None, k actual = json.loads(value.to_bytes()) actual_fv = actual.pop("fill_value") expected_fv = expected.pop("fill_value") diff --git a/icechunk-python/tests/test_zarr/test_stateful.py b/icechunk-python/tests/test_zarr/test_stateful.py index a3477848f..3ad392b69 100644 --- a/icechunk-python/tests/test_zarr/test_stateful.py +++ b/icechunk-python/tests/test_zarr/test_stateful.py @@ -1,6 +1,8 @@ import json +from collections.abc import Iterable from typing import Any +import hypothesis.extra.numpy as npst import hypothesis.strategies as st import numpy as np import pytest @@ -13,18 +15,74 @@ run_state_machine_as_test, ) +import icechunk as ic import zarr -from icechunk import Repository, in_memory_storage +from icechunk import Repository, Storage, in_memory_storage from zarr.core.buffer import default_buffer_prototype from zarr.testing.stateful import ZarrHierarchyStateMachine from zarr.testing.strategies import ( + basic_indices, node_names, np_array_and_chunks, numpy_arrays, + orthogonal_indices, ) PROTOTYPE = default_buffer_prototype() +# pytestmark = [ +# pytest.mark.filterwarnings( +# 
"ignore::zarr.core.dtype.common.UnstableSpecificationWarning" +# ), +# ] + + +import functools + + +def with_frequency(frequency): + """ + Decorator to control how frequently a rule runs in Hypothesis stateful tests. + + Args: + frequency: Float between 0 and 1, where 1.0 means always run, + 0.1 means run ~10% of the time, etc. + + Usage: + @rule() + @with_frequency(0.1) # Run ~10% of the time + def rare_operation(self): + pass + """ + + def decorator(func): + # Create a counter attribute name specific to this function + counter_attr = f"__{func.__name__}_counter" + + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + return func(self, *args, **kwargs) + + # Add precondition that checks frequency + @precondition + def frequency_check(self): + # Initialize counter if it doesn't exist + if not hasattr(self, counter_attr): + setattr(self, counter_attr, 0) + + # Increment counter + current_count = getattr(self, counter_attr) + 1 + setattr(self, counter_attr, current_count) + + # Check if we should run based on frequency + # This gives roughly the right frequency over many calls + return (current_count * frequency) % 1.0 >= (1.0 - frequency) + + # Apply the precondition to the wrapped function + return frequency_check(wrapper) + + return decorator + @st.composite def chunk_paths( @@ -39,14 +97,66 @@ def chunk_paths( return "/".join(map(str, blockidx[subset_slicer])) +@st.composite +def splitting_configs( + draw: st.DrawFn, *, arrays: Iterable[zarr.Array] +) -> ic.ManifestSplittingConfig: + config_dict = {} + for array in arrays: + if draw(st.booleans()): + array_condition = ic.ManifestSplitCondition.name_matches( + array.path.split("/")[-1] + ) + else: + array_condition = ic.ManifestSplitCondition.path_matches(array.path) + dimnames = array.metadata.dimension_names or (None,) * array.ndim + dimsize_axis_names = draw( + st.lists( + st.sampled_from( + tuple(zip(array.shape, range(array.ndim), dimnames, strict=False)) + ), + min_size=1, + unique=True, + ) + ) + 
for size, axis, dimname in dimsize_axis_names: + if dimname is None or draw(st.booleans()): + key = ic.ManifestSplitDimCondition.Axis(axis) + else: + key = ic.ManifestSplitDimCondition.DimensionName(dimname) + config_dict[array_condition] = { + key: draw(st.integers(min_value=1, max_value=size + 10)) + } + return ic.ManifestSplittingConfig.from_dict(config_dict) + + # TODO: more before/after commit invariants? # TODO: add "/" to self.all_groups, deleting "/" seems to be problematic class ModifiedZarrHierarchyStateMachine(ZarrHierarchyStateMachine): - def __init__(self, repo: Repository) -> None: - self.repo = repo - store = repo.writable_session("main").store + def __init__(self, storage: Storage) -> None: + self.storage = storage + self.repo = Repository.create(self.storage) + store = self.repo.writable_session("main").store super().__init__(store) + @precondition( + lambda self: not self.store.session.has_uncommitted_changes + and bool(self.all_arrays) + ) + @rule(data=st.data()) + def reopen_with_config(self, data): + array_paths = data.draw( + st.lists(st.sampled_from(sorted(self.all_arrays)), max_size=3, unique=True) + ) + arrays = tuple(zarr.open_array(self.model, path=path) for path in array_paths) + sconfig = data.draw(splitting_configs(arrays=arrays)) + config = ic.RepositoryConfig( + inline_chunk_threshold_bytes=0, manifest=ic.ManifestConfig(splitting=sconfig) + ) + note(f"reopening with splitting config {sconfig=!r}") + self.repo = Repository.open(self.storage, config=config) + self.store = self.repo.writable_session("main").store + @precondition(lambda self: self.store.session.has_uncommitted_changes) @rule(data=st.data()) def commit_with_check(self, data) -> None: @@ -108,8 +218,49 @@ def add_array( assume(array.size > 0) super().add_array(data, name, array_and_chunks) + @precondition(lambda self: bool(self.all_groups)) + @rule(data=st.data()) + def check_list_dir(self, data: st.DataObject) -> None: + path = self.draw_directory(data) + note(f"list_dir 
for {path=!r}") + model_ls = sorted(self._sync_iter(self.model.list_dir(path))) + store_ls = sorted(self._sync_iter(self.store.list_dir(path))) + if model_ls != store_ls and set(model_ls).symmetric_difference(set(store_ls)) != { + "c" + }: + # Consider .list_dir("path/to/array") for an array with a single chunk. + # The MemoryStore model will return `"c", "zarr.json"` only if the chunk exists + # If that chunk was deleted, then `"c"` is not returned. + # LocalStore will not have this behaviour :/ + # In Icechunk, we always return the `c` so ignore this inconsistency. + assert model_ls == store_ls, (model_ls, store_ls) + ##### TODO: port everything below to zarr + @precondition(lambda self: bool(self.all_arrays)) + @rule(data=st.data()) + def check_array(self, data: st.DataObject) -> None: + path = data.draw(st.sampled_from(sorted(self.all_arrays))) + actual = zarr.open_array(self.store, path=path)[:] + expected = zarr.open_array(self.model, path=path)[:] + np.testing.assert_equal(actual, expected) + + @precondition(lambda self: bool(self.all_arrays)) + @rule(data=st.data()) + def overwrite_array_orthogonal_indexing(self, data: st.DataObject) -> None: + array = data.draw(st.sampled_from(sorted(self.all_arrays))) + model_array = zarr.open_array(path=array, store=self.model) + store_array = zarr.open_array(path=array, store=self.store) + indexer, _ = data.draw(orthogonal_indices(shape=model_array.shape)) + note(f"overwriting array orthogonal {indexer=}") + new_data = data.draw( + npst.arrays(shape=model_array.oindex[indexer].shape, dtype=model_array.dtype) + ) + model_array.oindex[indexer] = new_data + store_array.oindex[indexer] = new_data + + ##### TODO: delete after next Zarr release (Jun 18, 2025) @rule() + @with_frequency(0.25) def clear(self) -> None: note("clearing") import zarr @@ -152,23 +303,6 @@ def draw_directory(self, data) -> str: path = array_or_group return path - @precondition(lambda self: bool(self.all_groups)) - @rule(data=st.data()) - def 
check_list_dir(self, data) -> None: - path = self.draw_directory(data) - note(f"list_dir for {path=!r}") - model_ls = sorted(self._sync_iter(self.model.list_dir(path))) - store_ls = sorted(self._sync_iter(self.store.list_dir(path))) - if model_ls != store_ls and set(model_ls).symmetric_difference(set(store_ls)) != { - "c" - }: - # Consider .list_dir("path/to/array") for an array with a single chunk. - # The MemoryStore model will return `"c", "zarr.json"` only if the chunk exists - # If that chunk was deleted, then `"c"` is not returned. - # LocalStore will not have this behaviour :/ - # In Icechunk, we always return the `c` so ignore this inconsistency. - assert model_ls == store_ls, (model_ls, store_ls) - @precondition(lambda self: bool(self.all_arrays)) @rule(data=st.data()) def delete_chunk(self, data) -> None: @@ -182,6 +316,32 @@ def delete_chunk(self, data) -> None: self._sync(self.model.delete(path)) self._sync(self.store.delete(path)) + @precondition(lambda self: bool(self.all_arrays)) + @rule(data=st.data()) + def overwrite_array_basic_indexing(self, data) -> None: + array = data.draw(st.sampled_from(sorted(self.all_arrays))) + model_array = zarr.open_array(path=array, store=self.model) + store_array = zarr.open_array(path=array, store=self.store) + slicer = data.draw(basic_indices(shape=model_array.shape)) + note(f"overwriting array basic {slicer=}") + new_data = data.draw( + npst.arrays(shape=model_array[slicer].shape, dtype=model_array.dtype) + ) + model_array[slicer] = new_data + store_array[slicer] = new_data + + @precondition(lambda self: bool(self.all_arrays)) + @rule(data=st.data()) + def resize_array(self, data) -> None: + array = data.draw(st.sampled_from(sorted(self.all_arrays))) + model_array = zarr.open_array(path=array, store=self.model) + store_array = zarr.open_array(path=array, store=self.store) + ndim = model_array.ndim + new_shape = data.draw(npst.array_shapes(max_dims=ndim, min_dims=ndim, min_side=1)) + note(f"resizing array from 
{model_array.shape} to {new_shape}") + model_array.resize(new_shape) + store_array.resize(new_shape) + @precondition(lambda self: bool(self.all_arrays) or bool(self.all_groups)) @rule(data=st.data()) def delete_dir(self, data) -> None: @@ -219,10 +379,8 @@ def check_list_prefix_from_root(self) -> None: def test_zarr_hierarchy() -> None: - repo = Repository.create(in_memory_storage()) - def mk_test_instance_sync() -> ModifiedZarrHierarchyStateMachine: - return ModifiedZarrHierarchyStateMachine(repo) + return ModifiedZarrHierarchyStateMachine(in_memory_storage()) run_state_machine_as_test( mk_test_instance_sync, settings=Settings(report_multiple_bugs=False) diff --git a/icechunk/proptest-regressions/session.txt b/icechunk/proptest-regressions/session.txt index ce82f8b5d..f938988b2 100644 --- a/icechunk/proptest-regressions/session.txt +++ b/icechunk/proptest-regressions/session.txt @@ -8,3 +8,5 @@ cc da94eced751096504c0803bed6ad66cde255567c3cf6c0b316cce66c22e3142a # shrinks to cc b0a66d6fdd012c51dd804b9f6c58e4403b2dd41f15c3856adc8d90e3d42311fc # shrinks to (initial_state, transitions, seen_counter) = (RepositoryModel { arrays: {}, groups: [] }, [AddArray(Path(Utf8PathBuf { _encoding: "unix", inner: "/" }), ZarrArrayMetadata { shape: [1], data_type: Bool, chunk_shape: ChunkShape([1]), chunk_key_encoding: Slash, fill_value: Bool(false), codecs: [Codec { name: "mycodec", configuration: None }], storage_transformers: Some([StorageTransformer { name: "mytransformer", configuration: None }]), dimension_names: None })], None) cc 4f7049d25e420db7b98fcadb0fe6bc7576d3bbc6eb3b971074e6f7257282d040 # shrinks to input = _TestAddDeleteArrayArgs { path: Path(Utf8PathBuf { _encoding: "unix", inner: "/" }), metadata: ZarrArrayMetadata { shape: [1], data_type: Bool, chunk_shape: ChunkShape([1]), chunk_key_encoding: Slash, fill_value: Bool(false), codecs: [Codec { name: "mycodec", configuration: None }], storage_transformers: Some([StorageTransformer { name: "mytransformer", 
configuration: None }]), dimension_names: None }, session: Session { config: RepositoryConfig { inline_chunk_threshold_bytes: 512, unsafe_overwrite_refs: false, get_partial_values_concurrency: 10, compression: CompressionConfig { algorithm: Zstd, level: 1 }, caching: CachingConfig { snapshots_cache_size: 2, manifests_cache_size: 2, transactions_cache_size: 0, attributes_cache_size: 2, chunks_cache_size: 0 }, storage: None, virtual_chunk_containers: {"file": VirtualChunkContainer { name: "file", url_prefix: "file", store: LocalFileSystem("") }, "tigris": VirtualChunkContainer { name: "tigris", url_prefix: "tigris", store: Tigris }, "s3": VirtualChunkContainer { name: "s3", url_prefix: "s3", store: S3(S3Options { region: None, endpoint_url: None, anonymous: false, allow_http: false }) }, "gcs": VirtualChunkContainer { name: "gcs", url_prefix: "gcs", store: Gcs({}) }, "az": VirtualChunkContainer { name: "az", url_prefix: "az", store: Azure }} }, storage_settings: Settings { concurrency: ConcurrencySettings { max_concurrent_requests_for_object: 5, min_concurrent_request_size: 1 } }, storage: ObjectStorage { config: ObjectStorageConfig { url: "memory:/", prefix: "", options: [] }, store: InMemory { storage: RwLock { data: Storage { next_etag: 3, map: {Path { raw: "config.yaml" }: Entry { data: b"inline_chunk_threshold_bytes: 512\nunsafe_overwrite_refs: false\nget_partial_values_concurrency: 10\ncompression:\n algorithm: Zstd\n level: 1\ncaching:\n snapshots_cache_size: 2\n manifests_cache_size: 2\n transactions_cache_size: 0\n attributes_cache_size: 2\n chunks_cache_size: 0\nstorage: null\nvirtual_chunk_containers:\n file:\n name: file\n url_prefix: file\n store: !LocalFileSystem ''\n tigris:\n name: tigris\n url_prefix: tigris\n store: !Tigris {}\n s3:\n name: s3\n url_prefix: s3\n store: !S3\n region: null\n endpoint_url: null\n anonymous: false\n allow_http: false\n gcs:\n name: gcs\n url_prefix: gcs\n store: !Gcs {}\n az:\n name: az\n url_prefix: az\n store: !Azure 
{}\n", last_modified: 2025-01-08T15:41:23.520617823Z, attributes: Attributes({ContentType: AttributeValue("application/yaml")}), e_tag: 2 }, Path { raw: "refs/branch.main/ZZZZZZZZ.json" }: Entry { data: b"{\"snapshot\":\"6EPXF9PX9JPEY8F4Q9KG\"}", last_modified: 2025-01-08T15:41:23.520470841Z, attributes: Attributes({}), e_tag: 1 }, Path { raw: "snapshots/6EPXF9PX9JPEY8F4Q9KG" }: Entry { data: b"ICE\xf0\x9f\xa7\x8aCHUNKic- \x01\x02\x01(\xb5/\xfd\0H\x1d\x03\0\xb4\x05\x9b\0\x80\x90\x90\0\0\x90\x93\xb4QJ6QBBJ59Y0VSRKHH1GG\xbe2025-01-08T15:41:23.520187204Z\xb6Repository initialized9404Z\x80\x80\x01\0T\xb6U\x14", last_modified: 2025-01-08T15:41:23.520418067Z, attributes: Attributes({Metadata("ic-file-type"): AttributeValue("manifest"), Metadata("ic-spec-ver"): AttributeValue("1"), Metadata("ic-comp-alg"): AttributeValue("zstd"), Metadata("ic-"): AttributeValue("ic-client")}), e_tag: 0 }}, uploads: {} } } } }, asset_resolver: AssetResolver { storage: ObjectStorage { config: ObjectStorageConfig { url: "memory:/", prefix: "", options: [] }, store: InMemory { storage: RwLock { data: Storage { next_etag: 3, map: {Path { raw: "config.yaml" }: Entry { data: b"inline_chunk_threshold_bytes: 512\nunsafe_overwrite_refs: false\nget_partial_values_concurrency: 10\ncompression:\n algorithm: Zstd\n level: 1\ncaching:\n snapshots_cache_size: 2\n manifests_cache_size: 2\n transactions_cache_size: 0\n attributes_cache_size: 2\n chunks_cache_size: 0\nstorage: null\nvirtual_chunk_containers:\n file:\n name: file\n url_prefix: file\n store: !LocalFileSystem ''\n tigris:\n name: tigris\n url_prefix: tigris\n store: !Tigris {}\n s3:\n name: s3\n url_prefix: s3\n store: !S3\n region: null\n endpoint_url: null\n anonymous: false\n allow_http: false\n gcs:\n name: gcs\n url_prefix: gcs\n store: !Gcs {}\n az:\n name: az\n url_prefix: az\n store: !Azure {}\n", last_modified: 2025-01-08T15:41:23.520617823Z, attributes: Attributes({ContentType: AttributeValue("application/yaml")}), e_tag: 2 }, Path { 
raw: "refs/branch.main/ZZZZZZZZ.json" }: Entry { data: b"{\"snapshot\":\"6EPXF9PX9JPEY8F4Q9KG\"}", last_modified: 2025-01-08T15:41:23.520470841Z, attributes: Attributes({}), e_tag: 1 }, Path { raw: "snapshots/6EPXF9PX9JPEY8F4Q9KG" }: Entry { data: b"ICE\xf0\x9f\xa7\x8aCHUNKic- \x01\x02\x01(\xb5/\xfd\0H\x1d\x03\0\xb4\x05\x9b\0\x80\x90\x90\0\0\x90\x93\xb4QJ6QBBJ59Y0VSRKHH1GG\xbe2025-01-08T15:41:23.520187204Z\xb6Repository initialized9404Z\x80\x80\x01\0T\xb6U\x14", last_modified: 2025-01-08T15:41:23.520418067Z, attributes: Attributes({Metadata("ic-file-type"): AttributeValue("manifest"), Metadata("ic-spec-ver"): AttributeValue("1"), Metadata("ic-comp-alg"): AttributeValue("zstd"), Metadata("ic-"): AttributeValue("ic-client")}), e_tag: 0 }}, uploads: {} } } } }, storage_settings: Settings { concurrency: ConcurrencySettings { max_concurrent_requests_for_object: 5, min_concurrent_request_size: 1 } }, num_snapshots: 2, num_manifests: 2, num_transactions: 0, num_attributes: 2, num_chunks: 0, snapshot_cache: Cache { .. }, manifest_cache: Cache { .. }, transactions_cache: Cache { .. }, attributes_cache: Cache { .. }, chunk_cache: Cache { .. 
} }, virtual_resolver: VirtualChunkResolver { containers: [VirtualChunkContainer { name: "tigris", url_prefix: "tigris", store: Tigris }, VirtualChunkContainer { name: "file", url_prefix: "file", store: LocalFileSystem("") }, VirtualChunkContainer { name: "gcs", url_prefix: "gcs", store: Gcs({}) }, VirtualChunkContainer { name: "s3", url_prefix: "s3", store: S3(S3Options { region: None, endpoint_url: None, anonymous: false, allow_http: false }) }, VirtualChunkContainer { name: "az", url_prefix: "az", store: Azure }], credentials: {}, fetchers: RwLock { data: {} } }, branch_name: Some("main"), snapshot_id: 33add7a6dd4cacef21e4ba67, change_set: ChangeSet { new_groups: {}, new_arrays: {}, updated_arrays: {}, updated_attributes: {}, set_chunks: {}, deleted_groups: {}, deleted_arrays: {} } } } cc a4bf17e17086f3e723abccc1307f0b489a5e646e899c08e3b483d3befe15cb26 # shrinks to input = _TestAddArrayGroupClashArgs { path: Path(Utf8PathBuf { _encoding: "unix", inner: "/" }), metadata: ZarrArrayMetadata { shape: [1], data_type: Bool, chunk_shape: ChunkShape([1]), chunk_key_encoding: Slash, fill_value: Bool(false), codecs: [Codec { name: "mycodec", configuration: None }], storage_transformers: Some([StorageTransformer { name: "mytransformer", configuration: None }]), dimension_names: None }, session: Session { config: RepositoryConfig { inline_chunk_threshold_bytes: 512, unsafe_overwrite_refs: false, get_partial_values_concurrency: 10, compression: CompressionConfig { algorithm: Zstd, level: 1 }, caching: CachingConfig { snapshots_cache_size: 2, manifests_cache_size: 2, transactions_cache_size: 0, attributes_cache_size: 2, chunks_cache_size: 0 }, storage: None, virtual_chunk_containers: {"tigris": VirtualChunkContainer { name: "tigris", url_prefix: "tigris", store: Tigris }, "file": VirtualChunkContainer { name: "file", url_prefix: "file", store: LocalFileSystem("") }, "az": VirtualChunkContainer { name: "az", url_prefix: "az", store: Azure }, "s3": VirtualChunkContainer { name: 
"s3", url_prefix: "s3", store: S3(S3Options { region: None, endpoint_url: None, anonymous: false, allow_http: false }) }, "gcs": VirtualChunkContainer { name: "gcs", url_prefix: "gcs", store: Gcs({}) }} }, storage_settings: Settings { concurrency: ConcurrencySettings { max_concurrent_requests_for_object: 5, min_concurrent_request_size: 1 } }, storage: ObjectStorage { config: ObjectStorageConfig { url: "memory:/", prefix: "", options: [] }, store: InMemory { storage: RwLock { data: Storage { next_etag: 3, map: {Path { raw: "config.yaml" }: Entry { data: b"inline_chunk_threshold_bytes: 512\nunsafe_overwrite_refs: false\nget_partial_values_concurrency: 10\ncompression:\n algorithm: Zstd\n level: 1\ncaching:\n snapshots_cache_size: 2\n manifests_cache_size: 2\n transactions_cache_size: 0\n attributes_cache_size: 2\n chunks_cache_size: 0\nstorage: null\nvirtual_chunk_containers:\n tigris:\n name: tigris\n url_prefix: tigris\n store: !Tigris {}\n file:\n name: file\n url_prefix: file\n store: !LocalFileSystem ''\n az:\n name: az\n url_prefix: az\n store: !Azure {}\n s3:\n name: s3\n url_prefix: s3\n store: !S3\n region: null\n endpoint_url: null\n anonymous: false\n allow_http: false\n gcs:\n name: gcs\n url_prefix: gcs\n store: !Gcs {}\n", last_modified: 2025-01-08T15:41:23.635133963Z, attributes: Attributes({ContentType: AttributeValue("application/yaml")}), e_tag: 2 }, Path { raw: "refs/branch.main/ZZZZZZZZ.json" }: Entry { data: b"{\"snapshot\":\"2T8MSGGT2FVZBS6094K0\"}", last_modified: 2025-01-08T15:41:23.635089347Z, attributes: Attributes({}), e_tag: 1 }, Path { raw: "snapshots/2T8MSGGT2FVZBS6094K0" }: Entry { data: b"ICE\xf0\x9f\xa7\x8aCHUNKic- \x01\x02\x01(\xb5/\xfd\0H\x1d\x03\0\xb4\x05\x9b\0\x80\x90\x90\0\0\x90\x93\xb47BZ1BYSVSA8MSS4WB5JG\xbe2025-01-08T15:41:23.634873394Z\xb6Repository initialized5576Z\x80\x80\x01\0T\xb6U\x14", last_modified: 2025-01-08T15:41:23.635043198Z, attributes: Attributes({Metadata("ic-comp-alg"): AttributeValue("zstd"), 
Metadata("ic-file-type"): AttributeValue("manifest"), Metadata("ic-spec-ver"): AttributeValue("1"), Metadata("ic-"): AttributeValue("ic-client")}), e_tag: 0 }}, uploads: {} } } } }, asset_resolver: AssetResolver { storage: ObjectStorage { config: ObjectStorageConfig { url: "memory:/", prefix: "", options: [] }, store: InMemory { storage: RwLock { data: Storage { next_etag: 3, map: {Path { raw: "config.yaml" }: Entry { data: b"inline_chunk_threshold_bytes: 512\nunsafe_overwrite_refs: false\nget_partial_values_concurrency: 10\ncompression:\n algorithm: Zstd\n level: 1\ncaching:\n snapshots_cache_size: 2\n manifests_cache_size: 2\n transactions_cache_size: 0\n attributes_cache_size: 2\n chunks_cache_size: 0\nstorage: null\nvirtual_chunk_containers:\n tigris:\n name: tigris\n url_prefix: tigris\n store: !Tigris {}\n file:\n name: file\n url_prefix: file\n store: !LocalFileSystem ''\n az:\n name: az\n url_prefix: az\n store: !Azure {}\n s3:\n name: s3\n url_prefix: s3\n store: !S3\n region: null\n endpoint_url: null\n anonymous: false\n allow_http: false\n gcs:\n name: gcs\n url_prefix: gcs\n store: !Gcs {}\n", last_modified: 2025-01-08T15:41:23.635133963Z, attributes: Attributes({ContentType: AttributeValue("application/yaml")}), e_tag: 2 }, Path { raw: "refs/branch.main/ZZZZZZZZ.json" }: Entry { data: b"{\"snapshot\":\"2T8MSGGT2FVZBS6094K0\"}", last_modified: 2025-01-08T15:41:23.635089347Z, attributes: Attributes({}), e_tag: 1 }, Path { raw: "snapshots/2T8MSGGT2FVZBS6094K0" }: Entry { data: b"ICE\xf0\x9f\xa7\x8aCHUNKic- \x01\x02\x01(\xb5/\xfd\0H\x1d\x03\0\xb4\x05\x9b\0\x80\x90\x90\0\0\x90\x93\xb47BZ1BYSVSA8MSS4WB5JG\xbe2025-01-08T15:41:23.634873394Z\xb6Repository initialized5576Z\x80\x80\x01\0T\xb6U\x14", last_modified: 2025-01-08T15:41:23.635043198Z, attributes: Attributes({Metadata("ic-comp-alg"): AttributeValue("zstd"), Metadata("ic-file-type"): AttributeValue("manifest"), Metadata("ic-spec-ver"): AttributeValue("1"), Metadata("ic-"): AttributeValue("ic-client")}), 
e_tag: 0 }}, uploads: {} } } } }, storage_settings: Settings { concurrency: ConcurrencySettings { max_concurrent_requests_for_object: 5, min_concurrent_request_size: 1 } }, num_snapshots: 2, num_manifests: 2, num_transactions: 0, num_attributes: 2, num_chunks: 0, snapshot_cache: Cache { .. }, manifest_cache: Cache { .. }, transactions_cache: Cache { .. }, attributes_cache: Cache { .. }, chunk_cache: Cache { .. } }, virtual_resolver: VirtualChunkResolver { containers: [VirtualChunkContainer { name: "tigris", url_prefix: "tigris", store: Tigris }, VirtualChunkContainer { name: "file", url_prefix: "file", store: LocalFileSystem("") }, VirtualChunkContainer { name: "gcs", url_prefix: "gcs", store: Gcs({}) }, VirtualChunkContainer { name: "az", url_prefix: "az", store: Azure }, VirtualChunkContainer { name: "s3", url_prefix: "s3", store: S3(S3Options { region: None, endpoint_url: None, anonymous: false, allow_http: false }) }], credentials: {}, fetchers: RwLock { data: {} } }, branch_name: Some("main"), snapshot_id: 16914cc21a13f7f5e4c04926, change_set: ChangeSet { new_groups: {}, new_arrays: {}, updated_arrays: {}, updated_attributes: {}, set_chunks: {}, deleted_groups: {}, deleted_arrays: {} } } } +cc 60be8007bfbb770a1a404577dde928290d9018a6c7ca8240e9f3e73922be174c # shrinks to input = _TestPropertyExtentsSetOpsArgs { e1: ManifestExtents([0..0, 0..0, 0..0, 0..0]), e2: ManifestExtents([0..0, 0..0, 0..0, 0..0]) } +cc 2ae68e9977c83f4cb719f54d9b37e9e54a660c4b5ec826e1ac34696d4ab0bf4e # shrinks to input = _TestPropertyExtentsSetOpsSameArgs { e: ManifestExtents([0..0, 0..0, 0..0, 0..0]) } diff --git a/icechunk/src/change_set.rs b/icechunk/src/change_set.rs index 8a5f880be..444c031b9 100644 --- a/icechunk/src/change_set.rs +++ b/icechunk/src/change_set.rs @@ -1,7 +1,6 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, iter, - mem::take, }; use bytes::Bytes; @@ -11,10 +10,10 @@ use serde::{Deserialize, Serialize}; use crate::{ format::{ ChunkIndices, NodeId, Path, - 
manifest::{ChunkInfo, ChunkPayload}, + manifest::{ChunkInfo, ChunkPayload, ManifestExtents, ManifestSplits, Overlap}, snapshot::{ArrayShape, DimensionName, NodeData, NodeSnapshot}, }, - session::SessionResult, + session::{SessionResult, find_coord}, }; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -24,15 +23,18 @@ pub struct ArrayData { pub user_data: Bytes, } +type SplitManifest = BTreeMap>; #[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] pub struct ChangeSet { new_groups: HashMap, new_arrays: HashMap, updated_arrays: HashMap, updated_groups: HashMap, - // It's important we keep these sorted, we use this fact in TransactionLog creation - // TODO: could track ManifestExtents - set_chunks: BTreeMap>>, + set_chunks: BTreeMap>, + // This map keeps track of any chunk deletes that are + // outside the domain of the current array shape. This is needed to handle + // the very unlikely case of multiple resizes in the same session. + deleted_chunks_outside_bounds: BTreeMap>, deleted_groups: HashSet<(Path, NodeId)>, deleted_arrays: HashSet<(Path, NodeId)>, } @@ -58,11 +60,16 @@ impl ChangeSet { self.deleted_arrays.contains(path_and_id) } - pub fn chunk_changes( + pub fn changed_chunks( &self, - ) -> impl Iterator>)> - { - self.set_chunks.iter() + ) -> impl Iterator)> { + self.set_chunks.iter().map(|(node_id, split_map)| { + (node_id, split_map.values().flat_map(|x| x.keys())) + }) + } + + pub fn is_updated_array(&self, node: &NodeId) -> bool { + self.updated_arrays.contains_key(node) } pub fn has_chunk_changes(&self, node: &NodeId) -> bool { @@ -70,7 +77,7 @@ impl ChangeSet { } pub fn arrays_with_chunk_changes(&self) -> impl Iterator { - self.chunk_changes().map(|(node, _)| node) + self.set_chunks.keys() } pub fn is_empty(&self) -> bool { @@ -104,7 +111,13 @@ impl ChangeSet { self.new_arrays.insert(path, (node_id, array_data)); } - pub fn update_array(&mut self, node_id: &NodeId, path: &Path, array_data: ArrayData) { + pub fn 
update_array( + &mut self, + node_id: &NodeId, + path: &Path, + array_data: ArrayData, + new_splits: &ManifestSplits, + ) { match self.new_arrays.get(path) { Some((id, _)) => { debug_assert!(!self.updated_arrays.contains_key(id)); @@ -114,6 +127,65 @@ impl ChangeSet { self.updated_arrays.insert(node_id.clone(), array_data); } } + + // update existing splits + let mut to_remove = HashSet::::new(); + if let Some(manifests) = self.set_chunks.remove(node_id) { + let mut new_deleted_chunks = HashSet::::new(); + let mut new_manifests = + HashMap::::with_capacity( + new_splits.len(), + ); + for (old_extents, mut chunks) in manifests.into_iter() { + for new_extents in new_splits.iter() { + if old_extents.overlap_with(new_extents) == Overlap::None { + continue; + } + + // TODO: replace with `BTreeMap.drain_filter` after it is stable. + let mut extracted = + BTreeMap::>::new(); + chunks.retain(|coord, payload| { + let cond = new_extents.contains(coord.0.as_slice()); + if cond { + extracted.insert(coord.clone(), payload.clone()); + } + !cond + }); + new_manifests + .entry(new_extents.clone()) + .or_default() + .extend(extracted); + } + new_deleted_chunks.extend( + chunks.into_iter().filter_map(|(coord, payload)| { + payload.is_none().then_some(coord) + }), + ); + } + + // bring back any previously tracked deletes + if let Some(deletes) = self.deleted_chunks_outside_bounds.get_mut(node_id) { + for coord in deletes.iter() { + if let Some(extents) = new_splits.find(coord) { + new_manifests + .entry(extents.clone()) + .or_default() + .insert(coord.clone(), None); + to_remove.insert(coord.clone()); + }; + } + deletes.retain(|item| !to_remove.contains(item)); + to_remove.drain(); + }; + self.set_chunks.insert(node_id.clone(), new_manifests); + + // keep track of any deletes not inserted in to set_chunks + self.deleted_chunks_outside_bounds + .entry(node_id.clone()) + .or_default() + .extend(new_deleted_chunks); + } } pub fn update_group(&mut self, node_id: &NodeId, path: &Path, 
definition: Bytes) { @@ -166,15 +238,23 @@ impl ChangeSet { node_id: NodeId, coord: ChunkIndices, data: Option, + splits: &ManifestSplits, ) { + #[allow(clippy::expect_used)] + let extent = splits.find(&coord).expect("logic bug. Trying to set chunk ref but can't find the appropriate split manifest."); // this implementation makes delete idempotent // it allows deleting a deleted chunk by repeatedly setting None. self.set_chunks .entry(node_id) - .and_modify(|h| { - h.insert(coord.clone(), data.clone()); + .or_insert_with(|| { + HashMap::< + ManifestExtents, + BTreeMap>, + >::with_capacity(splits.len()) }) - .or_insert(BTreeMap::from([(coord, data)])); + .entry(extent.clone()) + .or_default() + .insert(coord, data); } pub fn get_chunk_ref( @@ -182,7 +262,11 @@ impl ChangeSet { node_id: &NodeId, coords: &ChunkIndices, ) -> Option<&Option> { - self.set_chunks.get(node_id).and_then(|h| h.get(coords)) + self.set_chunks.get(node_id).and_then(|node_chunks| { + find_coord(node_chunks.keys(), coords).and_then(|(_, extent)| { + node_chunks.get(extent).and_then(|s| s.get(coords)) + }) + }) } /// Drop the updated chunk references for the node. 
@@ -193,7 +277,19 @@ impl ChangeSet { predicate: impl Fn(&ChunkIndices) -> bool, ) { if let Some(changes) = self.set_chunks.get_mut(node_id) { - changes.retain(|coord, _| !predicate(coord)); + for split in changes.values_mut() { + split.retain(|coord, _| !predicate(coord)); + } + } + } + + pub fn deleted_chunks_iterator( + &self, + node_id: &NodeId, + ) -> impl Iterator { + match self.deleted_chunks_outside_bounds.get(node_id) { + Some(deletes) => Either::Right(deletes.iter()), + None => Either::Left(iter::empty()), } } @@ -201,13 +297,18 @@ impl ChangeSet { &self, node_id: &NodeId, node_path: &Path, + extent: ManifestExtents, ) -> impl Iterator)> + use<'_> { if self.is_deleted(node_path, node_id) { return Either::Left(iter::empty()); } match self.set_chunks.get(node_id) { None => Either::Left(iter::empty()), - Some(h) => Either::Right(h.iter()), + Some(h) => Either::Right( + h.iter() + .filter(move |(manifest_extent, _)| extent.matches(manifest_extent)) + .flat_map(|(_, manifest)| manifest.iter()), + ), } } @@ -215,7 +316,8 @@ impl ChangeSet { &self, ) -> impl Iterator + use<'_> { self.new_arrays.iter().flat_map(|(path, (node_id, _))| { - self.new_array_chunk_iterator(node_id, path).map(|ci| (path.clone(), ci)) + self.new_array_chunk_iterator(node_id, path, ManifestExtents::ALL) + .map(|ci| (path.clone(), ci)) }) } @@ -223,8 +325,9 @@ impl ChangeSet { &'a self, node_id: &'a NodeId, node_path: &Path, + extent: ManifestExtents, ) -> impl Iterator + use<'a> { - self.array_chunks_iterator(node_id, node_path).filter_map( + self.array_chunks_iterator(node_id, node_path, extent).filter_map( move |(coords, payload)| { payload.as_ref().map(|p| ChunkInfo { node: node_id.clone(), @@ -235,6 +338,28 @@ impl ChangeSet { ) } + pub fn modified_manifest_extents_iterator( + &self, + node_id: &NodeId, + node_path: &Path, + ) -> impl Iterator + use<'_> { + if self.is_deleted(node_path, node_id) { + return Either::Left(iter::empty()); + } + match self.set_chunks.get(node_id) { + None 
=> Either::Left(iter::empty()), + Some(h) => Either::Right(h.keys()), + } + } + + pub fn array_manifest( + &self, + node_id: &NodeId, + extent: &ManifestExtents, + ) -> Option<&SplitManifest> { + self.set_chunks.get(node_id).and_then(|x| x.get(extent)) + } + pub fn new_nodes(&self) -> impl Iterator { self.new_groups().chain(self.new_arrays()) } @@ -247,19 +372,6 @@ impl ChangeSet { self.new_arrays.iter().map(|(path, (node_id, _))| (path, node_id)) } - pub fn take_chunks( - &mut self, - ) -> BTreeMap>> { - take(&mut self.set_chunks) - } - - pub fn set_chunks( - &mut self, - chunks: BTreeMap>>, - ) { - self.set_chunks = chunks - } - /// Merge this ChangeSet with `other`. /// /// Results of the merge are applied to `self`. Changes present in `other` take precedence over @@ -274,18 +386,19 @@ impl ChangeSet { self.updated_arrays.extend(other.updated_arrays); self.deleted_groups.extend(other.deleted_groups); self.deleted_arrays.extend(other.deleted_arrays); - - for (node, other_chunks) in other.set_chunks.into_iter() { - match self.set_chunks.remove(&node) { - Some(mut old_value) => { - old_value.extend(other_chunks); - self.set_chunks.insert(node, old_value); - } - None => { - self.set_chunks.insert(node, other_chunks); - } - } - } + // FIXME: do we even test this? 
+ self.deleted_chunks_outside_bounds.extend(other.deleted_chunks_outside_bounds); + + other.set_chunks.into_iter().for_each(|(node, other_splits)| { + let manifests = self.set_chunks.entry(node).or_insert_with(|| { + HashMap::::with_capacity( + other_splits.len(), + ) + }); + other_splits.into_iter().for_each(|(extent, their_manifest)| { + manifests.entry(extent).or_default().extend(their_manifest) + }) + }); } pub fn merge_many>(&mut self, others: T) { @@ -417,7 +530,7 @@ mod tests { change_set::ArrayData, format::{ ChunkIndices, NodeId, - manifest::{ChunkInfo, ChunkPayload}, + manifest::{ChunkInfo, ChunkPayload, ManifestSplits}, snapshot::ArrayShape, }, }; @@ -449,28 +562,41 @@ mod tests { ); assert_eq!(None, change_set.new_arrays_chunk_iterator().next()); - change_set.set_chunk_ref(node_id1.clone(), ChunkIndices(vec![0, 1]), None); + let splits1 = ManifestSplits::from_edges(vec![vec![0, 10], vec![0, 10]]); + + change_set.set_chunk_ref( + node_id1.clone(), + ChunkIndices(vec![0, 1]), + None, + &splits1, + ); assert_eq!(None, change_set.new_arrays_chunk_iterator().next()); change_set.set_chunk_ref( node_id1.clone(), ChunkIndices(vec![1, 0]), Some(ChunkPayload::Inline("bar1".into())), + &splits1, ); change_set.set_chunk_ref( node_id1.clone(), ChunkIndices(vec![1, 1]), Some(ChunkPayload::Inline("bar2".into())), + &splits1, ); + + let splits2 = ManifestSplits::from_edges(vec![vec![0, 10]]); change_set.set_chunk_ref( node_id2.clone(), ChunkIndices(vec![0]), Some(ChunkPayload::Inline("baz1".into())), + &splits2, ); change_set.set_chunk_ref( node_id2.clone(), ChunkIndices(vec![1]), Some(ChunkPayload::Inline("baz2".into())), + &splits2, ); { diff --git a/icechunk/src/conflicts/detector.rs b/icechunk/src/conflicts/detector.rs index 3de442e62..fc3ab7214 100644 --- a/icechunk/src/conflicts/detector.rs +++ b/icechunk/src/conflicts/detector.rs @@ -138,14 +138,13 @@ impl ConflictSolver for ConflictDetector { }); let chunks_double_updated = - 
current_changes.chunk_changes().filter_map(|(node_id, changes)| { + current_changes.changed_chunks().filter_map(|(node_id, changes)| { let previous_changes: HashSet<_> = previous_change.updated_chunks_for(node_id).collect(); if previous_changes.is_empty() { None } else { let conflicting: HashSet<_> = changes - .keys() .filter(|coord| previous_changes.contains(coord)) .cloned() .collect(); diff --git a/icechunk/src/format/manifest.rs b/icechunk/src/format/manifest.rs index ea03d32d3..37fe3d400 100644 --- a/icechunk/src/format/manifest.rs +++ b/icechunk/src/format/manifest.rs @@ -1,10 +1,17 @@ -use std::{borrow::Cow, convert::Infallible, ops::Range, sync::Arc}; +use std::{ + borrow::Cow, + cmp::{max, min}, + convert::Infallible, + iter::zip, + ops::Range, + sync::Arc, +}; use crate::format::flatbuffers::generated; use bytes::Bytes; use flatbuffers::VerifierOptions; use futures::{Stream, TryStreamExt}; -use itertools::{Itertools, multiunzip}; +use itertools::{Itertools, any, multiunzip}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -18,16 +25,20 @@ use super::{ ChunkId, ChunkIndices, ChunkLength, ChunkOffset, IcechunkResult, ManifestId, NodeId, }; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ManifestExtents(Vec>); - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ManifestRef { - pub object_id: ManifestId, - pub extents: ManifestExtents, +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Overlap { + Complete, + Partial, + None, } +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] +pub struct ManifestExtents(Vec>); + impl ManifestExtents { + // sentinel for a "universal set" + pub const ALL: Self = Self(Vec::new()); + pub fn new(from: &[u32], to: &[u32]) -> Self { let v = from .iter() @@ -37,6 +48,10 @@ impl ManifestExtents { Self(v) } + pub fn from_ranges_iter(ranges: impl IntoIterator>) -> Self { + Self(ranges.into_iter().collect()) + } + pub fn contains(&self, coord: &[u32]) 
-> bool { self.iter().zip(coord.iter()).all(|(range, that)| range.contains(that)) } @@ -52,10 +67,71 @@ impl ManifestExtents { pub fn is_empty(&self) -> bool { self.0.is_empty() } + + pub fn intersection(&self, other: &Self) -> Option { + if self == &Self::ALL { + return Some(other.clone()); + } + + debug_assert_eq!(self.len(), other.len()); + let ranges = zip(self.iter(), other.iter()) + .map(|(a, b)| max(a.start, b.start)..min(a.end, b.end)) + .collect::>(); + if any(ranges.iter(), |r| r.end <= r.start) { None } else { Some(Self(ranges)) } + } + + pub fn union(&self, other: &Self) -> Self { + if self == &Self::ALL { + return Self::ALL; + } + debug_assert_eq!(self.len(), other.len()); + Self::from_ranges_iter( + zip(self.iter(), other.iter()) + .map(|(a, b)| min(a.start, b.start)..max(a.end, b.end)), + ) + } + + pub fn overlap_with(&self, other: &Self) -> Overlap { + // Important: this is not symmetric. + if *other == Self::ALL { + return Overlap::Complete; + } else if *self == Self::ALL { + return Overlap::Partial; + } + debug_assert!( + self.len() == other.len(), + "Length mismatch: self = {:?}, other = {:?}", + &self, + &other + ); + let mut overlap = Overlap::Complete; + for (a, b) in zip(other.iter(), self.iter()) { + debug_assert!(a.start <= a.end, "Invalid range: {:?}", a.clone()); + debug_assert!(b.start <= b.end, "Invalid range: {:?}", b.clone()); + if (a.end <= b.start) || (a.start >= b.end) { + return Overlap::None; + } else if !((a.start <= b.start) && (a.end >= b.end)) { + overlap = Overlap::Partial + } + } + overlap + } + + pub fn matches(&self, other: &ManifestExtents) -> bool { + // used in `.filter` + // ALL always matches any other extents + if *self == Self::ALL { true } else { self == other } + } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct ManifestSplits(Vec); +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ManifestRef { + pub object_id: ManifestId, + pub extents: ManifestExtents, +} + 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ManifestSplits(pub Vec); impl ManifestSplits { /// Used at read-time @@ -108,6 +184,21 @@ impl ManifestSplits { pub fn len(&self) -> usize { self.0.len() } + + pub fn compatible_with(&self, other: &Self) -> bool { + // this is not a simple zip + all(equals) because + // ordering might differ though both sets of splits + // must be complete. + for ours in self.iter() { + if any(other.iter(), |theirs| { + ours.overlap_with(theirs) == Overlap::Partial + || theirs.overlap_with(ours) == Overlap::Partial + }) { + return false; + } + } + true + } } /// Helper function for constructing uniformly spaced manifest split edges @@ -526,43 +617,232 @@ static ROOT_OPTIONS: VerifierOptions = VerifierOptions { #[allow(clippy::unwrap_used, clippy::panic)] mod tests { use super::*; - use crate::strategies::{ShapeDim, shapes_and_dims}; + use crate::strategies::{ShapeDim, manifest_extents, shapes_and_dims}; use icechunk_macros; + use itertools::{all, multizip}; + use proptest::collection::vec; use proptest::prelude::*; + use std::error::Error; + use test_strategy::proptest; + + #[proptest] + fn test_property_extents_set_ops_same( + #[strategy(manifest_extents(4))] e: ManifestExtents, + ) { + prop_assert_eq!(e.intersection(&e), Some(e.clone())); + prop_assert_eq!(e.union(&e), e.clone()); + prop_assert_eq!(e.overlap_with(&e), Overlap::Complete); + } - proptest! { - #[icechunk_macros::test] - fn test_manifest_split_from_edges(shape_dim in shapes_and_dims(Some(5))) { - // Note: using the shape, chunks strategy to generate chunk_shape, split_shape - let ShapeDim { shape, .. 
} = shape_dim; - - let num_chunks = shape.iter().map(|x| x.array_length()).collect::>(); - let split_shape = shape.iter().map(|x| x.chunk_length()).collect::>(); - - let ndim = shape.len(); - let edges: Vec> = - (0usize..ndim).map(|axis| { - uniform_manifest_split_edges(num_chunks[axis] as u32, &(split_shape[axis] as u32)) - } - ).collect(); - - let splits = ManifestSplits::from_edges(edges.into_iter()); - for edge in splits.iter() { - // must be ndim ranges - prop_assert_eq!(edge.len(), ndim); - for range in edge.iter() { - prop_assert!(range.end > range.start); - } - } + #[proptest] + fn test_property_extents_set_ops( + #[strategy(manifest_extents(4))] e1: ManifestExtents, + #[strategy(manifest_extents(4))] e2: ManifestExtents, + ) { + let union = e1.union(&e2); + let intersection = e1.intersection(&e2); + + prop_assert_eq!(e1.intersection(&union), Some(e1.clone())); + prop_assert_eq!(union.intersection(&e1), Some(e1.clone())); + prop_assert_eq!(e2.intersection(&union), Some(e2.clone())); + prop_assert_eq!(union.intersection(&e2), Some(e2.clone())); + + // order is important for the next 2 + prop_assert_eq!(e1.overlap_with(&union), Overlap::Complete); + prop_assert_eq!(e2.overlap_with(&union), Overlap::Complete); + + if intersection.is_some() { + let int = intersection.unwrap(); + let expected = Overlap::Complete; + prop_assert_eq!(int.overlap_with(&e1), expected.clone()); + prop_assert_eq!(int.overlap_with(&e2), expected); + } else { + prop_assert_eq!(e2.overlap_with(&e1), Overlap::None); + prop_assert_eq!(e1.overlap_with(&e2), Overlap::None); + } + } + + #[proptest] + fn test_property_extents_widths( + #[strategy(manifest_extents(4))] extent1: ManifestExtents, + #[strategy(vec(0..100, 4))] delta_left: Vec, + #[strategy(vec(0..100, 4))] delta_right: Vec, + ) { + let widths = extent1.iter().map(|r| (r.end - r.start) as i32).collect::>(); + let extent2 = ManifestExtents::from_ranges_iter( + multizip((extent1.iter(), 
delta_left.iter(), delta_right.iter())).map( + |(extent, dleft, dright)| { + ((extent.start as i32 + dleft) as u32) + ..((extent.end as i32 + dright) as u32) + }, + ), + ); + + if all(delta_left.iter(), |elem| elem == &0i32) + && all(delta_right.iter(), |elem| elem == &0i32) + { + prop_assert_eq!(extent2.overlap_with(&extent1), Overlap::Complete); + } + + let extent2 = ManifestExtents::from_ranges_iter( + multizip(( + extent1.iter(), + widths.iter(), + delta_left.iter(), + delta_right.iter(), + )) + .map(|(extent, width, dleft, dright)| { + let (low, high) = (dleft.min(dright), dleft.max(dright)); + ((extent.start as i32 + width + low) as u32) + ..((extent.end as i32 + width + high) as u32) + }), + ); + + prop_assert_eq!(extent2.overlap_with(&extent1), Overlap::None); + + let extent2 = ManifestExtents::from_ranges_iter( + multizip(( + extent1.iter(), + widths.iter(), + delta_left.iter(), + delta_right.iter(), + )) + .map(|(extent, width, dleft, dright)| { + let (low, high) = (dleft.min(dright), dleft.max(dright)); + ((extent.start as i32 - width - high).max(0i32) as u32) + ..((extent.end as i32 - width - low) as u32) + }), + ); + prop_assert_eq!(extent2.overlap_with(&extent1), Overlap::None); + + let extent2 = ManifestExtents::from_ranges_iter( + multizip((extent1.iter(), delta_left.iter(), delta_right.iter())).map( + |(extent, dleft, dright)| { + ((extent.start as i32 - dleft - 1).max(0i32) as u32) + ..((extent.end as i32 + dright + 1) as u32) + }, + ), + ); + prop_assert_eq!(extent2.overlap_with(&extent1), Overlap::Partial); + } + + #[icechunk_macros::test] + fn test_overlaps() -> Result<(), Box> { + let e1 = ManifestExtents::new( + vec![0u32, 1, 2].as_slice(), + vec![2u32, 4, 6].as_slice(), + ); + + let e2 = ManifestExtents::new( + vec![10u32, 1, 2].as_slice(), + vec![12u32, 4, 6].as_slice(), + ); + + let union = ManifestExtents::new( + vec![0u32, 1, 2].as_slice(), + vec![12u32, 4, 6].as_slice(), + ); - // when using from_edges, extents must not exactly overlap 
- for edges in splits.iter().combinations(2) { - let is_equal = std::iter::zip(edges[0].iter(), edges[1].iter()) - .all(|(range1, range2)| { - (range1.start == range2.start) && (range1.end == range2.end) - }); - prop_assert!(!is_equal); + assert_eq!(e2.overlap_with(&e1), Overlap::None); + assert_eq!(e1.intersection(&e2), None); + assert_eq!(e1.union(&e2), union); + + let e1 = ManifestExtents::new( + vec![0u32, 1, 2].as_slice(), + vec![2u32, 4, 6].as_slice(), + ); + let e2 = ManifestExtents::new( + vec![2u32, 1, 2].as_slice(), + vec![42u32, 4, 6].as_slice(), + ); + assert_eq!(e2.overlap_with(&e1), Overlap::None); + assert_eq!(e1.overlap_with(&e2), Overlap::None); + + // asymmetric case + let e1 = ManifestExtents::new( + vec![0u32, 1, 2].as_slice(), + vec![3u32, 4, 6].as_slice(), + ); + let e2 = ManifestExtents::new( + vec![2u32, 1, 2].as_slice(), + vec![3u32, 4, 6].as_slice(), + ); + let union = ManifestExtents::new( + vec![0u32, 1, 2].as_slice(), + vec![3u32, 4, 6].as_slice(), + ); + let intersection = ManifestExtents::new( + vec![2u32, 1, 2].as_slice(), + vec![3u32, 4, 6].as_slice(), + ); + assert_eq!(e2.overlap_with(&e1), Overlap::Complete); + assert_eq!(e1.overlap_with(&e2), Overlap::Partial); + assert_eq!(e1.union(&e2), union.clone()); + assert_eq!(e2.union(&e1), union.clone()); + assert_eq!(e1.intersection(&e2), Some(intersection)); + + // empty set + let e1 = ManifestExtents::new( + vec![0u32, 1, 2].as_slice(), + vec![3u32, 4, 6].as_slice(), + ); + let e2 = ManifestExtents::new( + vec![2u32, 1, 2].as_slice(), + vec![2u32, 4, 6].as_slice(), + ); + assert_eq!(e1.intersection(&e2), None); + + // this should create non-overlapping extents + let splits = ManifestSplits::from_edges(vec![ + vec![0, 10, 20], + vec![0, 1, 2], + vec![0, 21, 22], + ]); + for vec in splits.iter().combinations(2) { + assert_eq!(vec[0].overlap_with(vec[1]), Overlap::None); + assert_eq!(vec[1].overlap_with(vec[0]), Overlap::None); + } + + Ok(()) + } + + #[proptest] + fn 
test_manifest_split_from_edges( + #[strategy(shapes_and_dims(Some(5)))] shape_dim: ShapeDim, + ) { + // Note: using the shape, chunks strategy to generate chunk_shape, split_shape + let ShapeDim { shape, .. } = shape_dim; + + let num_chunks = shape.iter().map(|x| x.array_length()).collect::>(); + let split_shape = shape.iter().map(|x| x.chunk_length()).collect::>(); + + let ndim = shape.len(); + let edges: Vec> = (0usize..ndim) + .map(|axis| { + uniform_manifest_split_edges( + num_chunks[axis] as u32, + &(split_shape[axis] as u32), + ) + }) + .collect(); + + let splits = ManifestSplits::from_edges(edges.into_iter()); + for edge in splits.iter() { + // must be ndim ranges + prop_assert_eq!(edge.len(), ndim); + for range in edge.iter() { + prop_assert!(range.end > range.start); } } + + // when using from_edges, extents must not exactly overlap + for edges in splits.iter().combinations(2) { + let is_equal = std::iter::zip(edges[0].iter(), edges[1].iter()).all( + |(range1, range2)| { + (range1.start == range2.start) && (range1.end == range2.end) + }, + ); + prop_assert!(!is_equal); + } } } diff --git a/icechunk/src/format/mod.rs b/icechunk/src/format/mod.rs index 58f1ebc7c..557c0b778 100644 --- a/icechunk/src/format/mod.rs +++ b/icechunk/src/format/mod.rs @@ -244,7 +244,7 @@ pub enum IcechunkFormatErrorKind { NodeNotFound { path: Path }, #[error("chunk coordinates not found `{coords:?}`")] ChunkCoordinatesNotFound { coords: ChunkIndices }, - #[error("manifest information cannot be found in snapshot `{manifest_id}`")] + #[error("manifest information cannot be found in snapshot for id `{manifest_id}`")] ManifestInfoNotFound { manifest_id: ManifestId }, #[error("invalid magic numbers in file")] InvalidMagicNumbers, // TODO: add more info diff --git a/icechunk/src/format/transaction_log.rs b/icechunk/src/format/transaction_log.rs index ee064b66f..2ab20d3d1 100644 --- a/icechunk/src/format/transaction_log.rs +++ b/icechunk/src/format/transaction_log.rs @@ -42,12 +42,11 @@ 
impl TransactionLog { // these come sorted from the change set let updated_chunks = cs - .chunk_changes() + .changed_chunks() .map(|(node_id, chunks)| { let node_id = generated::ObjectId8::new(&node_id.0); let node_id = Some(&node_id); let chunks = chunks - .keys() .map(|indices| { let coords = Some(builder.create_vector(indices.0.as_slice())); generated::ChunkIndices::create( diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index d7652a815..919c0848d 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -933,8 +933,10 @@ pub async fn raise_if_invalid_snapshot_id( #[cfg(test)] #[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)] mod tests { - use std::{collections::HashMap, error::Error, path::PathBuf, sync::Arc}; + use std::{collections::HashMap, error::Error, iter::zip, path::PathBuf, sync::Arc}; + use icechunk_macros::tokio_test; + use itertools::enumerate; use storage::logging::LoggingStorage; use tempfile::TempDir; @@ -945,30 +947,44 @@ mod tests { ManifestSplitDim, ManifestSplitDimCondition, ManifestSplittingConfig, RepositoryConfig, }, + conflicts::basic_solver::BasicConflictSolver, format::{ ByteRange, ChunkIndices, manifest::{ChunkPayload, ManifestSplits}, snapshot::{ArrayShape, DimensionName}, }, new_local_filesystem_storage, - session::get_chunk, + session::{SessionError, get_chunk}, storage::new_in_memory_storage, }; use super::*; + fn ravel_multi_index(index: &[u32], shape: &[u32]) -> u32 { + index + .iter() + .zip(shape.iter()) + .rev() + .fold((0, 1), |(acc, stride), (index, size)| { + (acc + *index * stride, stride * *size) + }) + .0 + } + async fn assert_manifest_count( storage: &Arc, total_manifests: usize, ) { + let actual = storage + .list_manifests(&storage.default_settings()) + .await + .unwrap() + .count() + .await; assert_eq!( - storage - .list_manifests(&storage.default_settings()) - .await - .unwrap() - .count() - .await, - total_manifests + total_manifests, actual, + "Mismatch 
in manifest count: expected {}, but got {}", + total_manifests, actual, ); } @@ -1150,6 +1166,25 @@ mod tests { )); } + fn reopen_repo_with_new_splitting_config( + repo: &Repository, + split_sizes: Option)>>, + ) -> Repository { + let split_config = ManifestSplittingConfig { split_sizes }; + let man_config = ManifestConfig { + preload: Some(ManifestPreloadConfig { + max_total_refs: None, + preload_if: None, + }), + splitting: Some(split_config.clone()), + }; + let config = RepositoryConfig { + manifest: Some(man_config), + ..RepositoryConfig::default() + }; + repo.reopen(Some(config), None).unwrap() + } + async fn create_repo_with_split_manifest_config( path: &Path, shape: &ArrayShape, @@ -1187,7 +1222,203 @@ mod tests { Ok(repository) } - #[tokio::test] + #[tokio_test] + async fn test_resize_rewrites_manifests() -> Result<(), Box> { + let storage: Arc = new_in_memory_storage().await?; + let repo = Repository::create( + Some(RepositoryConfig { + inline_chunk_threshold_bytes: Some(0), + ..Default::default() + }), + Arc::clone(&storage), + HashMap::new(), + ) + .await?; + let mut session = repo.writable_session("main").await?; + session.add_group(Path::root(), Bytes::copy_from_slice(b"")).await?; + + let array_path: Path = "/array".to_string().try_into().unwrap(); + let shape = ArrayShape::new(vec![(4, 1)]).unwrap(); + let dimension_names = Some(vec!["t".into()]); + let array_def = Bytes::from_static(br#"{"this":"other array"}"#); + + session + .add_array( + array_path.clone(), + shape.clone(), + dimension_names.clone(), + array_def.clone(), + ) + .await?; + + let bytes = Bytes::copy_from_slice(&42i8.to_be_bytes()); + for idx in 0..4 { + let payload = session.get_chunk_writer()(bytes.clone()).await?; + session + .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload)) + .await?; + } + session.commit("first commit", None).await?; + assert_manifest_count(&storage, 1).await; + + // Important we are not issuing any chunk deletes here (which is what 
Zarr does) + // Note we are still rewriting the manifest even without chunk changes + // GH604 + let mut session = repo.writable_session("main").await?; + let shape2 = ArrayShape::new(vec![(2, 1)]).unwrap(); + session + .update_array( + &array_path, + shape2.clone(), + dimension_names.clone(), + array_def.clone(), + ) + .await?; + session.commit("second commit", None).await?; + assert_manifest_count(&storage, 2).await; + + // Now we expand the size, but don't write chunks. + // No new manifests need to be written + let mut session = repo.writable_session("main").await?; + let shape3 = ArrayShape::new(vec![(6, 1)]).unwrap(); + session + .update_array( + &array_path, + shape3.clone(), + dimension_names.clone(), + array_def.clone(), + ) + .await?; + session.commit("second commit", None).await?; + assert_manifest_count(&storage, 2).await; + + Ok(()) + } + + #[tokio_test] + async fn test_splits_change_in_session() -> Result<(), Box> { + let shape = ArrayShape::new(vec![(13, 1), (2, 1), (1, 1)]).unwrap(); + let dimension_names = Some(vec!["t".into(), "y".into(), "x".into()]); + let new_dimension_names = Some(vec!["time".into(), "y".into(), "x".into()]); + let array_path: Path = "/temperature".try_into().unwrap(); + let array_def = Bytes::from_static(br#"{"this":"other array"}"#); + + // two possible split sizes t: 3, time: 4; + // then we rename `t` to `time` 😈 + let split_sizes = vec![ + ( + ManifestSplitCondition::PathMatches { regex: r".*".to_string() }, + vec![ManifestSplitDim { + condition: ManifestSplitDimCondition::DimensionName( + "^t$".to_string(), + ), + num_chunks: 3, + }], + ), + ( + ManifestSplitCondition::PathMatches { regex: r".*".to_string() }, + vec![ManifestSplitDim { + condition: ManifestSplitDimCondition::DimensionName( + "time".to_string(), + ), + num_chunks: 4, + }], + ), + ]; + let split_config = ManifestSplittingConfig { split_sizes: Some(split_sizes) }; + + let backend: Arc = new_in_memory_storage().await?; + let logging = 
Arc::new(LoggingStorage::new(Arc::clone(&backend))); + let storage: Arc = logging.clone(); + let repository = create_repo_with_split_manifest_config( + &array_path, + &shape, + &dimension_names, + &split_config, + Some(Arc::clone(&storage)), + ) + .await?; + + let verify_data = async |session: &Session, offset: u32| { + for idx in 0..12 { + let actual = get_chunk( + session + .get_chunk_reader( + &array_path, + &ChunkIndices(vec![idx.clone(), 0, 0]), + &ByteRange::ALL, + ) + .await + .unwrap(), + ) + .await + .unwrap() + .unwrap(); + let expected = + Bytes::copy_from_slice(format!("{0}", idx + offset).as_bytes()); + assert_eq!(actual, expected); + } + }; + + let mut session = repository.writable_session("main").await?; + for i in 0..12 { + session + .set_chunk_ref( + array_path.clone(), + ChunkIndices(vec![i, 0, 0]), + Some(ChunkPayload::Inline(format!("{0}", i).into())), + ) + .await? + } + verify_data(&session, 0).await; + + let node = session.get_node(&array_path).await?; + let orig_splits = session.lookup_splits(&node.id).cloned(); + assert_eq!( + orig_splits, + Some(ManifestSplits::from_edges(vec![ + vec![0, 3, 6, 9, 12, 13], + vec![0, 2], + vec![0, 1] + ])) + ); + + // this should update the splits + session + .update_array( + &array_path, + shape.clone(), + new_dimension_names.clone(), + array_def.clone(), + ) + .await?; + verify_data(&session, 0).await; + let new_splits = session.lookup_splits(&node.id).cloned(); + assert_eq!( + new_splits, + Some(ManifestSplits::from_edges(vec![ + vec![0, 4, 8, 12, 13], + vec![0, 2], + vec![0, 1] + ])) + ); + + // update data + for i in 0..12 { + session + .set_chunk_ref( + array_path.clone(), + ChunkIndices(vec![i, 0, 0]), + Some(ChunkPayload::Inline(format!("{0}", i + 10).into())), + ) + .await? 
+ } + verify_data(&session, 10).await; + + Ok(()) + } + + #[tokio_test] async fn tests_manifest_splitting_simple() -> Result<(), Box> { let dim_size = 25u32; let chunk_size = 1u32; @@ -1245,7 +1476,7 @@ mod tests { ) .await?; session.commit("last split", None).await?; - total_manifests += 2; // FIXME: this should be +1 once writes are optimized + total_manifests += 1; assert_manifest_count(&storage, total_manifests).await; // check that reads are optimized; we should only fetch the last split for this query @@ -1347,10 +1578,50 @@ mod tests { assert_eq!(val, Bytes::copy_from_slice(format!("{0}", i).as_bytes())); } + // delete all chunks + let mut session = repository.writable_session("main").await?; + for i in 0..dim_size { + session + .set_chunk_ref(temp_path.clone(), ChunkIndices(vec![i, 0, 0]), None) + .await?; + } + total_manifests += 0; + session.commit("clear existing array", None).await?; + assert_manifest_count(&storage, total_manifests).await; + + // add a new array + let def = Bytes::from_static(br#"{"this":"array"}"#); + let array_path: Path = "/array2".to_string().try_into().unwrap(); + let mut session = repository.writable_session("main").await?; + session + .add_array( + array_path.clone(), + shape.clone(), + dimension_names.clone(), + def.clone(), + ) + .await?; + // set a chunk + session + .set_chunk_ref( + array_path.clone(), + ChunkIndices(vec![1, 0, 0]), + Some(ChunkPayload::Inline(format!("{0}", 10).into())), + ) + .await?; + // delete that chunk, so the chunks iterator is empty + // regression test for bug found by hypothesis + session + .set_chunk_ref(array_path.clone(), ChunkIndices(vec![1, 0, 0]), None) + .await?; + total_manifests += 0; + session.commit("clear new array", None).await?; + assert_manifest_count(&storage, total_manifests).await; + Ok(()) } - #[tokio::test] + #[tokio_test] async fn test_manifest_splitting_complex_config() -> Result<(), Box> { let shape = ArrayShape::new(vec![(25, 1), (10, 1), (3, 1), (4, 1)]).unwrap(); let 
dimension_names = Some(vec!["t".into(), "z".into(), "y".into(), "x".into()]); @@ -1380,24 +1651,15 @@ mod tests { ), ]; let split_config = ManifestSplittingConfig { split_sizes: Some(split_sizes) }; - let repo = create_repo_with_split_manifest_config( - &temp_path, - &shape, - &dimension_names, - &split_config, - None, - ) - .await?; - let session = repo.writable_session("main").await?; - let actual = - split_config.get_split_sizes(&session.get_node(&temp_path).await?)?; let expected = ManifestSplits::from_edges(vec![ vec![0, 12, 24, 25], vec![0, 9, 10], vec![0, 2, 3], vec![0, 4], ]); + + let actual = split_config.get_split_sizes(&temp_path, &shape, &dimension_names); assert_eq!(actual, expected); let split_sizes = vec![( @@ -1418,26 +1680,16 @@ mod tests { ], )]; let split_config = ManifestSplittingConfig { split_sizes: Some(split_sizes) }; - let repo = create_repo_with_split_manifest_config( - &temp_path, - &shape, - &dimension_names, - &split_config, - None, - ) - .await?; - - let session = repo.writable_session("main").await?; - let actual = - split_config.get_split_sizes(&session.get_node(&temp_path).await?)?; + let actual = split_config.get_split_sizes(&temp_path, &shape, &dimension_names); assert_eq!(actual, expected); Ok(()) } - #[tokio::test] + #[tokio_test] async fn test_manifest_splitting_complex_writes() -> Result<(), Box> { let t_split_size = 12u32; + let other_split_size = 9u32; let y_split_size = 2u32; let shape = ArrayShape::new(vec![(25, 1), (10, 1), (3, 1), (4, 1)]).unwrap(); @@ -1463,7 +1715,7 @@ mod tests { ManifestSplitCondition::NameMatches { regex: r".*".to_string() }, vec![ManifestSplitDim { condition: ManifestSplitDimCondition::Any, - num_chunks: 9, + num_chunks: other_split_size, }], ), ]; @@ -1482,6 +1734,7 @@ mod tests { Some(logging_c), ) .await?; + let repo_clone = repository.reopen(None, None)?; let mut total_manifests = 0; assert_manifest_count(&backend, total_manifests).await; @@ -1490,32 +1743,14 @@ mod tests { let ops = 
logging.fetch_operations(); assert!(ops.is_empty()); - let mut add = 0; - for ax in 0..shape.len() { - let mut session = repository.writable_session("main").await?; - let axis_size = shape.get(ax).unwrap().array_length(); - for i in 0..axis_size { - let mut index = vec![0u32, 0, 0, 0]; - index[ax] = i as u32; - session - .set_chunk_ref( - temp_path.clone(), - ChunkIndices(index), - Some(ChunkPayload::Inline(format!("{0}", i).into())), - ) - .await? - } - - add += (axis_size as u32).div_ceil(expected_split_sizes[ax]) as usize - - 1 * ((ax > 0) as usize); - dbg!(&ax, &add); - total_manifests += add; - session.commit(format!("finished axis {0}", ax).as_ref(), None).await?; - assert_manifest_count(&backend, total_manifests).await; + let array_shape = + shape.iter().map(|x| x.array_length() as u32).collect::>(); + let verify_data = async |ax, session: &Session| { for i in 0..shape.get(ax).unwrap().array_length() { let mut index = vec![0u32, 0, 0, 0]; index[ax] = i as u32; + let ic = index.clone(); let val = get_chunk( session .get_chunk_reader( @@ -1528,10 +1763,456 @@ mod tests { ) .await .unwrap() + .expect(&format!("getting chunk ref failed for {:?}", &ic)); + let expected_value = + ravel_multi_index(ic.as_slice(), array_shape.as_slice()); + let expected = + Bytes::copy_from_slice(format!("{0}", expected_value).as_bytes()); + assert_eq!( + val, expected, + "For chunk {:?}, received {:?}, expected {:?}", + ic, val, expected + ); + } + }; + let verify_all_data = async |repo: &Repository| { + let session = repo + .readonly_session(&VersionInfo::BranchTipRef("main".to_string())) + .await .unwrap(); - assert_eq!(val, Bytes::copy_from_slice(format!("{0}", i).as_bytes())); + for ax in 0..shape.len() { + verify_data(ax, &session).await; + } + }; + + //========================================================= + // This loop iterates over axis and rewrites the boundary chunks. 
+ // Each loop iteration must rewrite chunk_shape/split_size manifests + for ax in 0..shape.len() { + let mut session = repository.writable_session("main").await?; + let axis_size = shape.get(ax).unwrap().array_length(); + for i in 0..axis_size { + let mut index = vec![0u32, 0, 0, 0]; + index[ax] = i as u32; + let value = ravel_multi_index(index.as_slice(), array_shape.as_slice()); + session + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(index), + Some(ChunkPayload::Inline(format!("{0}", value).into())), + ) + .await? } + + total_manifests += + (axis_size as u32).div_ceil(expected_split_sizes[ax]) as usize; + session.commit(format!("finished axis {0}", ax).as_ref(), None).await?; + assert_manifest_count(&backend, total_manifests).await; + + verify_data(ax.clone(), &session).await; + } + verify_all_data(&repository).await; + + //========================================================= + // Now change splitting config + let split_sizes = vec![( + ManifestSplitCondition::AnyArray, + vec![ManifestSplitDim { + condition: ManifestSplitDimCondition::DimensionName("t".to_string()), + num_chunks: t_split_size, + }], + )]; + + let repository = + reopen_repo_with_new_splitting_config(&repository, Some(split_sizes)); + verify_all_data(&repository).await; + let mut session = repository.writable_session("main").await?; + let index = vec![13, 0, 0, 0]; + let value = ravel_multi_index(index.as_slice(), array_shape.as_slice()); + session + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(index), + Some(ChunkPayload::Inline(format!("{0}", value).into())), + ) + .await?; + // Important: we only create one new manifest in this case for + // the first split in the `t`-axis. 
Since the other splits + // are not modified we preserve all the old manifests + total_manifests += 1; + session.commit(format!("finished time again").as_ref(), None).await?; + assert_manifest_count(&backend, total_manifests).await; + verify_all_data(&repository).await; + + // now modify all splits to trigger a full rewrite + let mut session = repository.writable_session("main").await?; + for idx in [0, 12, 24] { + let index = vec![idx, 0, 0, 0]; + let value = ravel_multi_index(index.as_slice(), array_shape.as_slice()); + session + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(index), + Some(ChunkPayload::Inline(format!("{0}", value).into())), + ) + .await?; + } + total_manifests += + (shape.get(0).unwrap().array_length() as u32).div_ceil(t_split_size) as usize; + session.commit(format!("finished time again").as_ref(), None).await?; + assert_manifest_count(&backend, total_manifests).await; + verify_all_data(&repository).await; + + //========================================================= + // Now get back to original repository with original config + // Modify one `t` split. 
+ let mut session = repo_clone.writable_session("main").await?; + session + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(vec![0, 0, 0, 0]), + Some(ChunkPayload::Inline(format!("{0}", 0).into())), + ) + .await?; + // Important: now we rewrite one split per dimension + total_manifests += 3; + session.commit(format!("finished time again").as_ref(), None).await?; + assert_manifest_count(&backend, total_manifests).await; + verify_all_data(&repo_clone).await; + verify_all_data(&repository).await; + + let mut session = repo_clone.writable_session("main").await?; + for idx in [0, 12, 24] { + let index = vec![idx, 0, 0, 0]; + let value = ravel_multi_index(index.as_slice(), array_shape.as_slice()); + session + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(index), + Some(ChunkPayload::Inline(format!("{0}", value).into())), + ) + .await?; + } + total_manifests += + (shape.get(0).unwrap().array_length() as u32).div_ceil(t_split_size) as usize; + session.commit(format!("finished time again").as_ref(), None).await?; + assert_manifest_count(&backend, total_manifests).await; + verify_all_data(&repo_clone).await; + + // do that again, but with different values and test those specifically + let mut session = repo_clone.writable_session("main").await?; + for idx in [0, 12, 24] { + let index = vec![idx, 0, 0, 0]; + session + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(index), + Some(ChunkPayload::Inline(format!("{0}", idx + 2).into())), + ) + .await?; + } + total_manifests += + (shape.get(0).unwrap().array_length() as u32).div_ceil(t_split_size) as usize; + session.commit(format!("finished time again").as_ref(), None).await?; + assert_manifest_count(&backend, total_manifests).await; + for idx in [0, 12, 24] { + let actual = get_chunk( + session + .get_chunk_reader( + &temp_path, + &ChunkIndices(vec![idx.clone(), 0, 0, 0]), + &ByteRange::ALL, + ) + .await + .unwrap(), + ) + .await + .unwrap() + .unwrap(); + let expected = Bytes::copy_from_slice(format!("{0}", idx + 
2).as_bytes()); + assert_eq!(actual, expected); + } + Ok(()) + } + + #[tokio_test] + async fn test_manifest_splits_merge_sessions() -> Result<(), Box> { + let shape = ArrayShape::new(vec![(25, 1), (10, 1), (3, 1), (4, 1)]).unwrap(); + let dimension_names = Some(vec!["t".into(), "z".into(), "y".into(), "x".into()]); + let temp_path: Path = "/temperature".try_into().unwrap(); + + let orig_split_sizes = vec![( + ManifestSplitCondition::AnyArray, + vec![ManifestSplitDim { + condition: ManifestSplitDimCondition::DimensionName("t".to_string()), + num_chunks: 12u32, + }], + )]; + let split_config = + ManifestSplittingConfig { split_sizes: Some(orig_split_sizes.clone()) }; + let backend: Arc = new_in_memory_storage().await?; + let repository = create_repo_with_split_manifest_config( + &temp_path, + &shape, + &dimension_names, + &split_config, + Some(backend), + ) + .await?; + + let indices = + vec![vec![0, 0, 1, 0], vec![0, 0, 0, 0], vec![0, 2, 0, 0], vec![0, 2, 0, 1]]; + + let mut session1 = repository.writable_session("main").await?; + let node_id = session1.get_node(&temp_path).await?.id; + session1 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[0].clone()), + Some(ChunkPayload::Inline(format!("{0}", 0).into())), + ) + .await?; + session1 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[1].clone()), + Some(ChunkPayload::Inline(format!("{0}", 1).into())), + ) + .await?; + + for incompatible_size in [1, 11u32, 24u32, u32::MAX] { + let incompatible_split_sizes = vec![( + ManifestSplitCondition::AnyArray, + vec![ManifestSplitDim { + condition: ManifestSplitDimCondition::DimensionName("t".to_string()), + num_chunks: incompatible_size, + }], + )]; + let other_repo = reopen_repo_with_new_splitting_config( + &repository, + Some(incompatible_split_sizes), + ); + + assert_ne!(other_repo.config(), repository.config()); + + let mut session2 = other_repo.writable_session("main").await?; + session2 + .set_chunk_ref( + temp_path.clone(), + 
ChunkIndices(indices[2].clone()), + Some(ChunkPayload::Inline(format!("{0}", 2).into())), + ) + .await?; + session2 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[3].clone()), + Some(ChunkPayload::Inline(format!("{0}", 3).into())), + ) + .await?; + + assert!(session1.merge(session2).await.is_err()); + } + + // now with the same split sizes + let other_repo = + reopen_repo_with_new_splitting_config(&repository, Some(orig_split_sizes)); + let mut session2 = other_repo.writable_session("main").await?; + session2 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[2].clone()), + Some(ChunkPayload::Inline(format!("{0}", 2).into())), + ) + .await?; + session2 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[3].clone()), + Some(ChunkPayload::Inline(format!("{0}", 3).into())), + ) + .await?; + + // Session.splits should be _complete_ so it should be identical for the same node + // on any two sessions with compatible splits + let splits = session1.lookup_splits(&node_id).unwrap().clone(); + assert_eq!(session1.lookup_splits(&node_id), session2.lookup_splits(&node_id)); + session1.merge(session2).await?; + assert_eq!(session1.lookup_splits(&node_id), Some(&splits)); + for (val, idx) in enumerate(indices.iter()) { + let actual = get_chunk( + session1 + .get_chunk_reader( + &temp_path, + &ChunkIndices(idx.clone()), + &ByteRange::ALL, + ) + .await + .unwrap(), + ) + .await + .unwrap() + .expect(&format!("getting chunk ref failed for {:?}", &idx)); + let expected = Bytes::copy_from_slice(format!("{0}", val).as_bytes()); + assert_eq!(actual, expected); + } + + // now merge two sessions: one with only writes, one with only deletes + let mut session1 = repository.writable_session("main").await?; + session1 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[0].clone()), + Some(ChunkPayload::Inline(format!("{0}", 3).into())), + ) + .await?; + session1 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[1].clone()), + 
Some(ChunkPayload::Inline(format!("{0}", 4).into())), + ) + .await?; + let mut session2 = repository.writable_session("main").await?; + session2 + .set_chunk_ref(temp_path.clone(), ChunkIndices(indices[2].clone()), None) + .await?; + session2 + .set_chunk_ref(temp_path.clone(), ChunkIndices(indices[3].clone()), None) + .await?; + + session1.merge(session2).await?; + let expected = vec![Some(3), Some(4), None, None]; + for (expect, idx) in zip(expected.iter(), indices.iter()) { + let actual = get_chunk( + session1 + .get_chunk_reader( + &temp_path, + &ChunkIndices(idx.clone()), + &ByteRange::ALL, + ) + .await + .unwrap(), + ) + .await + .unwrap(); + let expected_value = + expect.map(|val| Bytes::copy_from_slice(format!("{0}", val).as_bytes())); + assert_eq!(actual, expected_value); + } + + Ok(()) + } + + #[tokio_test] + async fn test_commits_with_conflicting_manifest_splits() -> Result<(), Box> + { + let shape = ArrayShape::new(vec![(25, 1), (10, 1), (3, 1), (4, 1)]).unwrap(); + let dimension_names = Some(vec!["t".into(), "z".into(), "y".into(), "x".into()]); + let temp_path: Path = "/temperature".try_into().unwrap(); + + let orig_split_sizes = vec![( + ManifestSplitCondition::AnyArray, + vec![ManifestSplitDim { + condition: ManifestSplitDimCondition::DimensionName("t".to_string()), + num_chunks: 12u32, + }], + )]; + let split_config = + ManifestSplittingConfig { split_sizes: Some(orig_split_sizes.clone()) }; + let backend: Arc = new_in_memory_storage().await?; + let repository = create_repo_with_split_manifest_config( + &temp_path, + &shape, + &dimension_names, + &split_config, + Some(backend), + ) + .await?; + + let indices = + vec![vec![0, 0, 1, 0], vec![0, 0, 0, 0], vec![0, 2, 0, 0], vec![0, 2, 0, 1]]; + + let mut session1 = repository.writable_session("main").await?; + session1 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[0].clone()), + Some(ChunkPayload::Inline(format!("{0}", 0).into())), + ) + .await?; + session1 + .set_chunk_ref( + 
temp_path.clone(), + ChunkIndices(indices[1].clone()), + Some(ChunkPayload::Inline(format!("{0}", 1).into())), + ) + .await?; + + let incompatible_size = 11u32; + let incompatible_split_sizes = vec![( + ManifestSplitCondition::AnyArray, + vec![ManifestSplitDim { + condition: ManifestSplitDimCondition::DimensionName("t".to_string()), + num_chunks: incompatible_size, + }], + )]; + let other_repo = reopen_repo_with_new_splitting_config( + &repository, + Some(incompatible_split_sizes), + ); + + assert_ne!(other_repo.config(), repository.config()); + + let mut session2 = other_repo.writable_session("main").await?; + session2 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[2].clone()), + Some(ChunkPayload::Inline(format!("{0}", 2).into())), + ) + .await?; + session2 + .set_chunk_ref( + temp_path.clone(), + ChunkIndices(indices[3].clone()), + Some(ChunkPayload::Inline(format!("{0}", 3).into())), + ) + .await?; + + session1.commit("first commit", None).await?; + if let Err(SessionError { kind: SessionErrorKind::Conflict { .. }, .. 
}) = + session2.commit("second commit", None).await + { + let solver = BasicConflictSolver::default(); + // different chunks were written so this should fast forward + assert!(session2.rebase(&solver).await.is_ok()); + session2.commit("second commit after rebase", None).await?; + } else { + panic!("this should have conflicted!"); } + + let new_session = repository + .readonly_session(&VersionInfo::BranchTipRef("main".into())) + .await?; + for (val, idx) in enumerate(indices.iter()) { + let actual = get_chunk( + new_session + .get_chunk_reader( + &temp_path, + &ChunkIndices(idx.clone()), + &ByteRange::ALL, + ) + .await + .unwrap(), + ) + .await + .unwrap() + .expect(&format!("getting chunk ref failed for {:?}", &idx)); + let expected = Bytes::copy_from_slice(format!("{0}", val).as_bytes()); + assert_eq!(actual, expected); + } + Ok(()) } diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index 8a4c257b2..1e0777b04 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -13,7 +13,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use err_into::ErrorInto; use futures::{FutureExt, Stream, StreamExt, TryStreamExt, future::Either, stream}; -use itertools::{Itertools as _, repeat_n}; +use itertools::{Itertools as _, enumerate, repeat_n}; use regex::bytes::Regex; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -32,8 +32,9 @@ use crate::{ IcechunkFormatErrorKind, ManifestId, NodeId, ObjectId, Path, SnapshotId, manifest::{ ChunkInfo, ChunkPayload, ChunkRef, Manifest, ManifestExtents, ManifestRef, - ManifestSplits, VirtualChunkLocation, VirtualChunkRef, VirtualReferenceError, - VirtualReferenceErrorKind, uniform_manifest_split_edges, + ManifestSplits, Overlap, VirtualChunkLocation, VirtualChunkRef, + VirtualReferenceError, VirtualReferenceErrorKind, + uniform_manifest_split_edges, }, snapshot::{ ArrayShape, DimensionName, ManifestFileInfo, NodeData, NodeSnapshot, @@ -108,8 +109,15 @@ pub enum SessionErrorKind { InvalidIndex { coords: 
ChunkIndices, path: Path }, #[error("invalid chunk index for splitting manifests: {coords:?}")] InvalidIndexForSplitManifests { coords: ChunkIndices }, + #[error("incompatible manifest splitting config when merging two sessions")] + IncompatibleSplittingConfig { + ours: ManifestSplittingConfig, + theirs: ManifestSplittingConfig, + }, #[error("`to` snapshot ancestry doesn't include `from`")] BadSnapshotChainForDiff, + #[error("failed to create manifest from chunk stream")] + ManifestCreationError(#[from] Box), } pub type SessionError = ICError; @@ -160,31 +168,40 @@ impl From for SessionError { pub type SessionResult = Result; +// Returns the index of split_range that includes ChunkIndices +// This can be used at write time to split manifests based on the config +// and at read time to choose which manifest to query for chunk payload +/// It is useful to have this act on an iterator (e.g. get_chunk_ref) +/// The find method on ManifestSplits is simply a helper. +pub fn find_coord<'a, I>( + iter: I, + coord: &'a ChunkIndices, +) -> Option<(usize, &'a ManifestExtents)> +where + I: Iterator, +{ + // split_range[i] must bound ChunkIndices + // 0 <= returned index < split_range.len() + // it is possible that split_range does not include a coord. say we have 2x2 split grid + // but only split (0,0) and split (1,1) are populated with data. + // A coord located in (1, 0) should return None + // Since split_range need not form a regular grid, we must iterate through and find the first result. + // ManifestExtents in split_range MUST NOT overlap with each other. How do we ensure this? + // ndim must be the same + // Note: I don't think we can distinguish between out of bounds index for the array + // and an index that is part of a split that hasn't been written yet. 
+ enumerate(iter).find(|(_, e)| e.contains(coord.0.as_slice())) +} + impl ManifestSplits { - // Returns the index of split_range that includes ChunkIndices - // This can be used at write time to split manifests based on the config - // and at read time to choose which manifest to query for chunk payload - pub fn which(&self, coord: &ChunkIndices) -> SessionResult { - // split_range[i] must bound ChunkIndices - // 0 <= return value <= split_range.len() - // it is possible that split_range does not include a coord. say we have 2x2 split grid - // but only split (0,0) and split (1,1) are populated with data. - // A coord located in (1, 0) should return Err - // Since split_range need not form a regular grid, we must iterate through and find the first result. - // ManifestExtents in split_range MUST NOT overlap with each other. How do we ensure this? - // ndim must be the same - // debug_assert_eq!(coord.0.len(), split_range[0].len()); - // FIXME: could optimize for unbounded single manifest - // Note: I don't think we can distinguish between out of bounds index for the array - // and an index that is part of a split that hasn't been written yet. - self.iter() - .enumerate() - .find(|(_, e)| e.contains(coord.0.as_slice())) - .map(|(i, _)| i) - .ok_or( - SessionErrorKind::InvalidIndexForSplitManifests { coords: coord.clone() } - .into(), - ) + pub fn find<'a>(&'a self, coord: &'a ChunkIndices) -> Option<&'a ManifestExtents> { + debug_assert_eq!(coord.0.len(), self.0[0].len()); + find_coord(self.iter(), coord).map(|x| x.1) + } + + pub fn position(&self, coord: &ChunkIndices) -> Option { + debug_assert_eq!(coord.0.len(), self.0[0].len()); + find_coord(self.iter(), coord).map(|x| x.0) } } @@ -199,6 +216,8 @@ pub struct Session { snapshot_id: SnapshotId, change_set: ChangeSet, default_commit_metadata: SnapshotProperties, + // This is an optimization so that we needn't figure out the split sizes on every set. 
+ splits: HashMap, } impl Session { @@ -220,6 +239,9 @@ impl Session { snapshot_id, change_set: ChangeSet::default(), default_commit_metadata: SnapshotProperties::default(), + // Splits are populated for a node during + // `add_array`, `update_array`, and `set_chunk_ref` + splits: Default::default(), } } @@ -242,8 +264,9 @@ impl Session { virtual_resolver, branch_name: Some(branch_name), snapshot_id, - change_set: ChangeSet::default(), + change_set: Default::default(), default_commit_metadata, + splits: Default::default(), } } @@ -383,6 +406,7 @@ impl Session { match self.get_node(&path).await { Err(SessionError { kind: SessionErrorKind::NodeNotFound { .. }, .. }) => { let id = NodeId::random(); + self.cache_splits(&id, &path, &shape, &dimension_names); self.change_set.add_array( path, id, @@ -411,10 +435,14 @@ impl Session { user_data: Bytes, ) -> SessionResult<()> { self.get_array(path).await.map(|node| { + // needed to handle a resize for example. + self.cache_splits(&node.id, path, &shape, &dimension_names); self.change_set.update_array( &node.id, path, ArrayData { shape, dimension_names, user_data }, + #[allow(clippy::expect_used)] + self.splits.get(&node.id).expect("getting splits should not fail."), ) }) } @@ -475,6 +503,43 @@ impl Session { self.set_node_chunk_ref(node_snapshot, coord, data).await } + pub fn lookup_splits(&self, node_id: &NodeId) -> Option<&ManifestSplits> { + self.splits.get(node_id) + } + + /// This method is directly called in add_array & update_array + /// where we know we must update the splits HashMap + fn cache_splits( + &mut self, + node_id: &NodeId, + path: &Path, + shape: &ArrayShape, + dimension_names: &Option>, + ) { + // Q: What happens if we set a chunk, then change a dimension name, so + // that the split changes. + // A: we reorg the existing chunk refs in the changeset to the new splits. 
+ let splitting = self.config.manifest().splitting(); + let splits = splitting.get_split_sizes(path, shape, dimension_names); + self.splits.insert(node_id.clone(), splits); + } + + fn get_splits( + &mut self, + node_id: &NodeId, + path: &Path, + shape: &ArrayShape, + dimension_names: &Option>, + ) -> &ManifestSplits { + self.splits.entry(node_id.clone()).or_insert_with(|| { + self.config.manifest().splitting().get_split_sizes( + path, + shape, + dimension_names, + ) + }) + } + // Helper function that accepts a NodeSnapshot instead of a path, // this lets us do bulk sets (and deletes) without repeatedly grabbing the node. #[instrument(skip(self))] @@ -484,9 +549,12 @@ impl Session { coord: ChunkIndices, data: Option, ) -> SessionResult<()> { - if let NodeData::Array { shape, .. } = node.node_data { + if let NodeData::Array { shape, dimension_names, .. } = node.node_data { if shape.valid_chunk_coord(&coord) { - self.change_set.set_chunk_ref(node.id, coord, data); + let splits = self + .get_splits(&node.id, &node.path, &shape, &dimension_names) + .clone(); + self.change_set.set_chunk_ref(node.id, coord, data, &splits); Ok(()) } else { Err(SessionErrorKind::InvalidIndex { @@ -560,6 +628,7 @@ impl Session { &self.change_set, &self.snapshot_id, path, + ManifestExtents::ALL, ) .await } @@ -579,9 +648,12 @@ impl Session { message: "getting chunk reference".to_string(), } .into()), - NodeData::Array { manifests, .. } => { - // Note: at this point, coords could be invalid for the array shape - // but we just let that pass. + NodeData::Array { shape, manifests, .. } => { + if !shape.valid_chunk_coord(coords) { + // this chunk ref cannot exist + return Ok(None); + } + // check the chunks modified in this session first // TODO: I hate rust forces me to clone to search in a hashmap. How to do better? let session_chunk = @@ -744,16 +816,14 @@ impl Session { // in the changeset, return None to Zarr. 
return Ok(None); } - let splits = ManifestSplits::from_extents( - manifests.iter().map(|m| m.extents.clone()).collect(), - ); - let index = match splits.which(coords) { - Ok(index) => index, + + let index = match find_coord(manifests.iter().map(|m| &m.extents), coords) { + Some((index, _)) => index, // for an invalid coordinate, we bail. // This happens for two cases: // (1) the "coords" is out-of-range for the array shape // (2) the "coords" belongs to a shard that hasn't been written yet. - Err(_) => return Ok(None), + None => return Ok(None), }; let manifest = self.fetch_manifest(&manifests[index].object_id).await?; @@ -800,6 +870,7 @@ impl Session { &self.change_set, &self.snapshot_id, node.clone(), + ManifestExtents::ALL, ) .await .map_ok(|(_path, chunk_info)| chunk_info.coord); @@ -807,7 +878,7 @@ impl Session { let res = try_stream! { let new_chunks = stream::iter( self.change_set - .new_array_chunk_iterator(&node.id, array_path) + .new_array_chunk_iterator(&node.id, array_path, ManifestExtents::ALL) .map(|chunk_info| Ok::(chunk_info.coord)), ); @@ -840,12 +911,30 @@ impl Session { } /// Merge a set of `ChangeSet`s into the repository without committing them - #[instrument(skip(self, changes))] - pub async fn merge(&mut self, changes: ChangeSet) -> SessionResult<()> { + #[instrument(skip(self, other))] + pub async fn merge(&mut self, other: Session) -> SessionResult<()> { if self.read_only() { return Err(SessionErrorKind::ReadOnlySession.into()); } - self.change_set.merge(changes); + let Session { splits: other_splits, change_set, .. 
} = other; + + if self.splits.iter().any(|(node, our_splits)| { + other_splits + .get(node) + .is_some_and(|their_splits| !our_splits.compatible_with(their_splits)) + }) { + let ours = self.config().manifest().splitting().clone(); + let theirs = other.config.manifest().splitting().clone(); + return Err( + SessionErrorKind::IncompatibleSplittingConfig { ours, theirs }.into() + ); + } + + // Session.splits is _complete_ in that it will include every possible split. + // So a simple `extend` is fine, if the same node appears in two sessions, + // it must have the same splits and overwriting is fine. + self.splits.extend(other_splits); + self.change_set.merge(change_set); Ok(()) } @@ -884,9 +973,9 @@ impl Session { branch_name, &self.snapshot_id, &self.change_set, - &self.config, message, Some(properties), + &self.splits, ) .await } @@ -907,9 +996,9 @@ impl Session { branch_name, &self.snapshot_id, &self.change_set, - &self.config, message, Some(properties), + &self.splits, ) .await } @@ -1112,8 +1201,18 @@ async fn updated_chunk_iterator<'a>( let snapshot = asset_manager.fetch_snapshot(snapshot_id).await?; let nodes = futures::stream::iter(snapshot.iter_arc()); let res = nodes.and_then(move |node| async move { - Ok(updated_node_chunks_iterator(asset_manager, change_set, snapshot_id, node) - .await) + // Note: Confusingly, these NodeSnapshot instances have the metadata stored in the snapshot. + // We have not applied any changeset updates. At the moment, the downstream code only + // uses node.id so there is no need to update yet. + + Ok(updated_node_chunks_iterator( + asset_manager, + change_set, + snapshot_id, + node, + ManifestExtents::ALL, + ) + .await) }); Ok(res.try_flatten()) } @@ -1123,6 +1222,7 @@ async fn updated_node_chunks_iterator<'a>( change_set: &'a ChangeSet, snapshot_id: &'a SnapshotId, node: NodeSnapshot, + extent: ManifestExtents, ) -> impl Stream> + 'a { // This iterator should yield chunks for existing arrays + any updates. 
// we check for deletion here in the case that `path` exists in the snapshot, @@ -1133,9 +1233,15 @@ async fn updated_node_chunks_iterator<'a>( let path = node.path.clone(); Either::Right( // TODO: avoid clone - verified_node_chunk_iterator(asset_manager, snapshot_id, change_set, node) - .await - .map_ok(move |ci| (path.clone(), ci)), + verified_node_chunk_iterator( + asset_manager, + snapshot_id, + change_set, + node, + extent, + ) + .await + .map_ok(move |ci| (path.clone(), ci)), ) } } @@ -1146,11 +1252,18 @@ async fn node_chunk_iterator<'a>( change_set: &'a ChangeSet, snapshot_id: &'a SnapshotId, path: &Path, + extent: ManifestExtents, ) -> impl Stream> + 'a + use<'a> { match get_node(asset_manager, change_set, snapshot_id, path).await { Ok(node) => futures::future::Either::Left( - verified_node_chunk_iterator(asset_manager, snapshot_id, change_set, node) - .await, + verified_node_chunk_iterator( + asset_manager, + snapshot_id, + change_set, + node, + extent, + ) + .await, ), Err(_) => futures::future::Either::Right(futures::stream::empty()), } @@ -1162,20 +1275,25 @@ async fn verified_node_chunk_iterator<'a>( snapshot_id: &'a SnapshotId, change_set: &'a ChangeSet, node: NodeSnapshot, + extent: ManifestExtents, ) -> impl Stream> + 'a { match node.node_data { NodeData::Group => futures::future::Either::Left(futures::stream::empty()), NodeData::Array { manifests, .. 
} => { let new_chunk_indices: Box> = Box::new( change_set - .array_chunks_iterator(&node.id, &node.path) + .array_chunks_iterator(&node.id, &node.path, extent.clone()) .map(|(idx, _)| idx) + // by chaining here, we make sure we don't pull from the manifest + // any chunks that were deleted prior to resizing in this session + .chain(change_set.deleted_chunks_iterator(&node.id)) .collect(), ); let node_id_c = node.id.clone(); + let extent_c = extent.clone(); let new_chunks = change_set - .array_chunks_iterator(&node.id, &node.path) + .array_chunks_iterator(&node.id, &node.path, extent.clone()) .filter_map(move |(idx, payload)| { payload.as_ref().map(|payload| { Ok(ChunkInfo { @@ -1189,11 +1307,18 @@ async fn verified_node_chunk_iterator<'a>( futures::future::Either::Right( futures::stream::iter(new_chunks).chain( futures::stream::iter(manifests) + .filter(move |manifest_ref| { + futures::future::ready( + extent.overlap_with(&manifest_ref.extents) + != Overlap::None, + ) + }) .then(move |manifest_ref| { let new_chunk_indices = new_chunk_indices.clone(); let node_id_c = node.id.clone(); let node_id_c2 = node.id.clone(); let node_id_c3 = node.id.clone(); + let extent_c2 = extent_c.clone(); async move { let manifest = fetch_manifest( &manifest_ref.object_id, @@ -1207,6 +1332,9 @@ async fn verified_node_chunk_iterator<'a>( .iter(node_id_c.clone()) .filter_ok(move |(coord, _)| { !new_chunk_indices.contains(coord) + // If the manifest we are parsing partially overlaps with `extent`, + // we need to filter all coordinates + && extent_c2.contains(coord.0.as_slice()) }) .map_ok(move |(coord, payload)| ChunkInfo { node: node_id_c2.clone(), @@ -1444,46 +1572,11 @@ pub fn construct_valid_byte_range( } } -#[derive(Default, Debug)] -struct SplitManifest { - from: Vec, - to: Vec, - chunks: Vec, -} - -impl SplitManifest { - fn update(&mut self, chunk: ChunkInfo) { - if self.from.is_empty() { - debug_assert!(self.to.is_empty()); - debug_assert!(self.chunks.is_empty()); - // important 
to remember that `to` is not inclusive, so we need +1 - let mut coord = chunk.coord.0.clone(); - self.to.extend(coord.iter().map(|n| *n + 1)); - self.from.append(&mut coord); - } else { - for (existing, coord) in self.from.iter_mut().zip(chunk.coord.0.iter()) { - if coord < existing { - *existing = *coord - } - } - for (existing, coord) in self.to.iter_mut().zip(chunk.coord.0.iter()) { - // important to remember that `to` is not inclusive, so we need +1 - let range_value = coord + 1; - if range_value > *existing { - *existing = range_value - } - } - } - - self.chunks.push(chunk) - } -} - struct FlushProcess<'a> { asset_manager: Arc, change_set: &'a ChangeSet, parent_id: &'a SnapshotId, - config: &'a RepositoryConfig, + splits: &'a HashMap, manifest_refs: HashMap>, manifest_files: HashSet, } @@ -1493,67 +1586,64 @@ impl<'a> FlushProcess<'a> { asset_manager: Arc, change_set: &'a ChangeSet, parent_id: &'a SnapshotId, - config: &'a RepositoryConfig, + splits: &'a HashMap, ) -> Self { Self { asset_manager, change_set, parent_id, - config, + splits, manifest_refs: Default::default(), manifest_files: Default::default(), } } - async fn write_manifests_from_iterator( + async fn write_manifest_for_updated_chunks( &mut self, - node_id: &NodeId, - chunks: impl Stream>, - splits: ManifestSplits, - ) -> SessionResult<()> { - // TODO: think about optimizing writes to manifests - // TODO: add benchmarks - let split_refs = chunks - .try_fold( - // TODO: have the changeset track this HashMap - HashMap::::with_capacity(splits.len()), - |mut split_refs, chunk| async { - let split_index = splits.which(&chunk.coord); - split_index.map(|index| { - split_refs.entry(index).or_default().update(chunk); - split_refs - }) - }, - ) - .await?; - - for (_, shard) in split_refs.into_iter() { - let shard_chunks = - stream::iter(shard.chunks.into_iter().map(Ok::)); - - if let Some(new_manifest) = Manifest::from_stream(shard_chunks).await.unwrap() - { - let new_manifest = Arc::new(new_manifest); - 
let new_manifest_size = - self.asset_manager.write_manifest(Arc::clone(&new_manifest)).await?; - - let file_info = - ManifestFileInfo::new(new_manifest.as_ref(), new_manifest_size); - self.manifest_files.insert(file_info); + node: &NodeSnapshot, + extent: &ManifestExtents, + ) -> SessionResult> { + let asset_manager = Arc::clone(&self.asset_manager); + let updated_chunks = updated_node_chunks_iterator( + asset_manager.as_ref(), + self.change_set, + self.parent_id, + node.clone(), + extent.clone(), + ) + .await + .map_ok(|(_path, chunk_info)| chunk_info); + self.write_manifest_from_iterator(updated_chunks).await + } - let new_ref = ManifestRef { - object_id: new_manifest.id().clone(), - extents: ManifestExtents::new(&shard.from, &shard.to), - }; + async fn write_manifest_from_iterator( + &mut self, + chunks: impl Stream>, + ) -> SessionResult> { + let mut from = vec![]; + let mut to = vec![]; + let chunks = aggregate_extents(&mut from, &mut to, chunks, |ci| &ci.coord); - self.manifest_refs - .entry(node_id.clone()) - .and_modify(|v| v.push(new_ref.clone())) - .or_insert_with(|| vec![new_ref]); - } + if let Some(new_manifest) = Manifest::from_stream(chunks) + .await + .map_err(|e| SessionErrorKind::ManifestCreationError(Box::new(e)))? 
+ { + let new_manifest = Arc::new(new_manifest); + let new_manifest_size = + self.asset_manager.write_manifest(Arc::clone(&new_manifest)).await?; + + let file_info = + ManifestFileInfo::new(new_manifest.as_ref(), new_manifest_size); + self.manifest_files.insert(file_info); + + let new_ref = ManifestRef { + object_id: new_manifest.id().clone(), + extents: ManifestExtents::new(&from, &to), + }; + Ok(Some(new_ref)) + } else { + Ok(None) } - - Ok(()) } /// Write a manifest for a node that was created in this session @@ -1562,12 +1652,29 @@ impl<'a> FlushProcess<'a> { &mut self, node_id: &NodeId, node_path: &Path, - splits: ManifestSplits, ) -> SessionResult<()> { - let chunks = stream::iter( - self.change_set.new_array_chunk_iterator(node_id, node_path).map(Ok), - ); - self.write_manifests_from_iterator(node_id, chunks, splits).await + #[allow(clippy::expect_used)] + let splits = + self.splits.get(node_id).expect("getting split for node unexpectedly failed"); + + for extent in splits.iter() { + if self.change_set.array_manifest(node_id, extent).is_some() { + let chunks = stream::iter( + self.change_set + .new_array_chunk_iterator(node_id, node_path, extent.clone()) + .map(Ok), + ); + #[allow(clippy::expect_used)] + let new_ref = self.write_manifest_from_iterator(chunks).await.expect( + "logic bug. 
for a new node, we must always write the manifest", + ); + // new_ref is None if there were no chunks in the iterator + if let Some(new_ref) = new_ref { + self.manifest_refs.entry(node_id.clone()).or_default().push(new_ref); + } + } + } + Ok(()) } /// Write a manifest for a node that was modified in this session @@ -1576,19 +1683,82 @@ impl<'a> FlushProcess<'a> { async fn write_manifest_for_existing_node( &mut self, node: &NodeSnapshot, - splits: ManifestSplits, + existing_manifests: Vec, + old_snapshot: &Snapshot, ) -> SessionResult<()> { - let asset_manager = Arc::clone(&self.asset_manager); - let updated_chunks = updated_node_chunks_iterator( - asset_manager.as_ref(), - self.change_set, - self.parent_id, - node.clone(), - ) - .await - .map_ok(|(_path, chunk_info)| chunk_info); + #[allow(clippy::expect_used)] + let splits = + self.splits.get(&node.id).expect("splits should exist for this node."); + let mut refs = + HashMap::>::with_capacity(splits.len()); + + let on_disk_extents = + existing_manifests.iter().map(|m| m.extents.clone()).collect::>(); + + let modified_splits = self + .change_set + .modified_manifest_extents_iterator(&node.id, &node.path) + .collect::>(); + + // ``modified_splits`` (i.e. splits used in this session) + // must be a subset of ``splits`` (the splits set in the config) + debug_assert!(modified_splits.is_subset(&splits.iter().collect::>())); + + for extent in splits.iter() { + if modified_splits.contains(extent) { + // this split was modified in this session, rewrite it completely + self.write_manifest_for_updated_chunks(node, extent) + .await? + .map(|new_ref| refs.insert(extent.clone(), vec![new_ref])); + } else { + // intersection of the current split with extents on disk + let on_disk_bbox = on_disk_extents + .iter() + .filter_map(|e| e.intersection(extent)) + .reduce(|a, b| a.union(&b)); + + // split was unmodified in this session. 
Let's look at the current manifests + // and see what we need to do with them + for old_ref in existing_manifests.iter() { + // Remember that the extents written to disk are the `from`:`to` ranges + // of populated chunks + match old_ref.extents.overlap_with(extent) { + Overlap::Complete => { + debug_assert!(on_disk_bbox.is_some()); + // Just propagate this ref again, no rewriting necessary + refs.entry(extent.clone()).or_default().push(old_ref.clone()); + // OK to unwrap here since this manifest file must exist in the old snapshot + #[allow(clippy::expect_used)] + self.manifest_files.insert( + old_snapshot.manifest_info(&old_ref.object_id).expect("logic bug. creating manifest file info for an existing manifest failed."), + ); + } + Overlap::Partial => { + // the splits have changed, but no refs in this split have been written in this session + // same as `if` block above + debug_assert!(on_disk_bbox.is_some()); + if let Some(new_ref) = self + .write_manifest_for_updated_chunks(node, extent) + .await? + { + refs.entry(extent.clone()).or_default().push(new_ref); + } + } + Overlap::None => { + // Nothing to do + } + }; + } + } + } - self.write_manifests_from_iterator(&node.id, updated_chunks, splits).await + // FIXME: Assert that bboxes in refs don't overlap + + self.manifest_refs + .entry(node.id.clone()) + .or_default() + .extend(refs.into_values().flatten()); + Ok(()) } /// Record the previous manifests for an array that was not modified in the session @@ -1636,69 +1806,65 @@ impl ManifestSplitDimCondition { } impl ManifestSplittingConfig { - pub fn get_split_sizes(&self, node: &NodeSnapshot) -> SessionResult { - match &node.node_data { - NodeData::Group => Err(SessionErrorKind::NotAnArray { - node: node.clone(), - message: "attempting to split manifest for group".to_string(), - } - .into()), - NodeData::Array { shape, dimension_names, .. 
} => { - let ndim = shape.len(); - let num_chunks = shape.num_chunks().collect::>(); - let mut edges: Vec> = - (0..ndim).map(|axis| vec![0, num_chunks[axis]]).collect(); - - // This is ugly but necessary to handle: - // - path: * - // manifest-split-size: - // - t : 10 - // - path: * - // manifest-split-size: - // - y : 2 - // which is now identical to: - // - path: * - // manifest-split-size: - // - t : 10 - // - y : 2 - let mut already_matched: HashSet = HashSet::new(); - - #[allow(clippy::expect_used)] - let split_sizes = self - .split_sizes + pub fn get_split_sizes( + &self, + path: &Path, + shape: &ArrayShape, + dimension_names: &Option>, + ) -> ManifestSplits { + let ndim = shape.len(); + let num_chunks = shape.num_chunks().collect::>(); + let mut edges: Vec> = + (0..ndim).map(|axis| vec![0, num_chunks[axis]]).collect(); + + // This is ugly but necessary to handle: + // - path: * + // manifest-split-size: + // - t : 10 + // - path: * + // manifest-split-size: + // - y : 2 + // which is now identical to: + // - path: * + // manifest-split-size: + // - t : 10 + // - y : 2 + let mut already_matched: HashSet = HashSet::new(); + + #[allow(clippy::expect_used)] + let split_sizes = self + .split_sizes + .clone() + .or_else(|| Self::default().split_sizes) + .expect("logic bug in grabbing split sizes from ManifestSplittingConfig"); + + for (condition, dim_specs) in split_sizes.iter() { + if condition.matches(path) { + let dimension_names = dimension_names .clone() - .or_else(|| Self::default().split_sizes) - .expect("logic bug"); - - for (condition, dim_specs) in split_sizes.iter() { - if condition.matches(&node.path) { - let dimension_names = dimension_names.clone().unwrap_or( - repeat_n(DimensionName::NotSpecified, ndim).collect(), - ); - for (axis, dimname) in itertools::enumerate(dimension_names) { - if already_matched.contains(&axis) { - continue; - } - for ManifestSplitDim { - condition: dim_condition, - num_chunks: split_size, - } in dim_specs.iter() - { - if 
dim_condition.matches(axis, dimname.clone().into()) { - edges[axis] = uniform_manifest_split_edges( - num_chunks[axis], - split_size, - ); - already_matched.insert(axis); - break; - }; - } - } + .unwrap_or(repeat_n(DimensionName::NotSpecified, ndim).collect()); + for (axis, dimname) in itertools::enumerate(dimension_names) { + if already_matched.contains(&axis) { + continue; + } + for ManifestSplitDim { + condition: dim_condition, + num_chunks: split_size, + } in dim_specs.iter() + { + if dim_condition.matches(axis, dimname.clone().into()) { + edges[axis] = uniform_manifest_split_edges( + num_chunks[axis], + split_size, + ); + already_matched.insert(axis); + break; + }; } } - Ok(ManifestSplits::from_edges(edges)) } } + ManifestSplits::from_edges(edges) } } @@ -1713,7 +1879,6 @@ async fn flush( let old_snapshot = flush_data.asset_manager.fetch_snapshot(flush_data.parent_id).await?; - let splitting_config = flush_data.config.manifest().splitting(); // We first go through all existing nodes to see if we need to rewrite any manifests @@ -1728,7 +1893,11 @@ async fn flush( continue; } - if flush_data.change_set.has_chunk_changes(node_id) { + if + // metadata change might have shrunk the array + flush_data.change_set.is_updated_array(node_id) + || flush_data.change_set.has_chunk_changes(node_id) + { trace!(path=%node.path, "Node has changes, writing a new manifest"); // Array wasn't deleted and has changes in this session // get the new node to handle changes in size, e.g. appends. @@ -1739,13 +1908,18 @@ async fn flush( &node.path, ) .await?; - let splits = splitting_config.get_split_sizes(&new_node)?; - flush_data.write_manifest_for_existing_node(&node, splits).await?; + if let NodeData::Array { manifests, .. 
} = new_node.node_data { + flush_data + .write_manifest_for_existing_node( + &node, + manifests, + old_snapshot.as_ref(), + ) + .await?; + } } else { trace!(path=%node.path, "Node has no changes, keeping the previous manifest"); // Array wasn't deleted but has no changes in this session - // FIXME: deal with the case of metadata shrinking an existing array, we should clear - // extra chunks that no longer fit in the array flush_data.copy_previous_manifest(&node, old_snapshot.as_ref()); } } @@ -1754,17 +1928,20 @@ async fn flush( for (node_path, node_id) in flush_data.change_set.new_arrays() { trace!(path=%node_path, "New node, writing a manifest"); - let node = get_node( - &flush_data.asset_manager, - flush_data.change_set, - flush_data.parent_id, - node_path, - ) - .await?; - let splits = splitting_config.get_split_sizes(&node)?; - flush_data.write_manifest_for_new_node(node_id, node_path, splits).await?; + flush_data.write_manifest_for_new_node(node_id, node_path).await?; } + // manifest_files & manifest_refs _must_ be consistent + debug_assert_eq!( + flush_data.manifest_files.iter().map(|x| x.id.clone()).collect::>(), + flush_data + .manifest_refs + .values() + .flatten() + .map(|x| x.object_id.clone()) + .collect::>(), + ); + trace!("Building new snapshot"); // gather and sort nodes: // this is a requirement for Snapshot::from_iter @@ -1875,14 +2052,14 @@ async fn do_commit( branch_name: &str, snapshot_id: &SnapshotId, change_set: &ChangeSet, - config: &RepositoryConfig, message: &str, properties: Option, + splits: &HashMap, ) -> SessionResult { info!(branch_name, old_snapshot_id=%snapshot_id, "Commit started"); let parent_snapshot = snapshot_id.clone(); let properties = properties.unwrap_or_default(); - let flush_data = FlushProcess::new(asset_manager, change_set, snapshot_id, config); + let flush_data = FlushProcess::new(asset_manager, change_set, snapshot_id, splits); let new_snapshot = flush(flush_data, message, properties).await?; debug!(branch_name, 
new_snapshot_id=%new_snapshot, "Updating branch"); @@ -1923,6 +2100,63 @@ async fn fetch_manifest( Ok(asset_manager.fetch_manifest(manifest_id, manifest_info.size_bytes).await?) } +/// Map the iterator to accumulate the extents of the chunks traversed +/// +/// As we are processing chunks to create a manifest, we need to keep track +/// of the extents of the manifests. This means, for each coordinate, we need +/// to record its minimum and maximum values. +/// +/// This very ugly code does that, without having to traverse the iterator twice. +/// It adapts the stream using [`StreamExt::map_ok`] and keeps a running min/max +/// for each coordinate. +/// +/// When the iterator is fully traversed, the min and max values will be +/// available in `from` and `to` arguments. +/// +/// Yes, this is horrible. +fn aggregate_extents<'a, T: std::fmt::Debug, E>( + from: &'a mut Vec, + to: &'a mut Vec, + it: impl Stream> + 'a, + extract_index: impl for<'b> Fn(&'b T) -> &'b ChunkIndices + 'a, +) -> impl Stream> + 'a { + // we initialize the destination with an empty array, because we don't know + // the dimensions of the array yet. 
On the first element we will re-initialize + *from = Vec::new(); + *to = Vec::new(); + it.map_ok(move |t| { + // these are the coordinates for the chunk + let idx = extract_index(&t); + + // we need to initialize the mins/maxes the first time + // we initialize with the value of the first element + // this obviously doesn't work for empty streams + // but we never generate manifests for them + if from.is_empty() { + *from = idx.0.clone(); + // important to remember that `to` is not inclusive, so we need +1 + *to = idx.0.iter().map(|n| n + 1).collect(); + } else { + // We need to iterate over coordinates, and update the + // minimum and maximum for each if needed + for (coord_idx, value) in idx.0.iter().enumerate() { + if let Some(from_current) = from.get_mut(coord_idx) { + if value < from_current { + *from_current = *value + } + } + if let Some(to_current) = to.get_mut(coord_idx) { + let range_value = value + 1; + if range_value > *to_current { + *to_current = range_value + } + } + } + } + t + }) +} + #[cfg(test)] #[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)] mod tests { @@ -1934,6 +2168,7 @@ mod tests { use crate::{ ObjectStorage, Repository, + config::{ManifestConfig, ManifestSplitCondition}, conflicts::{ basic_solver::{BasicConflictSolver, VersionSelection}, detector::ConflictDetector, @@ -1942,7 +2177,9 @@ mod tests { refs::{Ref, fetch_tag}, repository::VersionInfo, storage::new_in_memory_storage, - strategies::{ShapeDim, empty_writable_session, node_paths, shapes_and_dims}, + strategies::{ + ShapeDim, chunk_indices, empty_writable_session, node_paths, shapes_and_dims, + }, }; use super::*; @@ -2108,20 +2345,52 @@ mod tests { prop_assert!(session.delete_group(path.clone()).await.is_ok()); } + #[proptest(async = "tokio")] + async fn test_aggregate_extents( + #[strategy(proptest::collection::vec(chunk_indices(3, 0..1_000_000), 1..50))] + indices: Vec, + ) { + let mut from = vec![]; + let mut to = vec![]; + + let expected_from = vec![ + 
indices.iter().map(|i| i.0[0]).min().unwrap(), + indices.iter().map(|i| i.0[1]).min().unwrap(), + indices.iter().map(|i| i.0[2]).min().unwrap(), + ]; + let expected_to = vec![ + indices.iter().map(|i| i.0[0]).max().unwrap() + 1, + indices.iter().map(|i| i.0[1]).max().unwrap() + 1, + indices.iter().map(|i| i.0[2]).max().unwrap() + 1, + ]; + + let _ = aggregate_extents( + &mut from, + &mut to, + stream::iter(indices.into_iter().map(Ok::)), + |idx| idx, + ) + .count() + .await; + + prop_assert_eq!(from, expected_from); + prop_assert_eq!(to, expected_to); + } + #[tokio::test] async fn test_which_split() -> Result<(), Box> { let splits = ManifestSplits::from_edges(vec![vec![0, 10, 20]]); - assert_eq!(splits.which(&ChunkIndices(vec![1])).unwrap(), 0); - assert_eq!(splits.which(&ChunkIndices(vec![11])).unwrap(), 1); + assert_eq!(splits.position(&ChunkIndices(vec![1])), Some(0)); + assert_eq!(splits.position(&ChunkIndices(vec![11])), Some(1)); let edges = vec![vec![0, 10, 20], vec![0, 10, 20]]; let splits = ManifestSplits::from_edges(edges); - assert_eq!(splits.which(&ChunkIndices(vec![1, 1])).unwrap(), 0); - assert_eq!(splits.which(&ChunkIndices(vec![1, 10])).unwrap(), 1); - assert_eq!(splits.which(&ChunkIndices(vec![1, 11])).unwrap(), 1); - assert!(splits.which(&ChunkIndices(vec![21, 21])).is_err()); + assert_eq!(splits.position(&ChunkIndices(vec![1, 1])), Some(0)); + assert_eq!(splits.position(&ChunkIndices(vec![1, 10])), Some(1)); + assert_eq!(splits.position(&ChunkIndices(vec![1, 11])), Some(1)); + assert!(splits.position(&ChunkIndices(vec![21, 21])).is_none()); Ok(()) } @@ -2172,6 +2441,122 @@ mod tests { Ok(()) } + #[tokio_test] + async fn test_repository_with_splits_and_resizes() -> Result<(), Box> { + let storage: Arc = new_in_memory_storage().await?; + + let split_sizes = Some(vec![( + ManifestSplitCondition::PathMatches { regex: r".*".to_string() }, + vec![ManifestSplitDim { + condition: ManifestSplitDimCondition::Any, + num_chunks: 2, + }], + )]); + + let 
man_config = ManifestConfig { + splitting: Some(ManifestSplittingConfig { split_sizes }), + ..ManifestConfig::default() + }; + + let repo = Repository::create( + Some(RepositoryConfig { + inline_chunk_threshold_bytes: Some(0), + manifest: Some(man_config), + ..Default::default() + }), + storage, + HashMap::new(), + ) + .await?; + let mut session = repo.writable_session("main").await?; + session.add_group(Path::root(), Bytes::copy_from_slice(b"")).await?; + + let array_path: Path = "/array".to_string().try_into().unwrap(); + let shape = ArrayShape::new(vec![(4, 1)]).unwrap(); + let dimension_names = Some(vec!["t".into()]); + let array_def = Bytes::from_static(br#"{"this":"other array"}"#); + + session + .add_array( + array_path.clone(), + shape.clone(), + dimension_names.clone(), + array_def.clone(), + ) + .await?; + + let bytes = Bytes::copy_from_slice(&42i8.to_be_bytes()); + for idx in vec![0, 2] { + let payload = session.get_chunk_writer()(bytes.clone()).await?; + session + .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload)) + .await?; + } + session.commit("None", None).await?; + + let mut session = repo.writable_session("main").await?; + // This is how Zarr resizes + // first, delete any out of bounds chunks + session.set_chunk_ref(array_path.clone(), ChunkIndices(vec![2]), None).await?; + // second, update metadata + let shape2 = ArrayShape::new(vec![(2, 1)]).unwrap(); + session + .update_array( + &array_path, + shape2.clone(), + dimension_names.clone(), + array_def.clone(), + ) + .await?; + + assert!( + session.get_chunk_ref(&array_path, &ChunkIndices(vec![2])).await?.is_none() + ); + + // resize back to original shape + session + .update_array( + &array_path, + shape.clone(), + dimension_names.clone(), + array_def.clone(), + ) + .await?; + + // should still be deleted + assert!( + session.get_chunk_ref(&array_path, &ChunkIndices(vec![2])).await?.is_none() + ); + + // set another chunk in this split + let payload = 
session.get_chunk_writer()(bytes.clone()).await?; + session + .set_chunk_ref(array_path.clone(), ChunkIndices(vec![3]), Some(payload)) + .await?; + // should still be deleted + assert!( + session.get_chunk_ref(&array_path, &ChunkIndices(vec![2])).await?.is_none() + ); + // new ref should be present + assert!( + session.get_chunk_ref(&array_path, &ChunkIndices(vec![3])).await?.is_some() + ); + + // write manifests, check number of references in manifest + session.commit("updated", None).await?; + + // should still be deleted + assert!( + session.get_chunk_ref(&array_path, &ChunkIndices(vec![2])).await?.is_none() + ); + // new ref should be present + assert!( + session.get_chunk_ref(&array_path, &ChunkIndices(vec![3])).await?.is_some() + ); + + Ok(()) + } + #[tokio_test] async fn test_repository_with_updates() -> Result<(), Box> { let storage: Arc = new_in_memory_storage().await?; @@ -2357,13 +2742,13 @@ mod tests { let array_def3 = Bytes::from_static(br#"{"this":"more arrays"}"#); ds.update_array( - &array1_path.clone(), + &new_array_path.clone(), shape3.clone(), dimension_names3.clone(), array_def3.clone(), ) .await?; - let node = ds.get_node(&array1_path).await; + let node = ds.get_node(&new_array_path).await; if let Ok(NodeSnapshot { node_data: NodeData::Array { shape, .. }, .. 
}) = &node { assert_eq!(shape, &shape3); } else { @@ -2373,17 +2758,59 @@ mod tests { // set old array chunk and check them let data = Bytes::copy_from_slice(b"foo".repeat(512).as_slice()); let payload = ds.get_chunk_writer()(data.clone()).await?; - ds.set_chunk_ref(array1_path.clone(), ChunkIndices(vec![0, 0, 0]), Some(payload)) + ds.set_chunk_ref(new_array_path.clone(), ChunkIndices(vec![0]), Some(payload)) .await?; let chunk = get_chunk( - ds.get_chunk_reader( - &array1_path, - &ChunkIndices(vec![0, 0, 0]), - &ByteRange::ALL, - ) - .await - .unwrap(), + ds.get_chunk_reader(&new_array_path, &ChunkIndices(vec![0]), &ByteRange::ALL) + .await + .unwrap(), + ) + .await?; + assert_eq!(chunk, Some(data)); + + // reduce size of dimension + // // update old array zarr metadata and check it + let shape4 = ArrayShape::new(vec![(6, 3)]).unwrap(); + let array_def3 = Bytes::from_static(br#"{"this":"more arrays"}"#); + ds.update_array( + &new_array_path.clone(), + shape4.clone(), + dimension_names3.clone(), + array_def3.clone(), + ) + .await?; + let node = ds.get_node(&new_array_path).await; + if let Ok(NodeSnapshot { node_data: NodeData::Array { shape, .. }, .. 
}) = &node { + assert_eq!(shape, &shape4); + } else { + panic!("Failed to update zarr metadata"); + } + + // set old array chunk and check them + let data = Bytes::copy_from_slice(b"old".repeat(512).as_slice()); + let payload = ds.get_chunk_writer()(data.clone()).await?; + ds.set_chunk_ref(new_array_path.clone(), ChunkIndices(vec![0]), Some(payload)) + .await?; + let data = Bytes::copy_from_slice(b"new".repeat(512).as_slice()); + let payload = ds.get_chunk_writer()(data.clone()).await?; + ds.set_chunk_ref(new_array_path.clone(), ChunkIndices(vec![1]), Some(payload)) + .await?; + + let chunk = get_chunk( + ds.get_chunk_reader(&new_array_path, &ChunkIndices(vec![1]), &ByteRange::ALL) + .await + .unwrap(), + ) + .await?; + assert_eq!(chunk, Some(data.clone())); + + ds.commit("commit", Some(SnapshotProperties::default())).await?; + + let chunk = get_chunk( + ds.get_chunk_reader(&new_array_path, &ChunkIndices(vec![1]), &ByteRange::ALL) + .await + .unwrap(), ) .await?; assert_eq!(chunk, Some(data)); diff --git a/icechunk/src/strategies.rs b/icechunk/src/strategies.rs index 2b3a20310..d458a702c 100644 --- a/icechunk/src/strategies.rs +++ b/icechunk/src/strategies.rs @@ -7,11 +7,14 @@ use proptest::prelude::*; use proptest::{collection::vec, option, strategy::Strategy}; use crate::Repository; +use crate::format::manifest::ManifestExtents; use crate::format::snapshot::{ArrayShape, DimensionName}; use crate::format::{ChunkIndices, Path}; use crate::session::Session; use crate::storage::new_in_memory_storage; +const MAX_NDIM: usize = 4; + pub fn node_paths() -> impl Strategy { // FIXME: Add valid paths #[allow(clippy::expect_used)] @@ -66,7 +69,7 @@ pub struct ShapeDim { pub fn shapes_and_dims(max_ndim: Option) -> impl Strategy { // FIXME: ndim = 0 - let max_ndim = max_ndim.unwrap_or(4usize); + let max_ndim = max_ndim.unwrap_or(MAX_NDIM); vec(1u64..26u64, 1..max_ndim) .prop_flat_map(|shape| { let ndim = shape.len(); @@ -97,6 +100,15 @@ pub fn shapes_and_dims(max_ndim: Option) 
-> impl Strategy impl Strategy { + (vec(0u32..1000u32, ndim), vec(1u32..1000u32, ndim)).prop_map(|(start, delta)| { + let stop = std::iter::zip(start.iter(), delta.iter()) + .map(|(s, d)| s + d) + .collect::>(); + ManifestExtents::new(start.as_slice(), stop.as_slice()) + }) +} + //prop_compose! { // pub fn zarr_array_metadata()( // chunk_key_encoding: ChunkKeyEncoding, diff --git a/icechunk/tests/test_gc.rs b/icechunk/tests/test_gc.rs index 7e44f3a4c..ac5d53535 100644 --- a/icechunk/tests/test_gc.rs +++ b/icechunk/tests/test_gc.rs @@ -67,6 +67,7 @@ pub async fn do_test_gc( let storage_settings = storage.default_settings(); let shape = ArrayShape::new(vec![(1100, 1)]).unwrap(); + // intentionally small to create garbage let manifest_split_size = 10; let split_sizes = Some(vec![( ManifestSplitCondition::PathMatches { regex: r".*".to_string() }, @@ -108,10 +109,12 @@ pub async fn do_test_gc( let first_snap_id = ds.commit("first", None).await?; assert_eq!(storage.list_chunks(&storage_settings).await?.count().await, 1100); + assert_eq!(storage.list_manifests(&storage_settings).await?.count().await, 110); let mut ds = repo.writable_session("main").await?; // overwrite 10 chunks + // This will only overwrite one split manifest. 
for idx in 0..10 { let bytes = Bytes::copy_from_slice(&0i8.to_be_bytes()); let payload = ds.get_chunk_writer()(bytes.clone()).await?; @@ -120,6 +123,7 @@ pub async fn do_test_gc( } let second_snap_id = ds.commit("second", None).await?; assert_eq!(storage.list_chunks(&storage_settings).await?.count().await, 1110); + assert_eq!(storage.list_manifests(&storage_settings).await?.count().await, 111); // verify doing gc without dangling objects doesn't change the repo let now = Utc::now(); @@ -155,6 +159,7 @@ pub async fn do_test_gc( // we still have all the chunks assert_eq!(storage.list_chunks(&storage_settings).await?.count().await, 1110); + assert_eq!(storage.list_manifests(&storage_settings).await?.count().await, 111); let summary = garbage_collect( storage.as_ref(), @@ -164,7 +169,8 @@ pub async fn do_test_gc( ) .await?; assert_eq!(summary.chunks_deleted, 10); - assert_eq!(summary.manifests_deleted, 110); + // only one manifest was re-created, so there is only one garbage manifest + assert_eq!(summary.manifests_deleted, 1); assert_eq!(summary.snapshots_deleted, 1); assert!(summary.bytes_deleted > summary.chunks_deleted);