Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags=["-D", "warnings", "-W", "unreachable-pub", "-W", "bare-trait-objects"]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want this here?

12 changes: 12 additions & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,12 @@ class PySession:
async def move_node_async(self, from_path: str, to_path: str) -> None: ...
def all_virtual_chunk_locations(self) -> list[str]: ...
async def all_virtual_chunk_locations_async(self) -> list[str]: ...
def all_virtual_refs(
self,
) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ...
async def all_virtual_refs_async(
self,
) -> list[tuple[str, str, int, int, str | None, datetime.datetime | None]]: ...
def chunk_coordinates(
self, array_path: str, batch_size: int
) -> AsyncIterator[list[list[int]]]: ...
Expand Down Expand Up @@ -1962,6 +1968,12 @@ class PyStore:
def supports_deletes(self) -> bool: ...
async def set(self, key: str, value: bytes) -> None: ...
async def set_if_not_exists(self, key: str, value: bytes) -> None: ...
def get_virtual_ref(
self, key: str
) -> tuple[str, int, int, str | None, datetime.datetime | None] | None: ...
async def get_virtual_ref_async(
self, key: str
) -> tuple[str, int, int, str | None, datetime.datetime | None] | None: ...
def set_virtual_ref(
self,
key: str,
Expand Down
45 changes: 45 additions & 0 deletions icechunk-python/python/icechunk/session.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import contextlib
import warnings
from datetime import datetime
from collections.abc import AsyncIterator, Callable, Generator, Iterable, Sequence
from typing import Any, NoReturn, Self

Expand Down Expand Up @@ -191,6 +192,50 @@ async def all_virtual_chunk_locations_async(self) -> list[str]:
"""
return await self._session.all_virtual_chunk_locations_async()

def all_virtual_refs(
self,
) -> list[tuple[str, str, int, int, str | datetime | None]]:
"""
Return all virtual references in the session.

Returns
-------
list[tuple[str, str, int, int, str | datetime | None]]
A list of tuples containing:
- zarr_key: The full zarr key (e.g., "array/c/0/0/1")
- location: The storage location URL
- offset: Byte offset in the file
- length: Length in bytes
- checksum: Either an etag string or datetime, or None
"""
result = self._session.all_virtual_refs()
return [
(key, location, offset, length, etag if etag is not None else last_modified)
for key, location, offset, length, etag, last_modified in result
]

async def all_virtual_refs_async(
self,
) -> list[tuple[str, str, int, int, str | datetime | None]]:
"""
Return all virtual references in the session (async version).

Returns
-------
list[tuple[str, str, int, int, str | datetime | None]]
A list of tuples containing:
- zarr_key: The full zarr key (e.g., "array/c/0/0/1")
- location: The storage location URL
- offset: Byte offset in the file
- length: Length in bytes
- checksum: Either an etag string or datetime, or None
"""
result = await self._session.all_virtual_refs_async()
return [
(key, location, offset, length, etag if etag is not None else last_modified)
for key, location, offset, length, etag, last_modified in result
]

async def chunk_coordinates(
self, array_path: str, batch_size: int = 1000
) -> AsyncIterator[tuple[int, ...]]:
Expand Down
46 changes: 46 additions & 0 deletions icechunk-python/python/icechunk/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,52 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None:
"""
return await self._store.set_if_not_exists(key, value.to_bytes())

def get_virtual_ref(
self, key: str
) -> tuple[str, int, int, str | datetime | None] | None:
"""Get a virtual reference for a chunk.

Parameters
----------
key : str
The chunk to retrieve the reference for. This is the fully qualified zarr key eg: 'array/c/0/0/0'

Returns
-------
tuple[str, int, int, str | datetime | None] | None
A tuple of (location, offset, length, checksum) if the chunk has a virtual reference, None otherwise.
The checksum will be either an etag string or a datetime object.
"""
result = self._store.get_virtual_ref(key)
if result is None:
return None
location, offset, length, etag, last_modified = result
checksum = etag if etag is not None else last_modified
return (location, offset, length, checksum)

async def get_virtual_ref_async(
self, key: str
) -> tuple[str, int, int, str | datetime | None] | None:
"""Get a virtual reference for a chunk asynchronously.

Parameters
----------
key : str
The chunk to retrieve the reference for. This is the fully qualified zarr key eg: 'array/c/0/0/0'

Returns
-------
tuple[str, int, int, str | datetime | None] | None
A tuple of (location, offset, length, checksum) if the chunk has a virtual reference, None otherwise.
The checksum will be either an etag string or a datetime object.
"""
result = await self._store.get_virtual_ref_async(key)
if result is None:
return None
location, offset, length, etag, last_modified = result
checksum = etag if etag is not None else last_modified
return (location, offset, length, checksum)

def set_virtual_ref(
self,
key: str,
Expand Down
87 changes: 86 additions & 1 deletion icechunk-python/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::{borrow::Cow, ops::Deref, sync::Arc};

use async_stream::try_stream;
use chrono::Utc;
use futures::{StreamExt, TryStreamExt};
use icechunk::{
Store,
format::{ChunkIndices, Path, manifest::ChunkPayload},
format::manifest::{Checksum, ChunkPayload},
format::{ChunkIndices, Path},
session::{Session, SessionErrorKind},
store::{StoreError, StoreErrorKind},
};
Expand All @@ -23,6 +25,9 @@ use crate::{
streams::PyAsyncGenerator,
};

type VirtualRefTuple =
(String, String, u64, u64, Option<String>, Option<chrono::DateTime<Utc>>);

#[pyclass]
#[derive(Clone)]
pub struct PySession(pub Arc<RwLock<Session>>);
Expand Down Expand Up @@ -278,6 +283,86 @@ impl PySession {
})
}

pub fn all_virtual_refs(&self, py: Python<'_>) -> PyResult<Vec<VirtualRefTuple>> {
py.allow_threads(move || {
let session = self.0.blocking_read();

pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let res: Vec<_> = session
.all_virtual_refs()
.await
.map_err(PyIcechunkStoreError::SessionError)?
.map(|result| {
result.map(|(key, vref)| {
let location = vref.location.url().to_string();
let (etag, last_modified) = match vref.checksum {
Some(Checksum::ETag(etag)) => (Some(etag.0), None),
Some(Checksum::LastModified(secs)) => (
None,
Some(
chrono::DateTime::from_timestamp(
secs.0 as i64,
0,
)
.unwrap_or_default(),
),
),
None => (None, None),
};
(key, location, vref.offset, vref.length, etag, last_modified)
})
})
.try_collect()
.await
.map_err(PyIcechunkStoreError::SessionError)?;

Ok(res)
})
})
}

pub fn all_virtual_refs_async<'py>(
&'py self,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let session = self.0.clone();
pyo3_async_runtimes::tokio::future_into_py::<_, Vec<VirtualRefTuple>>(
py,
async move {
let session = session.read().await;
let res: Vec<_> = session
.all_virtual_refs()
.await
.map_err(PyIcechunkStoreError::SessionError)?
.map(|result| {
result.map(|(key, vref)| {
let location = vref.location.url().to_string();
let (etag, last_modified) = match vref.checksum {
Some(Checksum::ETag(etag)) => (Some(etag.0), None),
Some(Checksum::LastModified(secs)) => (
None,
Some(
chrono::DateTime::from_timestamp(
secs.0 as i64,
0,
)
.unwrap_or_default(),
),
),
None => (None, None),
};
(key, location, vref.offset, vref.length, etag, last_modified)
})
})
.try_collect()
.await
.map_err(PyIcechunkStoreError::SessionError)?;

Ok(res)
},
)
}

/// Return vectors of coordinates, up to batch_size in length.
///
/// We batch the results to make it faster.
Expand Down
68 changes: 68 additions & 0 deletions icechunk-python/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ use crate::{
};

type KeyRanges = Vec<(String, (Option<ChunkOffset>, Option<ChunkOffset>))>;
type VirtualRefResult = Option<(
String,
ChunkOffset,
ChunkLength,
Option<String>,
Option<chrono::DateTime<Utc>>,
)>;

#[derive(FromPyObject, Clone, Debug)]
enum ChecksumArgument {
Expand Down Expand Up @@ -304,6 +311,67 @@ impl PyStore {
})
}

fn get_virtual_ref(
&self,
py: Python<'_>,
key: String,
) -> PyIcechunkStoreResult<VirtualRefResult> {
py.allow_threads(move || {
let store = Arc::clone(&self.0);

pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let vref = store
.get_virtual_ref(&key)
.await
.map_err(PyIcechunkStoreError::from)?;

Ok(vref.map(|vref| {
let location = vref.location.url().to_string();
let (etag, last_modified) = match vref.checksum {
Some(Checksum::ETag(etag)) => (Some(etag.0), None),
Some(Checksum::LastModified(secs)) => (
None,
Some(
chrono::DateTime::from_timestamp(secs.0 as i64, 0)
.unwrap_or_default(),
),
),
None => (None, None),
};
(location, vref.offset, vref.length, etag, last_modified)
}))
})
})
}

fn get_virtual_ref_async<'py>(
&'py self,
py: Python<'py>,
key: String,
) -> PyResult<Bound<'py, PyAny>> {
let store = Arc::clone(&self.0);
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let vref =
store.get_virtual_ref(&key).await.map_err(PyIcechunkStoreError::from)?;

Ok(vref.map(|vref| {
let location = vref.location.url().to_string();
let (etag, last_modified) = match vref.checksum {
Some(Checksum::ETag(etag)) => (Some(etag.0), None),
Some(Checksum::LastModified(secs)) => (
None,
Some(
chrono::DateTime::from_timestamp(secs.0 as i64, 0)
.unwrap_or_default(),
),
),
None => (None, None),
};
(location, vref.offset, vref.length, etag, last_modified)
}))
})
}

#[allow(clippy::too_many_arguments)]
fn set_virtual_ref(
&self,
Expand Down
Loading
Loading