diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 000000000..7ea258ace --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags=["-D", "warnings", "-W", "unreachable-pub", "-W", "bare-trait-objects"] diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi index 3e8f85ba3..7d289640d 100644 --- a/icechunk-python/python/icechunk/_icechunk_python.pyi +++ b/icechunk-python/python/icechunk/_icechunk_python.pyi @@ -1962,6 +1962,18 @@ class PyStore: def supports_deletes(self) -> bool: ... async def set(self, key: str, value: bytes) -> None: ... async def set_if_not_exists(self, key: str, value: bytes) -> None: ... + def get_virtual_ref( + self, key: str + ) -> tuple[str, int, int, str | None, datetime.datetime | None] | None: ... + async def get_virtual_ref_async( + self, key: str + ) -> tuple[str, int, int, str | None, datetime.datetime | None] | None: ... + def all_virtual_refs( + self, + ) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ... + async def all_virtual_refs_async( + self, + ) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ... def set_virtual_ref( self, key: str, diff --git a/icechunk-python/python/icechunk/store.py b/icechunk-python/python/icechunk/store.py index a0ab1a958..ea46f89da 100644 --- a/icechunk-python/python/icechunk/store.py +++ b/icechunk-python/python/icechunk/store.py @@ -232,6 +232,96 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None: """ return await self._store.set_if_not_exists(key, value.to_bytes()) + def get_virtual_ref( + self, key: str + ) -> tuple[str, int, int, str | datetime | None] | None: + """Get a virtual reference for a chunk. + + Parameters + ---------- + key : str + The chunk to retrieve the reference for. This is the fully qualified zarr key eg: 'array/c/0/0/0' + + Returns + ------- + tuple[str, int, int, str | datetime | None] | None + A tuple of (location, offset, length, checksum) if the chunk has a virtual reference, None otherwise. + The checksum will be either an etag string or a datetime object. + """ + result = self._store.get_virtual_ref(key) + if result is None: + return None + location, offset, length, etag, last_modified = result + checksum = etag if etag is not None else last_modified + return (location, offset, length, checksum) + + async def get_virtual_ref_async( + self, key: str + ) -> tuple[str, int, int, str | datetime | None] | None: + """Get a virtual reference for a chunk asynchronously. + + Parameters + ---------- + key : str + The chunk to retrieve the reference for. This is the fully qualified zarr key eg: 'array/c/0/0/0' + + Returns + ------- + tuple[str, int, int, str | datetime | None] | None + A tuple of (location, offset, length, checksum) if the chunk has a virtual reference, None otherwise. + The checksum will be either an etag string or a datetime object. + """ + result = await self._store.get_virtual_ref_async(key) + if result is None: + return None + location, offset, length, etag, last_modified = result + checksum = etag if etag is not None else last_modified + return (location, offset, length, checksum) + + def all_virtual_refs( + self, + ) -> list[tuple[str, str, int, int, str | datetime | None]]: + """ + Return all virtual references in the store. + + Returns + ------- + list[tuple[str, str, int, int, str | datetime | None]] + A list of tuples containing: + - zarr_key: The full zarr key (e.g., "array/c/0/0/1") + - location: The storage location URL + - offset: Byte offset in the file + - length: Length in bytes + - checksum: Either an etag string or datetime, or None + """ + result = self._store.all_virtual_refs() + return [ + (key, location, offset, length, etag if etag is not None else last_modified) + for key, location, offset, length, etag, last_modified in result + ] + + async def all_virtual_refs_async( + self, + ) -> list[tuple[str, str, int, int, str | datetime | None]]: + """ + Return all virtual references in the store (async version). + + Returns + ------- + list[tuple[str, str, int, int, str | datetime | None]] + A list of tuples containing: + - zarr_key: The full zarr key (e.g., "array/c/0/0/1") + - location: The storage location URL + - offset: Byte offset in the file + - length: Length in bytes + - checksum: Either an etag string or datetime, or None + """ + result = await self._store.all_virtual_refs_async() + return [ + (key, location, offset, length, etag if etag is not None else last_modified) + for key, location, offset, length, etag, last_modified in result + ] + def set_virtual_ref( self, key: str, diff --git a/icechunk-python/src/session.rs b/icechunk-python/src/session.rs index 67b49c749..cf3853887 100644 --- a/icechunk-python/src/session.rs +++ b/icechunk-python/src/session.rs @@ -4,7 +4,8 @@ use async_stream::try_stream; use futures::{StreamExt, TryStreamExt}; use icechunk::{ Store, - format::{ChunkIndices, Path, manifest::ChunkPayload}, + format::manifest::ChunkPayload, + format::{ChunkIndices, Path}, session::{Session, SessionErrorKind}, store::{StoreError, StoreErrorKind}, }; diff --git a/icechunk-python/src/store.rs b/icechunk-python/src/store.rs index 1c4b6eabf..607bc5492 100644 --- a/icechunk-python/src/store.rs +++ b/icechunk-python/src/store.rs @@ -30,6 +30,15 @@ use crate::{ }; type KeyRanges = Vec<(String, (Option, Option))>; +type VirtualRefResult = Option<( + String, + ChunkOffset, + ChunkLength, + Option, + Option>, +)>; +type VirtualRefTuple = + (String, String, u64, u64, Option, Option>); #[derive(FromPyObject, Clone, Debug)] enum ChecksumArgument { @@ -304,6 +313,137 @@ impl PyStore { }) } + fn get_virtual_ref( + &self, + py: Python<'_>, + key: String, + ) -> PyIcechunkStoreResult { + py.detach(move || { + let store = Arc::clone(&self.0); + + pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + let vref = store + .get_virtual_ref(&key) + .await + .map_err(PyIcechunkStoreError::from)?; + + Ok(vref.map(|vref| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp(secs.0 as i64, 0) + .unwrap_or_default(), + ), + ), + None => (None, None), + }; + (location, vref.offset, vref.length, etag, last_modified) + })) + }) + }) + } + + fn get_virtual_ref_async<'py>( + &'py self, + py: Python<'py>, + key: String, + ) -> PyResult> { + let store = Arc::clone(&self.0); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let vref = + store.get_virtual_ref(&key).await.map_err(PyIcechunkStoreError::from)?; + + Ok(vref.map(|vref| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp(secs.0 as i64, 0) + .unwrap_or_default(), + ), + ), + None => (None, None), + }; + (location, vref.offset, vref.length, etag, last_modified) + })) + }) + } + + fn all_virtual_refs( + &self, + py: Python<'_>, + ) -> PyIcechunkStoreResult> { + py.detach(move || { + let store = Arc::clone(&self.0); + + pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + let refs = + store.all_virtual_refs().await.map_err(PyIcechunkStoreError::from)?; + + let res: Vec<_> = refs + .into_iter() + .map(|(key, vref)| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp(secs.0 as i64, 0) + .unwrap_or_default(), + ), + ), + None => (None, None), + }; + (key, location, vref.offset, vref.length, etag, last_modified) + }) + .collect(); + + Ok(res) + }) + }) + } + + fn all_virtual_refs_async<'py>( + &'py self, + py: Python<'py>, + ) -> PyResult> { + let store = Arc::clone(&self.0); + pyo3_async_runtimes::tokio::future_into_py::<_, Vec>( + py, + async move { + let refs = + store.all_virtual_refs().await.map_err(PyIcechunkStoreError::from)?; + + let res: Vec<_> = refs + .into_iter() + .map(|(key, vref)| { + let location = vref.location.url().to_string(); + let (etag, last_modified) = match vref.checksum { + Some(Checksum::ETag(etag)) => (Some(etag.0), None), + Some(Checksum::LastModified(secs)) => ( + None, + Some( + chrono::DateTime::from_timestamp(secs.0 as i64, 0) + .unwrap_or_default(), + ), + ), + None => (None, None), + }; + (key, location, vref.offset, vref.length, etag, last_modified) + }) + .collect(); + + Ok(res) + }, + ) + } + #[allow(clippy::too_many_arguments)] fn set_virtual_ref( &self, diff --git a/icechunk-python/tests/test_virtual_ref.py b/icechunk-python/tests/test_virtual_ref.py index 2523324d8..f21758688 100644 --- a/icechunk-python/tests/test_virtual_ref.py +++ b/icechunk-python/tests/test_virtual_ref.py @@ -185,6 +185,30 @@ async def test_write_minio_virtual_refs( validate_container=True, ) + vref_0_0_0 = await store.get_virtual_ref_async("c/0/0/0") + assert vref_0_0_0 is not None + assert vref_0_0_0[0] == f"s3://testbucket/{prefix}/chunk-1" + assert vref_0_0_0[1] == 0 + assert vref_0_0_0[2] == 4 + + vref_0_0_1 = await store.get_virtual_ref_async("c/0/0/1") + assert vref_0_0_1 is not None + assert vref_0_0_1[0] == f"s3://testbucket/{prefix}/chunk-2" + assert vref_0_0_1[1] == 1 + assert vref_0_0_1[2] == 4 + + vref_none = await store.get_virtual_ref_async("c/0/0/3") + assert vref_none is None + + all_refs = await store.all_virtual_refs_async() + all_refs_dict = { + key: (location, offset, length) for key, location, offset, length, _ in all_refs + } + + assert len(all_refs_dict) >= 2 + assert all_refs_dict.get("c/0/0/0") == (f"s3://testbucket/{prefix}/chunk-1", 0, 4) + assert all_refs_dict.get("c/0/0/1") == (f"s3://testbucket/{prefix}/chunk-2", 1, 4) + buffer_prototype = zarr.core.buffer.default_buffer_prototype() first = await store.get("c/0/0/0", prototype=buffer_prototype) @@ -287,6 +311,27 @@ async def test_public_virtual_refs( length=288, ) + if use_async: + vref = await store.get_virtual_ref_async("year/c/0") + else: + vref = store.get_virtual_ref("year/c/0") + + assert vref is not None + assert vref[0].endswith("/netcdf/oscar_vel2018.nc") + assert vref[1] == 22306 + assert vref[2] == 288 + + if use_async: + all_refs = await store.all_virtual_refs_async() + else: + all_refs = store.all_virtual_refs() + + assert len(all_refs) == 1 + assert all_refs[0][0] == "year/c/0" + assert all_refs[0][1].endswith("/netcdf/oscar_vel2018.nc") + assert all_refs[0][2] == 22306 + assert all_refs[0][3] == 288 + nodes = [n async for n in store.list()] assert "year/c/0" in nodes @@ -400,6 +445,12 @@ def test_error_on_nonexisting_virtual_chunk_container( ], ) + vref = store.get_virtual_ref("c/0") + assert vref is not None + assert vref[0] == "file:///foo" + assert vref[1] == 0 + assert vref[2] == 4 + with pytest.raises( IcechunkError, match=r"file:///foo.* edit the repository configuration" ): @@ -439,6 +490,13 @@ def test_error_on_non_authorized_virtual_chunk_container( ], ) + # don't need authorization just to view what the virtual chunk is, only to fetch from that location + vref = store.get_virtual_ref("c/0") + assert vref is not None + assert vref[0] == "file:///foo/bar" + assert vref[1] == 0 + assert vref[2] == 4 + with pytest.raises(IcechunkError, match=r"file:///foo.*authorize"): array[0] diff --git a/icechunk/src/store.rs b/icechunk/src/store.rs index 1833d06b5..e2fdac279 100644 --- a/icechunk/src/store.rs +++ b/icechunk/src/store.rs @@ -361,6 +361,29 @@ impl Store { } // alternate API would take array path, and a mapping from string coord to ChunkPayload + #[instrument(skip(self))] + pub async fn get_virtual_ref( + &self, + key: &str, + ) -> StoreResult> { + match Key::parse(key)? { + Key::Chunk { node_path, coords } => { + let session = self.session.read().await; + match session.get_chunk_ref(&node_path, &coords).await? { + Some(ChunkPayload::Virtual(vref)) => Ok(Some(vref)), + Some(ChunkPayload::Ref(_)) | Some(ChunkPayload::Inline(_)) => { + Ok(None) + } + None => Ok(None), + } + } + Key::Metadata { .. } | Key::ZarrV2(_) => Err(StoreErrorKind::NotAllowed( + format!("use .get to read metadata for key {key}"), + ) + .into()), + } + } + #[instrument(skip(self))] pub async fn set_virtual_ref( &self, @@ -437,6 +460,29 @@ impl Store { } } + #[instrument(skip(self))] + pub async fn all_virtual_refs(&self) -> StoreResult> { + let session = self.session.read().await; + let refs: Vec<_> = session + .all_chunks() + .await? + .try_filter_map(|(path, info)| match info.payload { + ChunkPayload::Virtual(reference) => { + let coords = info.coord.0.iter().map(|c| c.to_string()).join("/"); + let key = + [path.to_string()[1..].to_string(), "c".to_string(), coords] + .iter() + .filter(|s| !s.is_empty()) + .join("/"); + std::future::ready(Ok(Some((key, reference)))) + } + _ => std::future::ready(Ok(None)), + }) + .try_collect() + .await?; + Ok(refs) + } + #[instrument(skip(self))] pub async fn delete_dir(&self, prefix: &str) -> StoreResult<()> { if self.read_only().await { diff --git a/icechunk/tests/test_virtual_refs.rs b/icechunk/tests/test_virtual_refs.rs index e0c6c3d9e..69c54c501 100644 --- a/icechunk/tests/test_virtual_refs.rs +++ b/icechunk/tests/test_virtual_refs.rs @@ -601,8 +601,24 @@ async fn test_zarr_store_virtual_refs_minio_set_and_get() length: 5, checksum: None, }; - store.set_virtual_ref("array/c/0/0/0", ref1, false).await?; - store.set_virtual_ref("array/c/0/0/1", ref2, false).await?; + store.set_virtual_ref("array/c/0/0/0", ref1.clone(), false).await?; + store.set_virtual_ref("array/c/0/0/1", ref2.clone(), false).await?; + + let retrieved_ref1 = store.get_virtual_ref("array/c/0/0/0").await?; + assert_eq!(retrieved_ref1, Some(ref1.clone())); + + let retrieved_ref2 = store.get_virtual_ref("array/c/0/0/1").await?; + assert_eq!(retrieved_ref2, Some(ref2.clone())); + + let non_existent = store.get_virtual_ref("array/c/0/0/2").await?; + assert_eq!(non_existent, None); + + let all_refs: HashMap = + store.all_virtual_refs().await?.into_iter().collect(); + + assert_eq!(all_refs.len(), 2); + assert_eq!(all_refs.get("array/c/0/0/0"), Some(&ref1)); + assert_eq!(all_refs.get("array/c/0/0/1"), Some(&ref2)); assert_eq!(store.get("array/c/0/0/0", &ByteRange::ALL).await?, bytes1,); assert_eq!( @@ -788,11 +804,27 @@ async fn test_zarr_store_virtual_refs_from_public_gcs() checksum: Some(Checksum::ETag(ETag("bad".to_string()))), }; - store.set_virtual_ref("year/c/0", ref1, false).await?; - store.set_virtual_ref("year/c/1", ref2, false).await?; - store.set_virtual_ref("year/c/2", ref3, false).await?; - store.set_virtual_ref("year/c/3", ref_expired, false).await?; - store.set_virtual_ref("year/c/4", ref_bad_tag, false).await?; + store.set_virtual_ref("year/c/0", ref1.clone(), false).await?; + store.set_virtual_ref("year/c/1", ref2.clone(), false).await?; + store.set_virtual_ref("year/c/2", ref3.clone(), false).await?; + store.set_virtual_ref("year/c/3", ref_expired.clone(), false).await?; + store.set_virtual_ref("year/c/4", ref_bad_tag.clone(), false).await?; + + assert_eq!(store.get_virtual_ref("year/c/0").await?, Some(ref1.clone())); + assert_eq!(store.get_virtual_ref("year/c/1").await?, Some(ref2.clone())); + assert_eq!(store.get_virtual_ref("year/c/2").await?, Some(ref3.clone())); + assert_eq!(store.get_virtual_ref("year/c/3").await?, Some(ref_expired.clone())); + assert_eq!(store.get_virtual_ref("year/c/4").await?, Some(ref_bad_tag.clone())); + + let all_refs: HashMap = + store.all_virtual_refs().await?.into_iter().collect(); + + assert_eq!(all_refs.len(), 5); + assert_eq!(all_refs.get("year/c/0"), Some(&ref1)); + assert_eq!(all_refs.get("year/c/1"), Some(&ref2)); + assert_eq!(all_refs.get("year/c/2"), Some(&ref3)); + assert_eq!(all_refs.get("year/c/3"), Some(&ref_expired)); + assert_eq!(all_refs.get("year/c/4"), Some(&ref_bad_tag)); // FIXME: enable this once object_store can access public buckets without credentials // otherwise we get an error in GHA @@ -935,9 +967,13 @@ async fn test_zarr_store_with_multiple_virtual_chunk_containers() length: 5, checksum: Some(Checksum::LastModified(old_timestamp)), }; - store.set_virtual_ref("array/c/0/0/0", ref1, false).await?; - store.set_virtual_ref("array/c/0/0/1", ref2, false).await?; - store.set_virtual_ref("array/c/1/0/0", ref3, false).await?; + store.set_virtual_ref("array/c/0/0/0", ref1.clone(), false).await?; + store.set_virtual_ref("array/c/0/0/1", ref2.clone(), false).await?; + store.set_virtual_ref("array/c/1/0/0", ref3.clone(), false).await?; + + assert_eq!(store.get_virtual_ref("array/c/0/0/0").await?, Some(ref1)); + assert_eq!(store.get_virtual_ref("array/c/0/0/1").await?, Some(ref2)); + assert_eq!(store.get_virtual_ref("array/c/1/0/0").await?, Some(ref3)); // set virtual refs in local filesystem let chunk_1 = chunk_dir.path().join("chunk-1").to_str().unwrap().to_owned(); @@ -983,9 +1019,13 @@ async fn test_zarr_store_with_multiple_virtual_chunk_containers() checksum: Some(Checksum::ETag(ETag(String::from("invalid etag")))), }; - store.set_virtual_ref("array/c/0/0/2", ref1, false).await?; - store.set_virtual_ref("array/c/0/0/3", ref2, false).await?; - store.set_virtual_ref("array/c/1/0/1", ref3, false).await?; + store.set_virtual_ref("array/c/0/0/2", ref1.clone(), false).await?; + store.set_virtual_ref("array/c/0/0/3", ref2.clone(), false).await?; + store.set_virtual_ref("array/c/1/0/1", ref3.clone(), false).await?; + + assert_eq!(store.get_virtual_ref("array/c/0/0/2").await?, Some(ref1)); + assert_eq!(store.get_virtual_ref("array/c/0/0/3").await?, Some(ref2)); + assert_eq!(store.get_virtual_ref("array/c/1/0/1").await?, Some(ref3)); // set a virtual ref in a public bucket let public_ref = VirtualChunkRef { @@ -1005,8 +1045,11 @@ async fn test_zarr_store_with_multiple_virtual_chunk_containers() checksum: Some(Checksum::ETag(ETag(String::from("invalid etag")))), }; - store.set_virtual_ref("array/c/1/1/1", public_ref, false).await?; - store.set_virtual_ref("array/c/1/1/2", public_modified_ref, false).await?; + store.set_virtual_ref("array/c/1/1/1", public_ref.clone(), false).await?; + store.set_virtual_ref("array/c/1/1/2", public_modified_ref.clone(), false).await?; + + assert_eq!(store.get_virtual_ref("array/c/1/1/1").await?, Some(public_ref)); + assert_eq!(store.get_virtual_ref("array/c/1/1/2").await?, Some(public_modified_ref)); // assert we can find all the virtual chunks