From 8e6223bd9569601472afe76bdecdc7a5f7125cd5 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Tue, 7 Oct 2025 15:38:31 -0400 Subject: [PATCH 01/15] Bump rust lib version --- Cargo.lock | 2 +- icechunk-python/Cargo.toml | 2 +- icechunk/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fd82e57db..a6aaef364 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1706,7 +1706,7 @@ dependencies = [ [[package]] name = "icechunk" -version = "0.3.12" +version = "0.3.13" dependencies = [ "anyhow", "assert_fs", diff --git a/icechunk-python/Cargo.toml b/icechunk-python/Cargo.toml index 5d44a0d36..ce1140dba 100644 --- a/icechunk-python/Cargo.toml +++ b/icechunk-python/Cargo.toml @@ -21,7 +21,7 @@ crate-type = ["cdylib"] bytes = "1.10.1" chrono = { version = "0.4.42" } futures = "0.3.31" -icechunk = { path = "../icechunk", version = "0.3.12", features = ["logs"] } +icechunk = { path = "../icechunk", version = "0.3.13", features = ["logs"] } itertools = "0.14.0" pyo3 = { version = "0.24.2", features = [ "chrono", diff --git a/icechunk/Cargo.toml b/icechunk/Cargo.toml index 927e423e8..4de44af03 100644 --- a/icechunk/Cargo.toml +++ b/icechunk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "icechunk" -version = "0.3.12" +version = "0.3.13" description = "Transactional storage engine for Zarr designed for use on cloud object storage" readme = "../README.md" repository = "https://github.com/earth-mover/icechunk" From 736ecd7b3ff5aa14ad2d3b93a387733035345726 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 22 Oct 2025 09:52:56 -0400 Subject: [PATCH 02/15] Add `get_virtual_ref` and `get_virtual_ref_async` functions to rust and python --- .cargo/config.toml | 2 + .../python/icechunk/_icechunk_python.pyi | 6 ++ icechunk-python/python/icechunk/store.py | 46 +++++++++++++++ icechunk-python/src/store.rs | 51 +++++++++++++++++ icechunk-python/tests/test_virtual_ref.py | 37 ++++++++++++ icechunk/src/store.rs | 18 ++++++ icechunk/tests/test_virtual_refs.rs | 56 ++++++++++++++----- 7 files changed, 201 insertions(+), 15 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 000000000..04e2819af --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags=["-D", "warnings", "-W", "unreachable-pub", "-W", "bare-trait-objects"] \ No newline at end of file diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi index 59e22c12b..b2ddc536f 100644 --- a/icechunk-python/python/icechunk/_icechunk_python.pyi +++ b/icechunk-python/python/icechunk/_icechunk_python.pyi @@ -1730,6 +1730,12 @@ class PyStore: def supports_deletes(self) -> bool: ... async def set(self, key: str, value: bytes) -> None: ... async def set_if_not_exists(self, key: str, value: bytes) -> None: ... + def get_virtual_ref( + self, key: str + ) -> tuple[str, int, int, str | None, datetime.datetime | None] | None: ... + async def get_virtual_ref_async( + self, key: str + ) -> tuple[str, int, int, str | None, datetime.datetime | None] | None: ... def set_virtual_ref( self, key: str, diff --git a/icechunk-python/python/icechunk/store.py b/icechunk-python/python/icechunk/store.py index a0ab1a958..b225e5459 100644 --- a/icechunk-python/python/icechunk/store.py +++ b/icechunk-python/python/icechunk/store.py @@ -232,6 +232,52 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None: """ return await self._store.set_if_not_exists(key, value.to_bytes()) + def get_virtual_ref( + self, key: str + ) -> tuple[str, int, int, str | datetime | None] | None: + """Get a virtual reference for a chunk. + + Parameters + ---------- + key : str + The chunk to retrieve the reference for. This is the fully qualified zarr key eg: 'array/c/0/0/0' + + Returns + ------- + tuple[str, int, int, str | datetime | None] | None + A tuple of (location, offset, length, checksum) if the chunk has a virtual reference, None otherwise. + The checksum will be either an etag string or a datetime object. + """ + result = self._store.get_virtual_ref(key) + if result is None: + return None + location, offset, length, etag, last_modified = result + checksum = etag if etag is not None else last_modified + return (location, offset, length, checksum) + + async def get_virtual_ref_async( + self, key: str + ) -> tuple[str, int, int, str | datetime | None] | None: + """Get a virtual reference for a chunk asynchronously. + + Parameters + ---------- + key : str + The chunk to retrieve the reference for. This is the fully qualified zarr key eg: 'array/c/0/0/0' + + Returns + ------- + tuple[str, int, int, str | datetime | None] | None + A tuple of (location, offset, length, checksum) if the chunk has a virtual reference, None otherwise. + The checksum will be either an etag string or a datetime object. + """ + result = await self._store.get_virtual_ref_async(key) + if result is None: + return None + location, offset, length, etag, last_modified = result + checksum = etag if etag is not None else last_modified + return (location, offset, length, checksum) + def set_virtual_ref( self, key: str, diff --git a/icechunk-python/src/store.rs b/icechunk-python/src/store.rs index 329c95add..aebb3af34 100644 --- a/icechunk-python/src/store.rs +++ b/icechunk-python/src/store.rs @@ -303,6 +303,57 @@ impl PyStore { }) } + fn get_virtual_ref( + &self, + py: Python<'_>, + key: String, + ) -> PyIcechunkStoreResult, Option>)>> { + py.allow_threads(move || { + let store = Arc::clone(&self.0); + + pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + let vref = store + .get_virtual_ref(&key) + .await + .map_err(PyIcechunkStoreError::from)?; + + Ok(vref.map(|vref| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => (None, Some(chrono::DateTime::from_timestamp(secs.0 as i64, 0).unwrap_or_default())), + None => (None, None), + }; + (location, vref.offset, vref.length, etag, last_modified) + })) + }) + }) + } + + fn get_virtual_ref_async<'py>( + &'py self, + py: Python<'py>, + key: String, + ) -> PyResult> { + let store = Arc::clone(&self.0); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let vref = store + .get_virtual_ref(&key) + .await + .map_err(PyIcechunkStoreError::from)?; + + Ok(vref.map(|vref| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => (None, Some(chrono::DateTime::from_timestamp(secs.0 as i64, 0).unwrap_or_default())), + None => (None, None), + }; + (location, vref.offset, vref.length, etag, last_modified) + })) + }) + } + #[allow(clippy::too_many_arguments)] fn set_virtual_ref( &self, diff --git a/icechunk-python/tests/test_virtual_ref.py b/icechunk-python/tests/test_virtual_ref.py index 759dfb781..c25ed8ca2 100644 --- a/icechunk-python/tests/test_virtual_ref.py +++ b/icechunk-python/tests/test_virtual_ref.py @@ -182,6 +182,21 @@ async def test_write_minio_virtual_refs(use_async) -> None: validate_container=True, ) + vref_0_0_0 = await store.get_virtual_ref_async("c/0/0/0") + assert vref_0_0_0 is not None + assert vref_0_0_0[0] == f"s3://testbucket/{prefix}/chunk-1" + assert vref_0_0_0[1] == 0 + assert vref_0_0_0[2] == 4 + + vref_0_0_1 = await store.get_virtual_ref_async("c/0/0/1") + assert vref_0_0_1 is not None + assert vref_0_0_1[0] == f"s3://testbucket/{prefix}/chunk-2" + assert vref_0_0_1[1] == 1 + assert vref_0_0_1[2] == 4 + + vref_none = await store.get_virtual_ref_async("c/0/0/3") + assert vref_none is None + buffer_prototype = zarr.core.buffer.default_buffer_prototype() first = await store.get("c/0/0/0", prototype=buffer_prototype) @@ -282,6 +297,16 @@ async def test_public_virtual_refs( length=288, ) + if use_async: + vref = await store.get_virtual_ref_async("year/c/0") + else: + vref = store.get_virtual_ref("year/c/0") + + assert vref is not None + assert vref[0] == file_path + assert vref[1] == 22306 + assert vref[2] == 288 + nodes = [n async for n in store.list()] assert "year/c/0" in nodes @@ -392,6 +417,12 @@ def test_error_on_nonexisting_virtual_chunk_container() -> None: ], ) + vref = store.get_virtual_ref("c/0") + assert vref is not None + assert vref[0] == "file:///foo" + assert vref[1] == 0 + assert vref[2] == 4 + with pytest.raises( IcechunkError, match="file:///foo.* edit the repository configuration" ): @@ -428,6 +459,12 @@ def test_error_on_non_authorized_virtual_chunk_container() -> None: ], ) + vref = store.get_virtual_ref("c/0") + assert vref is not None + assert vref[0] == "file:///foo/bar" + assert vref[1] == 0 + assert vref[2] == 4 + with pytest.raises(IcechunkError, match="file:///foo.*authorize"): array[0] diff --git a/icechunk/src/store.rs b/icechunk/src/store.rs index bc70e326a..e79671dc8 100644 --- a/icechunk/src/store.rs +++ b/icechunk/src/store.rs @@ -361,6 +361,24 @@ impl Store { } // alternate API would take array path, and a mapping from string coord to ChunkPayload + #[instrument(skip(self))] + pub async fn get_virtual_ref(&self, key: &str) -> StoreResult> { + match Key::parse(key)? { + Key::Chunk { node_path, coords } => { + let session = self.session.read().await; + match session.get_chunk_ref(&node_path, &coords).await? { + Some(ChunkPayload::Virtual(vref)) => Ok(Some(vref)), + Some(ChunkPayload::Ref(_)) | Some(ChunkPayload::Inline(_)) => Ok(None), + None => Ok(None), + } + } + Key::Metadata { .. } | Key::ZarrV2(_) => Err(StoreErrorKind::NotAllowed( + format!("use .get to read metadata for key {key}"), + ) + .into()), + } + } + #[instrument(skip(self))] pub async fn set_virtual_ref( &self, diff --git a/icechunk/tests/test_virtual_refs.rs b/icechunk/tests/test_virtual_refs.rs index 6372fd475..041f75e8d 100644 --- a/icechunk/tests/test_virtual_refs.rs +++ b/icechunk/tests/test_virtual_refs.rs @@ -521,8 +521,17 @@ async fn test_zarr_store_virtual_refs_minio_set_and_get() length: 5, checksum: None, }; - store.set_virtual_ref("array/c/0/0/0", ref1, false).await?; - store.set_virtual_ref("array/c/0/0/1", ref2, false).await?; + store.set_virtual_ref("array/c/0/0/0", ref1.clone(), false).await?; + store.set_virtual_ref("array/c/0/0/1", ref2.clone(), false).await?; + + let retrieved_ref1 = store.get_virtual_ref("array/c/0/0/0").await?; + assert_eq!(retrieved_ref1, Some(ref1.clone())); + + let retrieved_ref2 = store.get_virtual_ref("array/c/0/0/1").await?; + assert_eq!(retrieved_ref2, Some(ref2.clone())); + + let non_existent = store.get_virtual_ref("array/c/0/0/2").await?; + assert_eq!(non_existent, None); assert_eq!(store.get("array/c/0/0/0", &ByteRange::ALL).await?, bytes1,); assert_eq!( @@ -654,11 +663,17 @@ async fn test_zarr_store_virtual_refs_from_public_gcs() checksum: Some(Checksum::ETag(ETag("bad".to_string()))), }; - store.set_virtual_ref("year/c/0", ref1, false).await?; - store.set_virtual_ref("year/c/1", ref2, false).await?; - store.set_virtual_ref("year/c/2", ref3, false).await?; - store.set_virtual_ref("year/c/3", ref_expired, false).await?; - store.set_virtual_ref("year/c/4", ref_bad_tag, false).await?; + store.set_virtual_ref("year/c/0", ref1.clone(), false).await?; + store.set_virtual_ref("year/c/1", ref2.clone(), false).await?; + store.set_virtual_ref("year/c/2", ref3.clone(), false).await?; + store.set_virtual_ref("year/c/3", ref_expired.clone(), false).await?; + store.set_virtual_ref("year/c/4", ref_bad_tag.clone(), false).await?; + + assert_eq!(store.get_virtual_ref("year/c/0").await?, Some(ref1)); + assert_eq!(store.get_virtual_ref("year/c/1").await?, Some(ref2)); + assert_eq!(store.get_virtual_ref("year/c/2").await?, Some(ref3)); + assert_eq!(store.get_virtual_ref("year/c/3").await?, Some(ref_expired)); + assert_eq!(store.get_virtual_ref("year/c/4").await?, Some(ref_bad_tag)); // FIXME: enable this once object_store can access public buckets without credentials // otherwise we get an error in GHA @@ -799,9 +814,13 @@ async fn test_zarr_store_with_multiple_virtual_chunk_containers() length: 5, checksum: Some(Checksum::LastModified(old_timestamp)), }; - store.set_virtual_ref("array/c/0/0/0", ref1, false).await?; - store.set_virtual_ref("array/c/0/0/1", ref2, false).await?; - store.set_virtual_ref("array/c/1/0/0", ref3, false).await?; + store.set_virtual_ref("array/c/0/0/0", ref1.clone(), false).await?; + store.set_virtual_ref("array/c/0/0/1", ref2.clone(), false).await?; + store.set_virtual_ref("array/c/1/0/0", ref3.clone(), false).await?; + + assert_eq!(store.get_virtual_ref("array/c/0/0/0").await?, Some(ref1)); + assert_eq!(store.get_virtual_ref("array/c/0/0/1").await?, Some(ref2)); + assert_eq!(store.get_virtual_ref("array/c/1/0/0").await?, Some(ref3)); // set virtual refs in local filesystem let chunk_1 = chunk_dir.path().join("chunk-1").to_str().unwrap().to_owned(); @@ -847,9 +866,13 @@ async fn test_zarr_store_with_multiple_virtual_chunk_containers() checksum: Some(Checksum::ETag(ETag(String::from("invalid etag")))), }; - store.set_virtual_ref("array/c/0/0/2", ref1, false).await?; - store.set_virtual_ref("array/c/0/0/3", ref2, false).await?; - store.set_virtual_ref("array/c/1/0/1", ref3, false).await?; + store.set_virtual_ref("array/c/0/0/2", ref1.clone(), false).await?; + store.set_virtual_ref("array/c/0/0/3", ref2.clone(), false).await?; + store.set_virtual_ref("array/c/1/0/1", ref3.clone(), false).await?; + + assert_eq!(store.get_virtual_ref("array/c/0/0/2").await?, Some(ref1)); + assert_eq!(store.get_virtual_ref("array/c/0/0/3").await?, Some(ref2)); + assert_eq!(store.get_virtual_ref("array/c/1/0/1").await?, Some(ref3)); // set a virtual ref in a public bucket let public_ref = VirtualChunkRef { @@ -869,8 +892,11 @@ async fn test_zarr_store_with_multiple_virtual_chunk_containers() checksum: Some(Checksum::ETag(ETag(String::from("invalid etag")))), }; - store.set_virtual_ref("array/c/1/1/1", public_ref, false).await?; - store.set_virtual_ref("array/c/1/1/2", public_modified_ref, false).await?; + store.set_virtual_ref("array/c/1/1/1", public_ref.clone(), false).await?; + store.set_virtual_ref("array/c/1/1/2", public_modified_ref.clone(), false).await?; + + assert_eq!(store.get_virtual_ref("array/c/1/1/1").await?, Some(public_ref)); + assert_eq!(store.get_virtual_ref("array/c/1/1/2").await?, Some(public_modified_ref)); // assert we can find all the virtual chunks From 8ed3564374e8b6b8fda6be7adaeb4e8beb531031 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 22 Oct 2025 10:10:53 -0400 Subject: [PATCH 03/15] Add all virtual refs accessor to session --- .../python/icechunk/_icechunk_python.pyi | 6 ++ icechunk-python/python/icechunk/session.py | 45 +++++++++++ icechunk-python/src/session.rs | 78 ++++++++++++++++++- icechunk-python/tests/test_virtual_ref.py | 18 +++++ icechunk/src/session.rs | 20 +++++ icechunk/tests/test_virtual_refs.rs | 39 ++++++++-- 6 files changed, 200 insertions(+), 6 deletions(-) diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi index b2ddc536f..6665f545b 100644 --- a/icechunk-python/python/icechunk/_icechunk_python.pyi +++ b/icechunk-python/python/icechunk/_icechunk_python.pyi @@ -1667,6 +1667,12 @@ class PySession: def discard_changes(self) -> None: ... def all_virtual_chunk_locations(self) -> list[str]: ... async def all_virtual_chunk_locations_async(self) -> list[str]: ... + def all_virtual_refs( + self, + ) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ... + async def all_virtual_refs_async( + self, + ) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ... def chunk_coordinates( self, array_path: str, batch_size: int ) -> AsyncIterator[list[list[int]]]: ... diff --git a/icechunk-python/python/icechunk/session.py b/icechunk-python/python/icechunk/session.py index 2bf9f4a82..cc30bdde6 100644 --- a/icechunk-python/python/icechunk/session.py +++ b/icechunk-python/python/icechunk/session.py @@ -1,6 +1,7 @@ import contextlib import warnings from collections.abc import AsyncIterator, Generator +from datetime import datetime from typing import Any, NoReturn, Self from icechunk import ( @@ -174,6 +175,50 @@ async def all_virtual_chunk_locations_async(self) -> list[str]: """ return await self._session.all_virtual_chunk_locations_async() + def all_virtual_refs( + self, + ) -> list[tuple[str, str, int, int, str | datetime | None]]: + """ + Return all virtual references in the session. + + Returns + ------- + list[tuple[str, str, int, int, str | datetime | None]] + A list of tuples containing: + - zarr_key: The full zarr key (e.g., "array/c/0/0/1") + - location: The storage location URL + - offset: Byte offset in the file + - length: Length in bytes + - checksum: Either an etag string or datetime, or None + """ + result = self._session.all_virtual_refs() + return [ + (key, location, offset, length, etag if etag is not None else last_modified) + for key, location, offset, length, etag, last_modified in result + ] + + async def all_virtual_refs_async( + self, + ) -> list[tuple[str, str, int, int, str | datetime | None]]: + """ + Return all virtual references in the session (async version). + + Returns + ------- + list[tuple[str, str, int, int, str | datetime | None]] + A list of tuples containing: + - zarr_key: The full zarr key (e.g., "array/c/0/0/1") + - location: The storage location URL + - offset: Byte offset in the file + - length: Length in bytes + - checksum: Either an etag string or datetime, or None + """ + result = await self._session.all_virtual_refs_async() + return [ + (key, location, offset, length, etag if etag is not None else last_modified) + for key, location, offset, length, etag, last_modified in result + ] + async def chunk_coordinates( self, array_path: str, batch_size: int = 1000 ) -> AsyncIterator[tuple[int, ...]]: diff --git a/icechunk-python/src/session.rs b/icechunk-python/src/session.rs index 134f46199..2b0d5d57e 100644 --- a/icechunk-python/src/session.rs +++ b/icechunk-python/src/session.rs @@ -1,8 +1,14 @@ use std::{borrow::Cow, ops::Deref, sync::Arc}; use async_stream::try_stream; +use chrono::Utc; use futures::{StreamExt, TryStreamExt}; -use icechunk::{Store, session::Session}; +use icechunk::{ + Store, + format::manifest::{Checksum, SecondsSinceEpoch}, + session::Session, + storage::ETag, +}; use pyo3::{prelude::*, types::PyType}; use tokio::sync::{Mutex, RwLock}; @@ -155,6 +161,76 @@ impl PySession { }) } + pub fn all_virtual_refs( + &self, + py: Python<'_>, + ) -> PyResult, Option>)>> + { + py.allow_threads(move || { + let session = self.0.blocking_read(); + + pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + let res: Vec<_> = session + .all_virtual_refs() + .await + .map_err(PyIcechunkStoreError::SessionError)? + .map(|result| { + result.map(|(key, vref)| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => { + (None, Some(chrono::DateTime::from_timestamp(secs.0 as i64, 0).unwrap_or_default())) + } + None => (None, None), + }; + (key, location, vref.offset, vref.length, etag, last_modified) + }) + }) + .try_collect() + .await + .map_err(PyIcechunkStoreError::SessionError)?; + + Ok(res) + }) + }) + } + + pub fn all_virtual_refs_async<'py>( + &'py self, + py: Python<'py>, + ) -> PyResult> { + let session = self.0.clone(); + pyo3_async_runtimes::tokio::future_into_py::< + _, + Vec<(String, String, u64, u64, Option, Option>)>, + >(py, async move { + let session = session.read().await; + let res: Vec<_> = session + .all_virtual_refs() + .await + .map_err(PyIcechunkStoreError::SessionError)? + .map(|result| { + result.map(|(key, vref)| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => { + (None, Some(chrono::DateTime::from_timestamp(secs.0 as i64, 0).unwrap_or_default())) + } + None => (None, None), + }; + (key, location, vref.offset, vref.length, etag, last_modified) + }) + }) + .try_collect() + .await + .map_err(PyIcechunkStoreError::SessionError)?; + + Ok(res) + }) + } + /// Return vectors of coordinates, up to batch_size in length. /// /// We batch the results to make it faster. diff --git a/icechunk-python/tests/test_virtual_ref.py b/icechunk-python/tests/test_virtual_ref.py index c25ed8ca2..87d35bb62 100644 --- a/icechunk-python/tests/test_virtual_ref.py +++ b/icechunk-python/tests/test_virtual_ref.py @@ -197,6 +197,13 @@ async def test_write_minio_virtual_refs(use_async) -> None: vref_none = await store.get_virtual_ref_async("c/0/0/3") assert vref_none is None + all_refs = await session.all_virtual_refs_async() + all_refs_dict = {key: (location, offset, length) for key, location, offset, length, _ in all_refs} + + assert len(all_refs_dict) >= 2 + assert all_refs_dict.get("c/0/0/0") == (f"s3://testbucket/{prefix}/chunk-1", 0, 4) + assert all_refs_dict.get("c/0/0/1") == (f"s3://testbucket/{prefix}/chunk-2", 1, 4) + buffer_prototype = zarr.core.buffer.default_buffer_prototype() first = await store.get("c/0/0/0", prototype=buffer_prototype) @@ -307,6 +314,17 @@ async def test_public_virtual_refs( assert vref[1] == 22306 assert vref[2] == 288 + if use_async: + all_refs = await session.all_virtual_refs_async() + else: + all_refs = session.all_virtual_refs() + + assert len(all_refs) == 1 + assert all_refs[0][0] == "year/c/0" + assert all_refs[0][1] == file_path + assert all_refs[0][2] == 22306 + assert all_refs[0][3] == 288 + nodes = [n async for n in store.list()] assert "year/c/0" in nodes diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index 8f09f15f1..4ebe00b83 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -933,6 +933,26 @@ impl Session { Ok(stream) } + #[instrument(skip(self))] + pub async fn all_virtual_refs( + &self, + ) -> SessionResult> + '_> { + let stream = self.all_chunks().await?.try_filter_map(|(path, info)| { + match info.payload { + ChunkPayload::Virtual(reference) => { + let coords = info.coord.0.iter().map(|c| c.to_string()).join("/"); + let key = [path.to_string()[1..].to_string(), "c".to_string(), coords] + .iter() + .filter(|s| !s.is_empty()) + .join("/"); + ready(Ok(Some((key, reference)))) + } + _ => ready(Ok(None)), + } + }); + Ok(stream) + } + /// Discard all uncommitted changes and return them as a `ChangeSet` #[instrument(skip(self))] pub fn discard_changes(&mut self) -> ChangeSet { diff --git a/icechunk/tests/test_virtual_refs.rs b/icechunk/tests/test_virtual_refs.rs index 041f75e8d..5c2aff6eb 100644 --- a/icechunk/tests/test_virtual_refs.rs +++ b/icechunk/tests/test_virtual_refs.rs @@ -533,6 +533,19 @@ async fn test_zarr_store_virtual_refs_minio_set_and_get() let non_existent = store.get_virtual_ref("array/c/0/0/2").await?; assert_eq!(non_existent, None); + let session = store.session(); + let all_refs: HashMap = session + .read() + .await + .all_virtual_refs() + .await? + .try_collect() + .await?; + + assert_eq!(all_refs.len(), 2); + assert_eq!(all_refs.get("array/c/0/0/0"), Some(&ref1)); + assert_eq!(all_refs.get("array/c/0/0/1"), Some(&ref2)); + assert_eq!(store.get("array/c/0/0/0", &ByteRange::ALL).await?, bytes1,); assert_eq!( store.get("array/c/0/0/1", &ByteRange::ALL).await?, @@ -669,11 +682,27 @@ async fn test_zarr_store_virtual_refs_from_public_gcs() store.set_virtual_ref("year/c/3", ref_expired.clone(), false).await?; store.set_virtual_ref("year/c/4", ref_bad_tag.clone(), false).await?; - assert_eq!(store.get_virtual_ref("year/c/0").await?, Some(ref1)); - assert_eq!(store.get_virtual_ref("year/c/1").await?, Some(ref2)); - assert_eq!(store.get_virtual_ref("year/c/2").await?, Some(ref3)); - assert_eq!(store.get_virtual_ref("year/c/3").await?, Some(ref_expired)); - assert_eq!(store.get_virtual_ref("year/c/4").await?, Some(ref_bad_tag)); + assert_eq!(store.get_virtual_ref("year/c/0").await?, Some(ref1.clone())); + assert_eq!(store.get_virtual_ref("year/c/1").await?, Some(ref2.clone())); + assert_eq!(store.get_virtual_ref("year/c/2").await?, Some(ref3.clone())); + assert_eq!(store.get_virtual_ref("year/c/3").await?, Some(ref_expired.clone())); + assert_eq!(store.get_virtual_ref("year/c/4").await?, Some(ref_bad_tag.clone())); + + let session = store.session(); + let all_refs: HashMap = session + .read() + .await + .all_virtual_refs() + .await? + .try_collect() + .await?; + + assert_eq!(all_refs.len(), 5); + assert_eq!(all_refs.get("year/c/0"), Some(&ref1)); + assert_eq!(all_refs.get("year/c/1"), Some(&ref2)); + assert_eq!(all_refs.get("year/c/2"), Some(&ref3)); + assert_eq!(all_refs.get("year/c/3"), Some(&ref_expired)); + assert_eq!(all_refs.get("year/c/4"), Some(&ref_bad_tag)); // FIXME: enable this once object_store can access public buckets without credentials // otherwise we get an error in GHA From a87bba5f4f38e443b22ba0c6b10d8b8ca0e450fd Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 22 Oct 2025 10:17:36 -0400 Subject: [PATCH 04/15] Lint fixes --- icechunk-python/src/session.rs | 88 ++++++++++++++++------------- icechunk-python/src/store.rs | 35 +++++++++--- icechunk/src/session.rs | 19 ++++--- icechunk/src/store.rs | 9 ++- icechunk/tests/test_virtual_refs.rs | 22 ++------ 5 files changed, 97 insertions(+), 76 deletions(-) diff --git a/icechunk-python/src/session.rs b/icechunk-python/src/session.rs index 2b0d5d57e..4ac651caf 100644 --- a/icechunk-python/src/session.rs +++ b/icechunk-python/src/session.rs @@ -3,12 +3,7 @@ use std::{borrow::Cow, ops::Deref, sync::Arc}; use async_stream::try_stream; use chrono::Utc; use futures::{StreamExt, TryStreamExt}; -use icechunk::{ - Store, - format::manifest::{Checksum, SecondsSinceEpoch}, - session::Session, - storage::ETag, -}; +use icechunk::{Store, format::manifest::Checksum, session::Session}; use pyo3::{prelude::*, types::PyType}; use tokio::sync::{Mutex, RwLock}; @@ -21,6 +16,9 @@ use crate::{ streams::PyAsyncGenerator, }; +type VirtualRefTuple = + (String, String, u64, u64, Option, Option>); + #[pyclass] #[derive(Clone)] pub struct PySession(pub Arc>); @@ -161,11 +159,7 @@ impl PySession { }) } - pub fn all_virtual_refs( - &self, - py: Python<'_>, - ) -> PyResult, Option>)>> - { + pub fn all_virtual_refs(&self, py: Python<'_>) -> PyResult> { py.allow_threads(move || { let session = self.0.blocking_read(); @@ -179,9 +173,16 @@ impl PySession { let location = vref.location.url().to_string(); let (etag, last_modified) = match vref.checksum { Some(Checksum::ETag(etag)) => (Some(etag.0), None), - Some(Checksum::LastModified(secs)) => { - (None, Some(chrono::DateTime::from_timestamp(secs.0 as i64, 0).unwrap_or_default())) - } + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp( + secs.0 as i64, + 0, + ) + .unwrap_or_default(), + ), + ), None => (None, None), }; (key, location, vref.offset, vref.length, etag, last_modified) @@ -201,34 +202,41 @@ impl PySession { py: Python<'py>, ) -> PyResult> { let session = self.0.clone(); - pyo3_async_runtimes::tokio::future_into_py::< - _, - Vec<(String, String, u64, u64, Option, Option>)>, - >(py, async move { - let session = session.read().await; - let res: Vec<_> = session - .all_virtual_refs() - .await - .map_err(PyIcechunkStoreError::SessionError)? - .map(|result| { - result.map(|(key, vref)| { - let location = vref.location.url().to_string(); - let (etag, last_modified) = match vref.checksum { - Some(Checksum::ETag(etag)) => (Some(etag.0), None), - Some(Checksum::LastModified(secs)) => { - (None, Some(chrono::DateTime::from_timestamp(secs.0 as i64, 0).unwrap_or_default())) - } - None => (None, None), - }; - (key, location, vref.offset, vref.length, etag, last_modified) + pyo3_async_runtimes::tokio::future_into_py::<_, Vec>( + py, + async move { + let session = session.read().await; + let res: Vec<_> = session + .all_virtual_refs() + .await + .map_err(PyIcechunkStoreError::SessionError)? + .map(|result| { + result.map(|(key, vref)| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp( + secs.0 as i64, + 0, + ) + .unwrap_or_default(), + ), + ), + None => (None, None), + }; + (key, location, vref.offset, vref.length, etag, last_modified) + }) }) - }) - .try_collect() - .await - .map_err(PyIcechunkStoreError::SessionError)?; + .try_collect() + .await + .map_err(PyIcechunkStoreError::SessionError)?; - Ok(res) - }) + Ok(res) + }, + ) } /// Return vectors of coordinates, up to batch_size in length. diff --git a/icechunk-python/src/store.rs b/icechunk-python/src/store.rs index aebb3af34..2e3420521 100644 --- a/icechunk-python/src/store.rs +++ b/icechunk-python/src/store.rs @@ -29,6 +29,13 @@ use crate::{ }; type KeyRanges = Vec<(String, (Option, Option))>; +type VirtualRefResult = Option<( + String, + ChunkOffset, + ChunkLength, + Option, + Option>, +)>; #[derive(FromPyObject, Clone, Debug)] enum ChecksumArgument { @@ -307,7 +314,7 @@ impl PyStore { &self, py: Python<'_>, key: String, - ) -> PyIcechunkStoreResult, Option>)>> { + ) -> PyIcechunkStoreResult { py.allow_threads(move || { let store = Arc::clone(&self.0); @@ -316,12 +323,18 @@ impl PyStore { .get_virtual_ref(&key) .await .map_err(PyIcechunkStoreError::from)?; - + Ok(vref.map(|vref| { let location = vref.location.url().to_string(); let (etag, last_modified) = match vref.checksum { Some(Checksum::ETag(etag)) => (Some(etag.0), None), - Some(Checksum::LastModified(secs)) => (None, Some(chrono::DateTime::from_timestamp(secs.0 as i64, 0).unwrap_or_default())), + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp(secs.0 as i64, 0) + .unwrap_or_default(), + ), + ), None => (None, None), }; (location, vref.offset, vref.length, etag, last_modified) @@ -337,16 +350,20 @@ impl PyStore { ) -> PyResult> { let store = Arc::clone(&self.0); pyo3_async_runtimes::tokio::future_into_py(py, async move { - let vref = store - .get_virtual_ref(&key) - .await - .map_err(PyIcechunkStoreError::from)?; - + let vref = + store.get_virtual_ref(&key).await.map_err(PyIcechunkStoreError::from)?; + Ok(vref.map(|vref| { let location = vref.location.url().to_string(); let (etag, last_modified) = match vref.checksum { Some(Checksum::ETag(etag)) => (Some(etag.0), None), - Some(Checksum::LastModified(secs)) => (None, Some(chrono::DateTime::from_timestamp(secs.0 as i64, 0).unwrap_or_default())), + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp(secs.0 as i64, 0) + .unwrap_or_default(), + ), + ), None => (None, None), }; (location, vref.offset, vref.length, etag, last_modified) diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index 4ebe00b83..741fe9cc2 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -936,20 +936,21 @@ impl Session { #[instrument(skip(self))] pub async fn all_virtual_refs( &self, - ) -> SessionResult> + '_> { - let stream = self.all_chunks().await?.try_filter_map(|(path, info)| { - match info.payload { + ) -> SessionResult> + '_> + { + let stream = + self.all_chunks().await?.try_filter_map(|(path, info)| match info.payload { ChunkPayload::Virtual(reference) => { let coords = info.coord.0.iter().map(|c| c.to_string()).join("/"); - let key = [path.to_string()[1..].to_string(), "c".to_string(), coords] - .iter() - .filter(|s| !s.is_empty()) - .join("/"); + let key = + [path.to_string()[1..].to_string(), "c".to_string(), coords] + .iter() + .filter(|s| !s.is_empty()) + .join("/"); ready(Ok(Some((key, reference)))) } _ => ready(Ok(None)), - } - }); + }); Ok(stream) } diff --git a/icechunk/src/store.rs b/icechunk/src/store.rs index e79671dc8..4ca7616f2 100644 --- a/icechunk/src/store.rs +++ b/icechunk/src/store.rs @@ -362,13 +362,18 @@ impl Store { // alternate API would take array path, and a mapping from string coord to ChunkPayload #[instrument(skip(self))] - pub async fn get_virtual_ref(&self, key: &str) -> StoreResult> { + pub async fn get_virtual_ref( + &self, + key: &str, + ) -> StoreResult> { match Key::parse(key)? { Key::Chunk { node_path, coords } => { let session = self.session.read().await; match session.get_chunk_ref(&node_path, &coords).await? { Some(ChunkPayload::Virtual(vref)) => Ok(Some(vref)), - Some(ChunkPayload::Ref(_)) | Some(ChunkPayload::Inline(_)) => Ok(None), + Some(ChunkPayload::Ref(_)) | Some(ChunkPayload::Inline(_)) => { + Ok(None) + } None => Ok(None), } } diff --git a/icechunk/tests/test_virtual_refs.rs b/icechunk/tests/test_virtual_refs.rs index 5c2aff6eb..55481fdd7 100644 --- a/icechunk/tests/test_virtual_refs.rs +++ b/icechunk/tests/test_virtual_refs.rs @@ -534,14 +534,9 @@ async fn test_zarr_store_virtual_refs_minio_set_and_get() assert_eq!(non_existent, None); let session = store.session(); - let all_refs: HashMap = session - .read() - .await - .all_virtual_refs() - .await? - .try_collect() - .await?; - + let all_refs: HashMap = + session.read().await.all_virtual_refs().await?.try_collect().await?; + assert_eq!(all_refs.len(), 2); assert_eq!(all_refs.get("array/c/0/0/0"), Some(&ref1)); assert_eq!(all_refs.get("array/c/0/0/1"), Some(&ref2)); @@ -689,14 +684,9 @@ async fn test_zarr_store_virtual_refs_from_public_gcs() assert_eq!(store.get_virtual_ref("year/c/4").await?, Some(ref_bad_tag.clone())); let session = store.session(); - let all_refs: HashMap = session - .read() - .await - .all_virtual_refs() - .await? - .try_collect() - .await?; - + let all_refs: HashMap = + session.read().await.all_virtual_refs().await?.try_collect().await?; + assert_eq!(all_refs.len(), 5); assert_eq!(all_refs.get("year/c/0"), Some(&ref1)); assert_eq!(all_refs.get("year/c/1"), Some(&ref2)); From 857afcef3d72c5a75f0a05e2fa9a944692d9daff Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 22 Oct 2025 10:25:47 -0400 Subject: [PATCH 05/15] lint --- .cargo/config.toml | 2 +- icechunk-python/tests/test_virtual_ref.py | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 04e2819af..7ea258ace 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,2 @@ [build] -rustflags=["-D", "warnings", "-W", "unreachable-pub", "-W", "bare-trait-objects"] \ No newline at end of file +rustflags=["-D", "warnings", "-W", "unreachable-pub", "-W", "bare-trait-objects"] diff --git a/icechunk-python/tests/test_virtual_ref.py b/icechunk-python/tests/test_virtual_ref.py index 87d35bb62..a272478da 100644 --- a/icechunk-python/tests/test_virtual_ref.py +++ b/icechunk-python/tests/test_virtual_ref.py @@ -198,8 +198,10 @@ async def test_write_minio_virtual_refs(use_async) -> None: assert vref_none is None all_refs = await session.all_virtual_refs_async() - all_refs_dict = {key: (location, offset, length) for key, location, offset, length, _ in all_refs} - + all_refs_dict = { + key: (location, offset, length) for key, location, offset, length, _ in all_refs + } + assert len(all_refs_dict) >= 2 assert all_refs_dict.get("c/0/0/0") == (f"s3://testbucket/{prefix}/chunk-1", 0, 4) assert all_refs_dict.get("c/0/0/1") == (f"s3://testbucket/{prefix}/chunk-2", 1, 4) @@ -308,7 +310,7 @@ async def test_public_virtual_refs( vref = await store.get_virtual_ref_async("year/c/0") else: vref = store.get_virtual_ref("year/c/0") - + assert vref is not None assert vref[0] == file_path assert vref[1] == 22306 @@ -318,7 +320,7 @@ async def test_public_virtual_refs( all_refs = await session.all_virtual_refs_async() else: all_refs = session.all_virtual_refs() - + assert len(all_refs) == 1 assert all_refs[0][0] == "year/c/0" assert all_refs[0][1] == file_path From a91ed7e604a4324c67dc73a488b2585062176db6 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 22 Oct 2025 11:00:50 -0400 Subject: [PATCH 06/15] Fix tests --- icechunk-python/tests/test_virtual_ref.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/icechunk-python/tests/test_virtual_ref.py b/icechunk-python/tests/test_virtual_ref.py index a272478da..8b72c27c2 100644 --- a/icechunk-python/tests/test_virtual_ref.py +++ b/icechunk-python/tests/test_virtual_ref.py @@ -310,9 +310,9 @@ async def test_public_virtual_refs( vref = await store.get_virtual_ref_async("year/c/0") else: vref = store.get_virtual_ref("year/c/0") - + assert vref is not None - assert vref[0] == file_path + assert vref[0].endswith("/netcdf/oscar_vel2018.nc") assert vref[1] == 22306 assert vref[2] == 288 @@ -320,10 +320,10 @@ async def test_public_virtual_refs( all_refs = await session.all_virtual_refs_async() else: all_refs = session.all_virtual_refs() - + assert len(all_refs) == 1 assert all_refs[0][0] == "year/c/0" - assert all_refs[0][1] == file_path + assert all_refs[0][1].endswith("/netcdf/oscar_vel2018.nc") assert all_refs[0][2] == 22306 assert all_refs[0][3] == 288 From 6d259f61d95eebd5c728aa533d3c4c802d028bb8 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 22 Oct 2025 11:01:03 -0400 Subject: [PATCH 07/15] fmt --- icechunk-python/tests/test_virtual_ref.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/icechunk-python/tests/test_virtual_ref.py b/icechunk-python/tests/test_virtual_ref.py index 8b72c27c2..8c98a2dd6 100644 --- a/icechunk-python/tests/test_virtual_ref.py +++ b/icechunk-python/tests/test_virtual_ref.py @@ -310,7 +310,7 @@ async def test_public_virtual_refs( vref = await store.get_virtual_ref_async("year/c/0") else: vref = store.get_virtual_ref("year/c/0") - + assert vref is not None assert vref[0].endswith("/netcdf/oscar_vel2018.nc") assert vref[1] == 22306 @@ -320,7 +320,7 @@ async def test_public_virtual_refs( all_refs = await session.all_virtual_refs_async() else: all_refs = session.all_virtual_refs() - + assert len(all_refs) == 1 assert all_refs[0][0] == "year/c/0" assert all_refs[0][1].endswith("/netcdf/oscar_vel2018.nc") From b8e369f398b4b17d94fac2bbc3dee8ccfbe98272 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 22 Oct 2025 11:23:56 -0400 Subject: [PATCH 08/15] Some error handling --- icechunk-python/src/errors.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/icechunk-python/src/errors.rs b/icechunk-python/src/errors.rs index d5fc0bff8..7198c33d5 100644 --- a/icechunk-python/src/errors.rs +++ b/icechunk-python/src/errors.rs @@ -136,6 +136,7 @@ impl IcechunkError { #[pymethods] impl IcechunkError { #[new] + #[pyo3(signature = (message = String::new()))] pub fn new(message: String) -> Self { Self { message } } @@ -172,6 +173,7 @@ impl PyConflictError { #[pymethods] impl PyConflictError { #[new] + #[pyo3(signature = (expected_parent = None, actual_parent = None))] pub fn new(expected_parent: Option, actual_parent: Option) -> Self { Self { expected_parent, actual_parent } } @@ -214,6 +216,7 @@ impl PyRebaseFailedError { #[pymethods] impl PyRebaseFailedError { #[new] + #[pyo3(signature = (snapshot = String::new(), conflicts = Vec::new()))] pub fn new(snapshot: String, conflicts: Vec) -> Self { Self { snapshot, conflicts } } From 9e194d2125ba9832896aae7f59e21aab265eef9d Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 22 Oct 2025 11:40:04 -0400 Subject: [PATCH 09/15] More --- icechunk-python/src/errors.rs | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/icechunk-python/src/errors.rs b/icechunk-python/src/errors.rs index 7198c33d5..5657df1bc 100644 --- a/icechunk-python/src/errors.rs +++ b/icechunk-python/src/errors.rs @@ -8,7 +8,7 @@ use icechunk::{ }; use miette::{Diagnostic, GraphicalReportHandler}; use pyo3::{ - PyErr, + IntoPyObjectExt, PyErr, exceptions::{PyException, PyKeyError, PyValueError}, prelude::*, }; @@ -148,6 +148,15 @@ impl IcechunkError { fn __str__(&self) -> String { self.message.clone() } + + fn __reduce__<'py>( + &self, + py: Python<'py>, + ) -> PyResult<(Bound<'py, PyAny>, Bound<'py, PyAny>)> { + let cls = py.get_type::(); + let args = (self.message.clone(),).into_py_any(py)?.into_bound(py); + Ok((cls.into_any(), args)) + } } impl_pickle!(IcechunkError); @@ -192,6 +201,17 @@ impl PyConflictError { self.expected_parent, self.actual_parent ) } + + fn __reduce__<'py>( + &self, + py: Python<'py>, + ) -> PyResult<(Bound<'py, PyAny>, Bound<'py, PyAny>)> { + let cls = py.get_type::(); + let args = (self.expected_parent.clone(), self.actual_parent.clone()) + .into_py_any(py)? + .into_bound(py); + Ok((cls.into_any(), args)) + } } impl_pickle!(PyConflictError); @@ -235,6 +255,17 @@ impl PyRebaseFailedError { self.conflicts.len() ) } + + fn __reduce__<'py>( + &self, + py: Python<'py>, + ) -> PyResult<(Bound<'py, PyAny>, Bound<'py, PyAny>)> { + let cls = py.get_type::(); + let args = (self.snapshot.clone(), self.conflicts.clone()) + .into_py_any(py)? + .into_bound(py); + Ok((cls.into_any(), args)) + } } impl_pickle!(PyRebaseFailedError); From 67d095a5ab4d59c392cfb8614f704eb880f667ad Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Wed, 14 Jan 2026 16:55:58 -0500 Subject: [PATCH 10/15] fmt --- icechunk-python/src/errors.rs | 1 - icechunk-python/src/session.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/icechunk-python/src/errors.rs b/icechunk-python/src/errors.rs index 46675b5d8..a1daefb32 100644 --- a/icechunk-python/src/errors.rs +++ b/icechunk-python/src/errors.rs @@ -205,7 +205,6 @@ impl PyConflictError { ) } - // Control pickling to work with tblib fn __reduce__(&self, py: Python<'_>) -> PyResult<(Py, Py)> { let cls = py.get_type::().into_py_any(py)?; diff --git a/icechunk-python/src/session.rs b/icechunk-python/src/session.rs index da4622754..bc1d55e3c 100644 --- a/icechunk-python/src/session.rs +++ b/icechunk-python/src/session.rs @@ -5,8 +5,8 @@ use chrono::Utc; use futures::{StreamExt, TryStreamExt}; use icechunk::{ Store, + format::manifest::{Checksum, ChunkPayload}, format::{ChunkIndices, Path}, - format::manifest::{ChunkPayload, Checksum}, session::{Session, SessionErrorKind}, store::{StoreError, StoreErrorKind}, }; From b20f807c537356c925607084e1d0342147de7c74 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Thu, 15 Jan 2026 10:18:21 -0500 Subject: [PATCH 11/15] fix use of allow_threads --- icechunk-python/src/session.rs | 2 +- icechunk-python/src/store.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/icechunk-python/src/session.rs b/icechunk-python/src/session.rs index bc1d55e3c..89d11a4ec 100644 --- a/icechunk-python/src/session.rs +++ b/icechunk-python/src/session.rs @@ -284,7 +284,7 @@ impl PySession { } pub fn all_virtual_refs(&self, py: Python<'_>) -> PyResult> { - py.allow_threads(move || { + py.detach(move || { let session = self.0.blocking_read(); pyo3_async_runtimes::tokio::get_runtime().block_on(async move { diff --git a/icechunk-python/src/store.rs b/icechunk-python/src/store.rs index 73457011b..956c67dac 100644 --- a/icechunk-python/src/store.rs +++ b/icechunk-python/src/store.rs @@ -316,7 +316,7 @@ impl PyStore { py: Python<'_>, key: String, ) -> PyIcechunkStoreResult { - py.allow_threads(move || { + py.detach(move || { let store = Arc::clone(&self.0); pyo3_async_runtimes::tokio::get_runtime().block_on(async move { From 44d9b9bcee9f3046b9a5a59e070152f44533ae95 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Thu, 15 Jan 2026 10:42:34 -0500 Subject: [PATCH 12/15] move get_all_virtual_refs to the store class for consistency --- .../python/icechunk/_icechunk_python.pyi | 12 +-- icechunk-python/src/session.rs | 86 +------------------ icechunk-python/src/store.rs | 72 ++++++++++++++++ icechunk/src/session.rs | 21 ----- icechunk/src/store.rs | 23 +++++ 5 files changed, 102 insertions(+), 112 deletions(-) diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi index a2ecb7ed4..7d289640d 100644 --- a/icechunk-python/python/icechunk/_icechunk_python.pyi +++ b/icechunk-python/python/icechunk/_icechunk_python.pyi @@ -1883,12 +1883,6 @@ class PySession: async def move_node_async(self, from_path: str, to_path: str) -> None: ... def all_virtual_chunk_locations(self) -> list[str]: ... async def all_virtual_chunk_locations_async(self) -> list[str]: ... - def all_virtual_refs( - self, - ) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ... - async def all_virtual_refs_async( - self, - ) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ... def chunk_coordinates( self, array_path: str, batch_size: int ) -> AsyncIterator[list[list[int]]]: ... @@ -1974,6 +1968,12 @@ class PyStore: async def get_virtual_ref_async( self, key: str ) -> tuple[str, int, int, str | None, datetime.datetime | None] | None: ... + def all_virtual_refs( + self, + ) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ... + async def all_virtual_refs_async( + self, + ) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ... def set_virtual_ref( self, key: str, diff --git a/icechunk-python/src/session.rs b/icechunk-python/src/session.rs index 89d11a4ec..cf3853887 100644 --- a/icechunk-python/src/session.rs +++ b/icechunk-python/src/session.rs @@ -1,11 +1,10 @@ use std::{borrow::Cow, ops::Deref, sync::Arc}; use async_stream::try_stream; -use chrono::Utc; use futures::{StreamExt, TryStreamExt}; use icechunk::{ Store, - format::manifest::{Checksum, ChunkPayload}, + format::manifest::ChunkPayload, format::{ChunkIndices, Path}, session::{Session, SessionErrorKind}, store::{StoreError, StoreErrorKind}, @@ -25,9 +24,6 @@ use crate::{ streams::PyAsyncGenerator, }; -type VirtualRefTuple = - (String, String, u64, u64, Option, Option>); - #[pyclass] #[derive(Clone)] pub struct PySession(pub Arc>); @@ -283,86 +279,6 @@ impl PySession { }) } - pub fn all_virtual_refs(&self, py: Python<'_>) -> PyResult> { - py.detach(move || { - let session = self.0.blocking_read(); - - pyo3_async_runtimes::tokio::get_runtime().block_on(async move { - let res: Vec<_> = session - .all_virtual_refs() - .await - .map_err(PyIcechunkStoreError::SessionError)? - .map(|result| { - result.map(|(key, vref)| { - let location = vref.location.url().to_string(); - let (etag, last_modified) = match vref.checksum { - Some(Checksum::ETag(etag)) => (Some(etag.0), None), - Some(Checksum::LastModified(secs)) => ( - None, - Some( - chrono::DateTime::from_timestamp( - secs.0 as i64, - 0, - ) - .unwrap_or_default(), - ), - ), - None => (None, None), - }; - (key, location, vref.offset, vref.length, etag, last_modified) - }) - }) - .try_collect() - .await - .map_err(PyIcechunkStoreError::SessionError)?; - - Ok(res) - }) - }) - } - - pub fn all_virtual_refs_async<'py>( - &'py self, - py: Python<'py>, - ) -> PyResult> { - let session = self.0.clone(); - pyo3_async_runtimes::tokio::future_into_py::<_, Vec>( - py, - async move { - let session = session.read().await; - let res: Vec<_> = session - .all_virtual_refs() - .await - .map_err(PyIcechunkStoreError::SessionError)? - .map(|result| { - result.map(|(key, vref)| { - let location = vref.location.url().to_string(); - let (etag, last_modified) = match vref.checksum { - Some(Checksum::ETag(etag)) => (Some(etag.0), None), - Some(Checksum::LastModified(secs)) => ( - None, - Some( - chrono::DateTime::from_timestamp( - secs.0 as i64, - 0, - ) - .unwrap_or_default(), - ), - ), - None => (None, None), - }; - (key, location, vref.offset, vref.length, etag, last_modified) - }) - }) - .try_collect() - .await - .map_err(PyIcechunkStoreError::SessionError)?; - - Ok(res) - }, - ) - } - /// Return vectors of coordinates, up to batch_size in length. /// /// We batch the results to make it faster. diff --git a/icechunk-python/src/store.rs b/icechunk-python/src/store.rs index 956c67dac..607bc5492 100644 --- a/icechunk-python/src/store.rs +++ b/icechunk-python/src/store.rs @@ -37,6 +37,8 @@ type VirtualRefResult = Option<( Option, Option>, )>; +type VirtualRefTuple = + (String, String, u64, u64, Option, Option>); #[derive(FromPyObject, Clone, Debug)] enum ChecksumArgument { @@ -372,6 +374,76 @@ impl PyStore { }) } + fn all_virtual_refs( + &self, + py: Python<'_>, + ) -> PyIcechunkStoreResult> { + py.detach(move || { + let store = Arc::clone(&self.0); + + pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + let refs = + store.all_virtual_refs().await.map_err(PyIcechunkStoreError::from)?; + + let res: Vec<_> = refs + .into_iter() + .map(|(key, vref)| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp(secs.0 as i64, 0) + .unwrap_or_default(), + ), + ), + None => (None, None), + }; + (key, location, vref.offset, vref.length, etag, last_modified) + }) + .collect(); + + Ok(res) + }) + }) + } + + fn all_virtual_refs_async<'py>( + &'py self, + py: Python<'py>, + ) -> PyResult> { + let store = Arc::clone(&self.0); + pyo3_async_runtimes::tokio::future_into_py::<_, Vec>( + py, + async move { + let refs = + store.all_virtual_refs().await.map_err(PyIcechunkStoreError::from)?; + + let res: Vec<_> = refs + .into_iter() + .map(|(key, vref)| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp(secs.0 as i64, 0) + .unwrap_or_default(), + ), + ), + None => (None, None), + }; + (key, location, vref.offset, vref.length, etag, last_modified) + }) + .collect(); + + Ok(res) + }, + ) + } + #[allow(clippy::too_many_arguments)] fn set_virtual_ref( &self, diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index c3da78fb3..cf4955a8d 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -1086,27 +1086,6 @@ impl Session { Ok(stream) } - #[instrument(skip(self))] - pub async fn all_virtual_refs( - &self, - ) -> SessionResult> + '_> - { - let stream = - self.all_chunks().await?.try_filter_map(|(path, info)| match info.payload { - ChunkPayload::Virtual(reference) => { - let coords = info.coord.0.iter().map(|c| c.to_string()).join("/"); - let key = - [path.to_string()[1..].to_string(), "c".to_string(), coords] - .iter() - .filter(|s| !s.is_empty()) - .join("/"); - ready(Ok(Some((key, reference)))) - } - _ => ready(Ok(None)), - }); - Ok(stream) - } - /// Discard all uncommitted changes #[instrument(skip(self))] pub fn discard_changes(&mut self) -> SessionResult<()> { diff --git a/icechunk/src/store.rs b/icechunk/src/store.rs index 56a697501..e2fdac279 100644 --- a/icechunk/src/store.rs +++ b/icechunk/src/store.rs @@ -460,6 +460,29 @@ impl Store { } } + #[instrument(skip(self))] + pub async fn all_virtual_refs(&self) -> StoreResult> { + let session = self.session.read().await; + let refs: Vec<_> = session + .all_chunks() + .await? + .try_filter_map(|(path, info)| match info.payload { + ChunkPayload::Virtual(reference) => { + let coords = info.coord.0.iter().map(|c| c.to_string()).join("/"); + let key = + [path.to_string()[1..].to_string(), "c".to_string(), coords] + .iter() + .filter(|s| !s.is_empty()) + .join("/"); + std::future::ready(Ok(Some((key, reference)))) + } + _ => std::future::ready(Ok(None)), + }) + .try_collect() + .await?; + Ok(refs) + } + #[instrument(skip(self))] pub async fn delete_dir(&self, prefix: &str) -> StoreResult<()> { if self.read_only().await { From 0cc26183355c6345ccde15f4e57edcea1991c7e7 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Thu, 15 Jan 2026 10:53:28 -0500 Subject: [PATCH 13/15] update tests --- icechunk/tests/test_virtual_refs.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/icechunk/tests/test_virtual_refs.rs b/icechunk/tests/test_virtual_refs.rs index 1c55bc8fd..69c54c501 100644 --- a/icechunk/tests/test_virtual_refs.rs +++ b/icechunk/tests/test_virtual_refs.rs @@ -613,9 +613,8 @@ async fn test_zarr_store_virtual_refs_minio_set_and_get() let non_existent = store.get_virtual_ref("array/c/0/0/2").await?; assert_eq!(non_existent, None); - let session = store.session(); let all_refs: HashMap = - session.read().await.all_virtual_refs().await?.try_collect().await?; + store.all_virtual_refs().await?.into_iter().collect(); assert_eq!(all_refs.len(), 2); assert_eq!(all_refs.get("array/c/0/0/0"), Some(&ref1)); @@ -817,9 +816,8 @@ async fn test_zarr_store_virtual_refs_from_public_gcs() assert_eq!(store.get_virtual_ref("year/c/3").await?, Some(ref_expired.clone())); assert_eq!(store.get_virtual_ref("year/c/4").await?, Some(ref_bad_tag.clone())); - let session = store.session(); let all_refs: HashMap = - session.read().await.all_virtual_refs().await?.try_collect().await?; + store.all_virtual_refs().await?.into_iter().collect(); assert_eq!(all_refs.len(), 5); assert_eq!(all_refs.get("year/c/0"), Some(&ref1)); From 1e7715dd76cc745f9088065438c66042c0c2e0e0 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Thu, 15 Jan 2026 11:13:50 -0500 Subject: [PATCH 14/15] update python API --- icechunk-python/python/icechunk/session.py | 44 ---------------------- icechunk-python/python/icechunk/store.py | 44 ++++++++++++++++++++++ icechunk-python/tests/test_virtual_ref.py | 6 +-- 3 files changed, 47 insertions(+), 47 deletions(-) diff --git a/icechunk-python/python/icechunk/session.py b/icechunk-python/python/icechunk/session.py index 8c1820571..7b7a45418 100644 --- a/icechunk-python/python/icechunk/session.py +++ b/icechunk-python/python/icechunk/session.py @@ -192,50 +192,6 @@ async def all_virtual_chunk_locations_async(self) -> list[str]: """ return await self._session.all_virtual_chunk_locations_async() - def all_virtual_refs( - self, - ) -> list[tuple[str, str, int, int, str | datetime | None]]: - """ - Return all virtual references in the session. - - Returns - ------- - list[tuple[str, str, int, int, str | datetime | None]] - A list of tuples containing: - - zarr_key: The full zarr key (e.g., "array/c/0/0/1") - - location: The storage location URL - - offset: Byte offset in the file - - length: Length in bytes - - checksum: Either an etag string or datetime, or None - """ - result = self._session.all_virtual_refs() - return [ - (key, location, offset, length, etag if etag is not None else last_modified) - for key, location, offset, length, etag, last_modified in result - ] - - async def all_virtual_refs_async( - self, - ) -> list[tuple[str, str, int, int, str | datetime | None]]: - """ - Return all virtual references in the session (async version). - - Returns - ------- - list[tuple[str, str, int, int, str | datetime | None]] - A list of tuples containing: - - zarr_key: The full zarr key (e.g., "array/c/0/0/1") - - location: The storage location URL - - offset: Byte offset in the file - - length: Length in bytes - - checksum: Either an etag string or datetime, or None - """ - result = await self._session.all_virtual_refs_async() - return [ - (key, location, offset, length, etag if etag is not None else last_modified) - for key, location, offset, length, etag, last_modified in result - ] - async def chunk_coordinates( self, array_path: str, batch_size: int = 1000 ) -> AsyncIterator[tuple[int, ...]]: diff --git a/icechunk-python/python/icechunk/store.py b/icechunk-python/python/icechunk/store.py index b225e5459..ea46f89da 100644 --- a/icechunk-python/python/icechunk/store.py +++ b/icechunk-python/python/icechunk/store.py @@ -278,6 +278,50 @@ async def get_virtual_ref_async( checksum = etag if etag is not None else last_modified return (location, offset, length, checksum) + def all_virtual_refs( + self, + ) -> list[tuple[str, str, int, int, str | datetime | None]]: + """ + Return all virtual references in the store. + + Returns + ------- + list[tuple[str, str, int, int, str | datetime | None]] + A list of tuples containing: + - zarr_key: The full zarr key (e.g., "array/c/0/0/1") + - location: The storage location URL + - offset: Byte offset in the file + - length: Length in bytes + - checksum: Either an etag string or datetime, or None + """ + result = self._store.all_virtual_refs() + return [ + (key, location, offset, length, etag if etag is not None else last_modified) + for key, location, offset, length, etag, last_modified in result + ] + + async def all_virtual_refs_async( + self, + ) -> list[tuple[str, str, int, int, str | datetime | None]]: + """ + Return all virtual references in the store (async version). + + Returns + ------- + list[tuple[str, str, int, int, str | datetime | None]] + A list of tuples containing: + - zarr_key: The full zarr key (e.g., "array/c/0/0/1") + - location: The storage location URL + - offset: Byte offset in the file + - length: Length in bytes + - checksum: Either an etag string or datetime, or None + """ + result = await self._store.all_virtual_refs_async() + return [ + (key, location, offset, length, etag if etag is not None else last_modified) + for key, location, offset, length, etag, last_modified in result + ] + def set_virtual_ref( self, key: str, diff --git a/icechunk-python/tests/test_virtual_ref.py b/icechunk-python/tests/test_virtual_ref.py index dd0bef985..f21758688 100644 --- a/icechunk-python/tests/test_virtual_ref.py +++ b/icechunk-python/tests/test_virtual_ref.py @@ -200,7 +200,7 @@ async def test_write_minio_virtual_refs( vref_none = await store.get_virtual_ref_async("c/0/0/3") assert vref_none is None - all_refs = await session.all_virtual_refs_async() + all_refs = await store.all_virtual_refs_async() all_refs_dict = { key: (location, offset, length) for key, location, offset, length, _ in all_refs } @@ -322,9 +322,9 @@ async def test_public_virtual_refs( assert vref[2] == 288 if use_async: - all_refs = await session.all_virtual_refs_async() + all_refs = await store.all_virtual_refs_async() else: - all_refs = session.all_virtual_refs() + all_refs = store.all_virtual_refs() assert len(all_refs) == 1 assert all_refs[0][0] == "year/c/0" From 44a0217a337c61963690dc2eadbe45d798ef9c6c Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 24 Jan 2026 11:48:28 -0500 Subject: [PATCH 15/15] python lint --- icechunk-python/python/icechunk/session.py | 1 - 1 file changed, 1 deletion(-) diff --git a/icechunk-python/python/icechunk/session.py b/icechunk-python/python/icechunk/session.py index 7b7a45418..e5b5fa041 100644 --- a/icechunk-python/python/icechunk/session.py +++ b/icechunk-python/python/icechunk/session.py @@ -1,6 +1,5 @@ import contextlib import warnings -from datetime import datetime from collections.abc import AsyncIterator, Callable, Generator, Iterable, Sequence from typing import Any, NoReturn, Self