Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions icechunk-python/python/icechunk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
AnyObjectStoreConfig,
azure_storage,
gcs_storage,
gcs_store,
http_store,
in_memory_storage,
local_filesystem_storage,
r2_storage,
Expand Down Expand Up @@ -144,6 +146,8 @@
"gcs_refreshable_credentials",
"gcs_static_credentials",
"gcs_storage",
"gcs_store",
"http_store",
"in_memory_storage",
"initialize_logs",
"local_filesystem_storage",
Expand Down
10 changes: 7 additions & 3 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ class ObjectStoreConfig:
def __init__(self, options: S3Options) -> None: ...

class Gcs:
def __init__(self) -> None: ...
def __init__(self, opts: dict[str, str] | None = None) -> None: ...

class Azure:
def __init__(self) -> None: ...
def __init__(self, opts: dict[str, str] | None = None) -> None: ...

class Tigris:
def __init__(self) -> None: ...
def __init__(self, opts: S3Options) -> None: ...

class Http:
def __init__(self, opts: dict[str, str] | None = None) -> None: ...

AnyObjectStoreConfig = (
ObjectStoreConfig.InMemory
Expand All @@ -61,6 +64,7 @@ AnyObjectStoreConfig = (
| ObjectStoreConfig.Gcs
| ObjectStoreConfig.Azure
| ObjectStoreConfig.Tigris
| ObjectStoreConfig.Http
)

class VirtualChunkContainer:
Expand Down
40 changes: 38 additions & 2 deletions icechunk-python/python/icechunk/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ def local_filesystem_storage(path: str) -> Storage:
return Storage.new_local_filesystem(path)


def http_store(
    opts: dict[str, str] | None = None,
) -> ObjectStoreConfig.Http:
    """Create the object store configuration used for HTTP object stores.

    Parameters
    ----------
    opts: dict[str, str] | None
        Options for the HTTP object store. Keys are written in snake case; the possible keys are listed at https://docs.rs/object_store/latest/object_store/client/enum.ClientConfigKey.html#variants.

    Returns
    -------
    ObjectStoreConfig.Http
        The configuration built from ``opts``.
    """
    http_config = ObjectStoreConfig.Http(opts)
    return http_config


def s3_store(
region: str | None = None,
endpoint_url: str | None = None,
Expand Down Expand Up @@ -332,6 +345,19 @@ def r2_storage(
)


def gcs_store(
    opts: dict[str, str] | None = None,
) -> ObjectStoreConfig.Gcs:
    """Create the object store configuration used for Google Cloud Storage object stores.

    Parameters
    ----------
    opts: dict[str, str] | None
        Options for the Google Cloud Storage object store. The possible configuration keys are listed at https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants.

    Returns
    -------
    ObjectStoreConfig.Gcs
        The configuration built from ``opts``.
    """
    gcs_config = ObjectStoreConfig.Gcs(opts)
    return gcs_config


def gcs_storage(
*,
bucket: str,
Expand All @@ -353,10 +379,18 @@ def gcs_storage(
The bucket where the repository will store its data
prefix: str | None
The prefix within the bucket that is the root directory of the repository
from_env: bool | None
Fetch credentials from the operating system environment
service_account_file: str | None
The path to the service account file
service_account_key: str | None
The service account key
application_credentials: str | None
The path to the application credentials file
bearer_token: str | None
The bearer token to use for the object store
from_env: bool | None
Fetch credentials from the operating system environment
config: dict[str, str] | None
A dictionary of options for the Google Cloud Storage object store. See https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants for a list of possible configuration keys.
get_credentials: Callable[[], GcsBearerCredential] | None
Use this function to get and refresh object store credentials
scatter_initial_credentials: bool, optional
Expand Down Expand Up @@ -412,6 +446,8 @@ def azure_storage(
Azure Blob Storage credential bearer token
from_env: bool | None
Fetch credentials from the operating system environment
config: dict[str, str] | None
A dictionary of options for the Azure Blob Storage object store. See https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants for a list of possible configuration keys.
"""
credentials = azure_credentials(
access_key=access_key,
Expand Down
13 changes: 10 additions & 3 deletions icechunk-python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,9 @@ pub enum PyObjectStoreConfig {
S3Compatible(PyS3Options),
S3(PyS3Options),
Gcs(Option<HashMap<String, String>>),
Azure(HashMap<String, String>),
Azure(Option<HashMap<String, String>>),
Tigris(PyS3Options),
Http(Option<HashMap<String, String>>),
}

impl From<&PyObjectStoreConfig> for ObjectStoreConfig {
Expand All @@ -477,8 +478,13 @@ impl From<&PyObjectStoreConfig> for ObjectStoreConfig {
PyObjectStoreConfig::Gcs(opts) => {
ObjectStoreConfig::Gcs(opts.clone().unwrap_or_default())
}
PyObjectStoreConfig::Azure(opts) => ObjectStoreConfig::Azure(opts.clone()),
PyObjectStoreConfig::Azure(opts) => {
ObjectStoreConfig::Azure(opts.clone().unwrap_or_default())
}
PyObjectStoreConfig::Tigris(opts) => ObjectStoreConfig::Tigris(opts.into()),
PyObjectStoreConfig::Http(opts) => {
ObjectStoreConfig::Http(opts.clone().unwrap_or_default())
}
}
}
}
Expand All @@ -495,8 +501,9 @@ impl From<ObjectStoreConfig> for PyObjectStoreConfig {
}
ObjectStoreConfig::S3(opts) => PyObjectStoreConfig::S3(opts.into()),
ObjectStoreConfig::Gcs(opts) => PyObjectStoreConfig::Gcs(Some(opts)),
ObjectStoreConfig::Azure(opts) => PyObjectStoreConfig::Azure(opts),
ObjectStoreConfig::Azure(opts) => PyObjectStoreConfig::Azure(Some(opts)),
ObjectStoreConfig::Tigris(opts) => PyObjectStoreConfig::Tigris(opts.into()),
ObjectStoreConfig::Http(opts) => PyObjectStoreConfig::Http(Some(opts)),
}
}
}
Expand Down
38 changes: 26 additions & 12 deletions icechunk-python/tests/test_virtual_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
VirtualChunkContainer,
VirtualChunkSpec,
containers_credentials,
http_store,
in_memory_storage,
local_filesystem_storage,
s3_credentials,
Expand Down Expand Up @@ -213,21 +214,33 @@ async def test_write_minio_virtual_refs() -> None:
_snapshot_id = session.commit("Add virtual refs")


async def test_from_s3_public_virtual_refs(tmpdir: Path) -> None:
@pytest.mark.parametrize(
"container_type,url_prefix,store_config",
[
(
"s3",
"s3://earthmover-sample-data",
ObjectStoreConfig.S3(S3Options(region="us-east-1", anonymous=True)),
),
(
"http",
"https://earthmover-sample-data.s3.amazonaws.com",
http_store(),
),
],
)
async def test_public_virtual_refs(
tmpdir: Path,
container_type: str,
url_prefix: str,
store_config: ObjectStoreConfig.S3 | ObjectStoreConfig.Http,
) -> None:
config = RepositoryConfig.default()
store_config = ObjectStoreConfig.S3(
S3Options(
region="us-east-1",
anonymous=True,
)
)
container = VirtualChunkContainer(
"sample-data", "s3://earthmover-sample-data", store_config
)
container = VirtualChunkContainer("sample-data", url_prefix, store_config)
config.set_virtual_chunk_container(container)

repo = Repository.open_or_create(
storage=local_filesystem_storage(f"{tmpdir}/virtual"),
storage=local_filesystem_storage(f"{tmpdir}/virtual-{container_type}"),
config=config,
)
session = repo.writable_session("main")
Expand All @@ -238,9 +251,10 @@ async def test_from_s3_public_virtual_refs(tmpdir: Path) -> None:
name="year", shape=((72,)), chunks=((72,)), dtype="float32", compressors=None
)

file_path = f"{url_prefix}/netcdf/oscar_vel2018.nc"
store.set_virtual_ref(
"year/c/0",
"s3://earthmover-sample-data/netcdf/oscar_vel2018.nc",
file_path,
offset=22306,
length=288,
)
Expand Down
2 changes: 1 addition & 1 deletion icechunk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytes = { version = "1.10.1", features = ["serde"] }
base64 = "0.22.1"
futures = "0.3.31"
itertools = "0.14.0"
object_store = { version = "0.12.2", features = ["aws", "gcp", "azure"] }
object_store = { version = "0.12.2", features = ["aws", "gcp", "azure", "http"] }
rand = "0.9.1"
thiserror = "2.0.12"
serde_json = "1.0.140"
Expand Down
1 change: 1 addition & 0 deletions icechunk/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl fmt::Display for S3Options {
pub enum ObjectStoreConfig {
InMemory,
LocalFileSystem(PathBuf),
Http(HashMap<String, String>),
S3Compatible(S3Options),
S3(S3Options),
Gcs(HashMap<String, String>),
Expand Down
87 changes: 83 additions & 4 deletions icechunk/src/storage/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ use futures::{
stream::{self, BoxStream},
};
use object_store::{
Attribute, AttributeValue, Attributes, BackoffConfig, CredentialProvider, GetOptions,
ObjectMeta, ObjectStore, PutMode, PutOptions, PutPayload, RetryConfig,
StaticCredentialProvider, UpdateVersion,
Attribute, AttributeValue, Attributes, BackoffConfig, ClientConfigKey,
CredentialProvider, GetOptions, ObjectMeta, ObjectStore, PutMode, PutOptions,
PutPayload, RetryConfig, StaticCredentialProvider, UpdateVersion,
aws::AmazonS3Builder,
azure::{AzureConfigKey, MicrosoftAzureBuilder},
gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey},
http::HttpBuilder,
local::LocalFileSystem,
memory::InMemory,
path::Path as ObjectPath,
Expand Down Expand Up @@ -278,7 +279,7 @@ impl private::Sealed for ObjectStorage {}
#[typetag::serde]
impl Storage for ObjectStorage {
fn can_write(&self) -> bool {
true
self.backend.can_write()
}

#[instrument(skip_all)]
Expand Down Expand Up @@ -653,6 +654,10 @@ pub trait ObjectStoreBackend: Debug + Display + Sync + Send {
}

fn default_settings(&self) -> Settings;

/// Reports whether this backend supports write operations.
///
/// The default implementation returns `true`; an implementor can override
/// this method to report something else.
fn can_write(&self) -> bool {
true
}
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -756,6 +761,80 @@ impl ObjectStoreBackend for LocalFileSystemObjectStoreBackend {
}
}

/// Object store backend for a plain HTTP endpoint.
///
/// Its `mk_object_store` builds the underlying store with `HttpBuilder`, and
/// its `can_write` returns `false`.
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpObjectStoreBackend {
/// URL handed to `HttpBuilder::with_url` when the store is built.
pub url: String,
/// Optional client options; each entry is applied to the builder with
/// `with_config`. When building the store, `None` behaves like an empty map.
pub config: Option<HashMap<ClientConfigKey, String>>,
}

impl fmt::Display for HttpObjectStoreBackend {
    /// Renders the backend as `HttpObjectStoreBackend(url=…, config=…)`.
    ///
    /// `config` is printed as comma-separated `key=value` pairs, with each key
    /// in its `Debug` form, or as `None` when no options were supplied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let config = match &self.config {
            Some(options) => {
                let mut pairs: Vec<String> =
                    options.iter().map(|(k, v)| format!("{:?}={}", k, v)).collect();
                // HashMap iteration order is unspecified; sort the rendered
                // pairs so the same backend always prints the same text.
                pairs.sort();
                pairs.join(", ")
            }
            // Built only in this arm, so no fallback string is allocated
            // when a config is present.
            None => "None".to_string(),
        };
        write!(f, "HttpObjectStoreBackend(url={}, config={})", self.url, config)
    }
}

#[typetag::serde(name = "http_object_store_provider")]
impl ObjectStoreBackend for HttpObjectStoreBackend {
/// Builds the HTTP object store for `self.url`.
///
/// Every entry of `self.config` is applied to the builder as a client
/// option, and the retry policy is taken from `settings.retries()`.
///
/// # Errors
///
/// Returns `StorageErrorKind::Other` carrying the builder's error message
/// when `build()` fails.
fn mk_object_store(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to override the can_write method to return false.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

&self,
settings: &Settings,
) -> Result<Arc<dyn ObjectStore>, StorageError> {
let builder = HttpBuilder::new().with_url(&self.url);

// Apply each configured client option; a missing config is treated as an
// empty map, so nothing is applied.
let builder = self
.config
.as_ref()
.unwrap_or(&HashMap::new())
.iter()
.fold(builder, |builder, (key, value)| builder.with_config(*key, value));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mpiannucci can you use the Settings you get as argument to configure the retry attempts this store will do? You can base it on the S3 or GCS examples in this same file

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


// Backoff bounds come from the retry settings (milliseconds); the backoff
// base is fixed at 2.
let builder = builder.with_retry(RetryConfig {
backoff: BackoffConfig {
init_backoff: core::time::Duration::from_millis(
settings.retries().initial_backoff_ms() as u64,
),
max_backoff: core::time::Duration::from_millis(
settings.retries().max_backoff_ms() as u64,
),
base: 2.,
},
// NOTE(review): presumably `max_tries` counts the initial attempt, hence
// the `- 1` to get a retry count; confirm against the settings docs.
max_retries: settings.retries().max_tries().get() as usize - 1,
// 5 * 60 seconds = 5 minutes.
retry_timeout: core::time::Duration::from_secs(5 * 60),
});

let store =
builder.build().map_err(|e| StorageErrorKind::Other(e.to_string()))?;

Ok(Arc::new(store))
}

/// Returns an empty string: this backend has no prefix of its own.
fn prefix(&self) -> String {
"".to_string()
}

/// Returns the `Default` value of `Settings`.
fn default_settings(&self) -> Settings {
Default::default()
}

/// Always returns `false`: this backend reports itself as not writable.
fn can_write(&self) -> bool {
// TODO: Support write operations?
false
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct S3ObjectStoreBackend {
bucket: String,
Expand Down
Loading