From d104e23dd9f5af6936bd1c1ab29dacdccef1e627 Mon Sep 17 00:00:00 2001 From: Xavier Nogueira <58796351+xaviernogueira@users.noreply.github.com> Date: Wed, 21 Jan 2026 00:47:04 -0600 Subject: [PATCH 1/4] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20=20Decomposing=20S3=20?= =?UTF-8?q?bench=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 8 +- benches/s3_bench.rs | 305 +++++++++++++------------------------------- benches/shared.rs | 171 +++++++++++++++++++++++++ 3 files changed, 270 insertions(+), 214 deletions(-) create mode 100644 benches/shared.rs diff --git a/Cargo.toml b/Cargo.toml index eaaa599..00180b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,8 +44,14 @@ aws-sdk-s3 = { version = "1.78.0" } criterion = { version = "0.7.0", features = ["async_tokio"] } walkdir = { version = "2.5.0" } +[[bench]] +name = "shared" +path = "benches/shared.rs" +test = false # shared benchmark code, not meant to be ran alone [[bench]] name = "s3_bench" +path = "benches/s3_bench.rs" harness = false -required-features = ["icechunk"] \ No newline at end of file +required-features = ["icechunk"] + diff --git a/benches/s3_bench.rs b/benches/s3_bench.rs index 7070593..b2539da 100644 --- a/benches/s3_bench.rs +++ b/benches/s3_bench.rs @@ -1,252 +1,131 @@ +mod shared; + use std::collections::HashMap; use std::env; -use std::hint::black_box; use std::sync::Arc; -use arrow_zarr::table::ZarrTableFactory; use aws_config::{self, BehaviorVersion}; use aws_sdk_s3::types::{Delete, ObjectIdentifier}; -use aws_sdk_s3::Client; +use aws_sdk_s3::Client as S3Client; use criterion::{criterion_group, criterion_main, Criterion}; use datafusion::datasource::listing::ListingTableUrl; -use datafusion::execution::SessionStateBuilder; -use datafusion::prelude::SessionContext; use icechunk::config::{S3Credentials, S3Options}; use icechunk::{ObjectStorage, Repository}; -use ndarray::{Array, Array2}; -use zarrs::array::{codec, ArrayBuilder, DataType, FillValue}; -use zarrs::array_subset::ArraySubset; use zarrs_icechunk::AsyncIcechunkStore; -use zarrs_storage::AsyncReadableWritableListableStorageTraits; - -async fn create_s3_icechunk(url: &str) -> Arc { - let listing_url = ListingTableUrl::parse(url).unwrap(); - let bucket = listing_url - .object_store() - .as_str() - .replace("s3://", "") - .trim_end_matches("/") - .to_string(); - - let credentials = S3Credentials::FromEnv; - let config = S3Options { - region: env::var("AWS_DEFAULT_REGION").ok(), - endpoint_url: None, - anonymous: false, - allow_http: false, - force_path_style: false, - network_stream_timeout_seconds: None, - requester_pays: false, - }; - - let store = ObjectStorage::new_s3( - bucket, - Some(listing_url.prefix().as_ref().to_string()), - Some(credentials), - Some(config), - ) - .await - .unwrap(); - - let repo = Repository::create(None, Arc::new(store), HashMap::new()) - .await - .unwrap(); - let session = repo.writable_session("main").await.unwrap(); - Arc::new(AsyncIcechunkStore::new(session)) -} - -fn get_lz4_compressor() -> codec::BloscCodec { - codec::BloscCodec::new( - codec::bytes_to_bytes::blosc::BloscCompressor::LZ4, - 5.try_into().unwrap(), - Some(0), - codec::bytes_to_bytes::blosc::BloscShuffleMode::NoShuffle, - Some(1), - ) - .unwrap() -} +use shared::{CloudStorageBenchBackend, TestFixture, run_benchmark_group}; -async fn write_data_to_store( - store: Arc, - start_var_idx: usize, - prefix: &str, -) { - let n = 512; - let fill_value: i64 = 0; - let mut array_builder = ArrayBuilder::new( - vec![n, n], - [8, 8], - DataType::Int64, - FillValue::from(fill_value), - ); - - let mut builder_ref = &mut array_builder; - let codec = get_lz4_compressor(); - builder_ref = builder_ref.bytes_to_bytes_codecs(vec![Arc::new(codec)]); - - let prefix = if prefix.is_empty() { - prefix - } else { - &format!("/{}", prefix) - }; - for var_idx in start_var_idx..(start_var_idx + 8) { - let arr = builder_ref - .build(store.clone(), &format!("{}/var{}", prefix, var_idx)) - .unwrap(); - arr.async_store_metadata().await.unwrap(); - - let arr_data: Array2 = Array::from_vec((0..(n * n) as i64).step_by(1).collect()) - .into_shape_with_order((n as usize, n as usize)) - .unwrap(); - arr.async_store_array_subset_ndarray( - ArraySubset::new_with_ranges(&[0..n, 0..n]).start(), - arr_data, - ) - .await - .unwrap(); - } -} - -struct S3TestFixture { +struct S3BenchBackend { bucket: String, prefix: String, - client: Client, - session: SessionContext, + client: S3Client, } -impl S3TestFixture { - fn new() -> Self { - let url = "s3://zarr-unit-tests/test_data_1"; - let rt = tokio::runtime::Runtime::new().unwrap(); - - let (client, session) = rt.block_on(async { - let store = create_s3_icechunk(url).await; - write_data_to_store(store.clone(), 1, "").await; - let _ = store - .session() - .write() - .await - .commit("some test data", None) - .await - .unwrap(); - - let config = aws_config::load_defaults(BehaviorVersion::latest()).await; - let client = Client::new(&config); - - let mut state = SessionStateBuilder::new().build(); - - state - .table_factories_mut() - .insert("ICECHUNK_REPO".into(), Arc::new(ZarrTableFactory {})); - let session = SessionContext::new_with_state(state.clone()); - - let query = format!( - " - CREATE EXTERNAL TABLE zarr_table - STORED AS ICECHUNK_REPO LOCATION '{}' - ", - url - ); - session.sql(&query).await.unwrap(); - - (client, session) - }); - +impl S3BenchBackend { + async fn new(bucket: String, prefix: String) -> Self { + let config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let client = S3Client::new(&config); Self { - bucket: "zarr-unit-tests".into(), - prefix: "test_data_1".into(), + bucket, + prefix, client, - session, } } +} + +#[async_trait::async_trait] +impl CloudStorageBenchBackend for S3BenchBackend { + async fn create_icechunk_store(url: &str) -> Arc { + let listing_url = ListingTableUrl::parse(url).unwrap(); + let bucket = listing_url + .object_store() + .as_str() + .replace("s3://", "") + .trim_end_matches("/") + .to_string(); + + let credentials = S3Credentials::FromEnv; + let config = S3Options { + region: env::var("AWS_DEFAULT_REGION").ok(), + endpoint_url: None, + anonymous: false, + allow_http: false, + force_path_style: false, + network_stream_timeout_seconds: None, + requester_pays: false, + }; + + let store = ObjectStorage::new_s3( + bucket, + Some(listing_url.prefix().as_ref().to_string()), + Some(credentials), + Some(config), + ) + .await + .unwrap(); + + let repo = Repository::create(None, Arc::new(store), HashMap::new()) + .await + .unwrap(); + let session = repo.writable_session("main").await.unwrap(); - fn get_session(&self) -> &SessionContext { - &self.session + Arc::new(AsyncIcechunkStore::new(session)) } -} -impl Drop for S3TestFixture { - fn drop(&mut self) { - let rt = tokio::runtime::Runtime::new().unwrap(); + async fn cleanup(&self) { + let objects = self + .client + .list_objects_v2() + .bucket(self.bucket.clone()) + .prefix(self.prefix.clone()) + .send() + .await + .unwrap(); - rt.block_on(async { - let objects = self - .client - .list_objects_v2() + let to_delete: Vec<_> = objects + .contents() + .iter() + .filter_map(|obj| { + obj.key() + .map(|k| ObjectIdentifier::builder().key(k).build().unwrap()) + }) + .collect(); + + if !to_delete.is_empty() { + let delete = Delete::builder() + .set_objects(Some(to_delete)) + .build() + .unwrap(); + self.client + .delete_objects() .bucket(self.bucket.clone()) - .prefix(self.prefix.clone()) + .delete(delete) .send() .await .unwrap(); + } + } - let to_delete: Vec<_> = objects - .contents() - .iter() - .filter_map(|obj| { - obj.key() - .map(|k| ObjectIdentifier::builder().key(k).build().unwrap()) - }) - .collect(); - - if !to_delete.is_empty() { - let delete = Delete::builder() - .set_objects(Some(to_delete)) - .build() - .unwrap(); - self.client - .delete_objects() - .bucket(self.bucket.clone()) - .delete(delete) - .send() - .await - .unwrap(); - } - }) + fn bucket(&self) -> &str { + &self.bucket } -} -async fn run_query(query: &str, session: SessionContext) { - let df = session.sql(query).await.unwrap(); - let _ = df.collect().await.unwrap(); + fn prefix(&self) -> &str { + &self.prefix + } } -fn benchmark_query(c: &mut Criterion) { +fn s3_benchmark_group(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); - let s3_fixture = S3TestFixture::new(); - - let mut group = c.benchmark_group("my_group"); - group.sample_size(20); - - let session = s3_fixture.get_session().clone(); - let query = " - SELECT t1.*, t2.* - FROM zarr_table as t1 - JOIN zarr_table as t2 - ON t1.var1 % 12 = 0 - AND t1.var1 < t2.var1 + 1 - AND t1.var1 >= t2.var1 - 1 - "; - - group.bench_function("benchmark 1", |b| { - b.to_async(&rt) - .iter(|| async { run_query(black_box(query), black_box(session.clone())).await }) + let url = "s3://zarr-unit-tests/test_data_s3"; + + let fixture = rt.block_on(async { + let backend = S3BenchBackend::new("zarr-unit-tests".into(), "test_data_s3".into()).await; + TestFixture::new(backend, url).await }); - let query = " - SELECT * - FROM zarr_table - - UNION ALL - - SELECT * - FROM zarr_table - "; - group.bench_function("benchmark 2", |b| { - b.to_async(&rt) - .iter(|| async { run_query(black_box(query), black_box(session.clone())).await }) - }); + run_benchmark_group(fixture.get_session(), c, "s3_benchmarks"); } -criterion_group!(benches, benchmark_query); -criterion_main!(benches); +criterion_group!(s3_benches, s3_benchmark_group); +criterion_main!(s3_benches); diff --git a/benches/shared.rs b/benches/shared.rs new file mode 100644 index 0000000..8a5a5f8 --- /dev/null +++ b/benches/shared.rs @@ -0,0 +1,171 @@ +use std::collections::HashMap; +use std::hint::black_box; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_zarr::table::ZarrTableFactory; +use criterion::Criterion; +use datafusion::dataframe::DataFrame; +use datafusion::execution::SessionStateBuilder; +use datafusion::prelude::SessionContext; +use ndarray::{Array, Array2}; +use zarrs::array::{codec, ArrayBuilder, DataType, FillValue}; +use zarrs::array_subset::ArraySubset; +use zarrs_icechunk::AsyncIcechunkStore; +use zarrs_storage::AsyncReadableWritableListableStorageTraits; + +pub fn get_lz4_compressor() -> codec::BloscCodec { + codec::BloscCodec::new( + codec::bytes_to_bytes::blosc::BloscCompressor::LZ4, + 5.try_into().unwrap(), + Some(0), + codec::bytes_to_bytes::blosc::BloscShuffleMode::NoShuffle, + Some(1), + ) + .unwrap() +} + +pub async fn write_data_to_store( + store: Arc, + start_var_idx: usize, + prefix: &str, +) { + let n = 512; + let fill_value: i64 = 0; + let mut array_builder = ArrayBuilder::new( + vec![n, n], + [8, 8], + DataType::Int64, + FillValue::from(fill_value), + ); + + let mut builder_ref = &mut array_builder; + let codec = get_lz4_compressor(); + builder_ref = builder_ref.bytes_to_bytes_codecs(vec![Arc::new(codec)]); + + let prefix = if prefix.is_empty() { + prefix + } else { + &format!("/{}", prefix) + }; + + for var_idx in start_var_idx..(start_var_idx + 8) { + let arr = builder_ref + .build(store.clone(), &format!("{}/var{}", prefix, var_idx)) + .unwrap(); + arr.async_store_metadata().await.unwrap(); + + let arr_data: Array2 = Array::from_vec((0..(n * n) as i64).step_by(1).collect()) + .into_shape_with_order((n as usize, n as usize)) + .unwrap(); + arr.async_store_array_subset_ndarray( + ArraySubset::new_with_ranges(&[0..n, 0..n]).start(), + arr_data, + ) + .await + .unwrap(); + } +} + +async fn run_query(query: &str, session: SessionContext) { + let df: DataFrame = session.sql(query).await.unwrap(); + let _: Vec = df.collect().await.unwrap(); +} + +pub fn run_benchmark_group(session: &SessionContext, c: &mut Criterion, group_name: &str) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut group = c.benchmark_group(group_name); + group.sample_size(20); + + let session = session.clone(); + + let query1 = " + SELECT t1.*, t2.* + FROM zarr_table as t1 + JOIN zarr_table as t2 + ON t1.var1 % 12 = 0 + AND t1.var1 < t2.var1 + 1 + AND t1.var1 >= t2.var1 - 1 + "; + + group.bench_function("join_benchmark", |b| { + b.to_async(&rt) + .iter(|| async { run_query(black_box(query1), black_box(session.clone())).await }) + }); + + let query2 = " + SELECT * + FROM zarr_table + + UNION ALL + + SELECT * + FROM zarr_table + "; + + group.bench_function("union_benchmark", |b| { + b.to_async(&rt) + .iter(|| async { run_query(black_box(query2), black_box(session.clone())).await }) + }); + + group.finish(); +} + + +#[async_trait::async_trait] +pub trait CloudStorageBenchBackend: Send + Sync { + // must be implemented for different cloud storage providers to enable benchmarking + async fn create_icechunk_store(url: &str) -> Arc; + async fn cleanup(&self); + fn bucket(&self) -> &str; + fn prefix(&self) -> &str; +} + +pub struct TestFixture { + backend: B, + session: SessionContext, +} + +impl TestFixture { + pub async fn new(backend: B, url: &str) -> Self { + let store = B::create_icechunk_store(url).await; + write_data_to_store(store.clone(), 1, "").await; + let _ = store + .session() + .write() + .await + .commit("Test data for benchmarking", None) + .await + .unwrap(); + + let mut state = SessionStateBuilder::new().build(); + state + .table_factories_mut() + .insert("ICECHUNK_REPO".into(), Arc::new(ZarrTableFactory {})); + let session = SessionContext::new_with_state(state.clone()); + + let query = format!( + " + CREATE EXTERNAL TABLE zarr_table + STORED AS ICECHUNK_REPO LOCATION '{}' + ", + url + ); + session.sql(&query).await.unwrap(); + + Self { backend, session } + } + + pub fn get_session(&self) -> &SessionContext { + &self.session + } +} + +impl Drop for TestFixture { + fn drop(&mut self) { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + self.backend.cleanup().await; + }); + } +} From d13f69831c1e4ed2fd49df47769c37c1cbc9a24c Mon Sep 17 00:00:00 2001 From: Xavier Nogueira <58796351+xaviernogueira@users.noreply.github.com> Date: Wed, 21 Jan 2026 00:47:45 -0600 Subject: [PATCH 2/4] =?UTF-8?q?=E2=9C=A8=20adding=20GCS=20bench=20+=20dev?= =?UTF-8?q?=20dependency?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 7 +++ benches/gcs_bench.rs | 120 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 benches/gcs_bench.rs diff --git a/Cargo.toml b/Cargo.toml index 00180b6..4b7def0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ zarrs_storage = { version = "0.4.0", features = ["async"] } aws-config = { version = "1.5.18" } aws-sdk-s3 = { version = "1.78.0" } criterion = { version = "0.7.0", features = ["async_tokio"] } +google-cloud-storage = "1.6.0" walkdir = { version = "2.5.0" } [[bench]] @@ -55,3 +56,9 @@ path = "benches/s3_bench.rs" harness = false required-features = ["icechunk"] +[[bench]] +name = "gcs_bench" +path = "benches/gcs_bench.rs" +harness = false +required-features = ["icechunk"] + diff --git a/benches/gcs_bench.rs b/benches/gcs_bench.rs new file mode 100644 index 0000000..6255b9e --- /dev/null +++ b/benches/gcs_bench.rs @@ -0,0 +1,120 @@ +mod shared; + +use std::collections::HashMap; +use std::sync::Arc; + +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::datasource::listing::ListingTableUrl; +use google_cloud_storage::http::objects::delete::DeleteObjectRequest; +use google_cloud_storage::http::objects::list::ListObjectsRequest; +use icechunk::config::{GcsCredentials, GcsOptions}; +use icechunk::{ObjectStorage, Repository}; +use zarrs_icechunk::AsyncIcechunkStore; + +use shared::{CloudStorageBackend, TestFixture, run_benchmark_group}; + +// ============================================================================ +// GCS Backend Implementation +// ============================================================================ + +struct GCSBenchBackend { + bucket: String, + prefix: String, + client: google_cloud_storage::client::Client, +} + +impl GCSBenchBackend { + async fn new(bucket: String, prefix: String) -> Self { + let config = google_cloud_storage::client::ClientConfig::default() + .with_auth() + .await + .unwrap(); + let client = google_cloud_storage::client::Client::new(config); + + Self { + bucket, + prefix, + client, + } + } +} + +#[async_trait::async_trait] +impl CloudStorageBackend for GCSBenchBackend { + async fn create_store(url: &str) -> Arc { + let listing_url = ListingTableUrl::parse(url).unwrap(); + let bucket = listing_url + .object_store() + .as_str() + .replace("gs://", "") + .trim_end_matches("/") + .to_string(); + + let credentials = GcsCredentials::FromEnv; + let config = GcsOptions { + endpoint_url: None, + anonymous: false, + allow_http: false, + }; + + let store = ObjectStorage::new_gcs( + bucket, + Some(listing_url.prefix().as_ref().to_string()), + Some(credentials), + Some(config), + ) + .await + .unwrap(); + + let repo = Repository::create(None, Arc::new(store), HashMap::new()) + .await + .unwrap(); + let session = repo.writable_session("main").await.unwrap(); + + Arc::new(AsyncIcechunkStore::new(session)) + } + + async fn cleanup(&self) { + + let list_request = ListObjectsRequest { + bucket: self.bucket.clone(), + prefix: Some(self.prefix.clone()), + ..Default::default() + }; + + let objects = self.client.list_objects(&list_request).await.unwrap(); + + for obj in objects.items.unwrap_or_default() { + let delete_request = DeleteObjectRequest { + bucket: self.bucket.clone(), + object: obj.name, + ..Default::default() + }; + let _ = self.client.delete_object(&delete_request).await; + } + } + + fn bucket(&self) -> &str { + &self.bucket + } + + fn prefix(&self) -> &str { + &self.prefix + } +} + + +fn gcs_benchmark_group(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let url = "gs://zarr-unit-tests/test_data_gcs"; + + let fixture = rt.block_on(async { + let backend = GCSBenchBackend::new("zarr-unit-tests".into(), "test_data_gcs".into()).await; + TestFixture::new(backend, url).await + }); + + run_benchmark_group(fixture.get_session(), c, "gcs_benchmarks"); +} + +criterion_group!(gcs_benches, gcs_benchmark_group); +criterion_main!(gcs_benches); From 7ec113b57df398e6496be1ed705e56f8cff98644 Mon Sep 17 00:00:00 2001 From: Xavier Nogueira <58796351+xaviernogueira@users.noreply.github.com> Date: Wed, 21 Jan 2026 01:31:19 -0600 Subject: [PATCH 3/4] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20fixing=20things?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 8 +++++--- benches/gcs_bench.rs | 6 +++--- benches/shared.rs | 19 ++++++++----------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4b7def0..6491b1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,17 +48,19 @@ walkdir = { version = "2.5.0" } [[bench]] name = "shared" path = "benches/shared.rs" -test = false # shared benchmark code, not meant to be ran alone +# shared benchmark code, not meant to be ran alone +test = false +bench = false [[bench]] name = "s3_bench" path = "benches/s3_bench.rs" harness = false -required-features = ["icechunk"] +required-features = ["icechunk", "datafusion"] [[bench]] name = "gcs_bench" path = "benches/gcs_bench.rs" harness = false -required-features = ["icechunk"] +required-features = ["icechunk", "datafusion"] diff --git a/benches/gcs_bench.rs b/benches/gcs_bench.rs index 6255b9e..88fea6a 100644 --- a/benches/gcs_bench.rs +++ b/benches/gcs_bench.rs @@ -11,7 +11,7 @@ use icechunk::config::{GcsCredentials, GcsOptions}; use icechunk::{ObjectStorage, Repository}; use zarrs_icechunk::AsyncIcechunkStore; -use shared::{CloudStorageBackend, TestFixture, run_benchmark_group}; +use shared::{CloudStorageBenchBackend, TestFixture, run_benchmark_group}; // ============================================================================ // GCS Backend Implementation @@ -40,8 +40,8 @@ impl GCSBenchBackend { } #[async_trait::async_trait] -impl CloudStorageBackend for GCSBenchBackend { - async fn create_store(url: &str) -> Arc { +impl CloudStorageBenchBackend for GCSBenchBackend { + async fn create_icechunk_store(url: &str) -> Arc { let listing_url = ListingTableUrl::parse(url).unwrap(); let bucket = listing_url .object_store() diff --git a/benches/shared.rs b/benches/shared.rs index 8a5a5f8..e25732c 100644 --- a/benches/shared.rs +++ b/benches/shared.rs @@ -1,11 +1,9 @@ -use std::collections::HashMap; +#![cfg(feature = "datafusion")] // feature gate for datafusion use std::hint::black_box; use std::sync::Arc; -use arrow_array::RecordBatch; use arrow_zarr::table::ZarrTableFactory; use criterion::Criterion; -use datafusion::dataframe::DataFrame; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::SessionContext; use ndarray::{Array, Array2}; @@ -68,8 +66,8 @@ pub async fn write_data_to_store( } async fn run_query(query: &str, session: SessionContext) { - let df: DataFrame = session.sql(query).await.unwrap(); - let _: Vec = df.collect().await.unwrap(); + let df = session.sql(query).await.unwrap(); + let _ = df.collect().await.unwrap(); } pub fn run_benchmark_group(session: &SessionContext, c: &mut Criterion, group_name: &str) { @@ -128,12 +126,11 @@ pub struct TestFixture { impl TestFixture { pub async fn new(backend: B, url: &str) -> Self { - let store = B::create_icechunk_store(url).await; + let store: Arc = B::create_icechunk_store(url).await; write_data_to_store(store.clone(), 1, "").await; - let _ = store - .session() - .write() - .await + let session = store.session(); + let mut writer = session.write().await; + writer .commit("Test data for benchmarking", None) .await .unwrap(); @@ -151,7 +148,7 @@ impl TestFixture { ", url ); - session.sql(&query).await.unwrap(); + let _ = session.sql(&query).await.unwrap(); Self { backend, session } } From cf311985eaf3345edcecabe909109aa5df21e176 Mon Sep 17 00:00:00 2001 From: Xavier Nogueira <58796351+xaviernogueira@users.noreply.github.com> Date: Thu, 22 Jan 2026 13:41:34 -0600 Subject: [PATCH 4/4] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20getting=20things=20to?= =?UTF-8?q?=20work=20with=20google-cloud-storage=20v1.6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benches/gcs_bench.rs | 77 ++++++++++++++++---------------------------- src/table/config.rs | 20 ++++++++++++ 2 files changed, 47 insertions(+), 50 deletions(-) diff --git a/benches/gcs_bench.rs b/benches/gcs_bench.rs index 88fea6a..b0b7dd5 100644 --- a/benches/gcs_bench.rs +++ b/benches/gcs_bench.rs @@ -5,9 +5,7 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, Criterion}; use datafusion::datasource::listing::ListingTableUrl; -use google_cloud_storage::http::objects::delete::DeleteObjectRequest; -use google_cloud_storage::http::objects::list::ListObjectsRequest; -use icechunk::config::{GcsCredentials, GcsOptions}; +use icechunk::config::GcsCredentials; use icechunk::{ObjectStorage, Repository}; use zarrs_icechunk::AsyncIcechunkStore; @@ -18,23 +16,15 @@ use shared::{CloudStorageBenchBackend, TestFixture, run_benchmark_group}; // ============================================================================ struct GCSBenchBackend { - bucket: String, - prefix: String, - client: google_cloud_storage::client::Client, + _bucket: String, + _prefix: String, } impl GCSBenchBackend { async fn new(bucket: String, prefix: String) -> Self { - let config = google_cloud_storage::client::ClientConfig::default() - .with_auth() - .await - .unwrap(); - let client = google_cloud_storage::client::Client::new(config); - Self { - bucket, - prefix, - client, + _bucket: bucket, + _prefix: prefix, } } } @@ -51,55 +41,42 @@ impl CloudStorageBenchBackend for GCSBenchBackend { .to_string(); let credentials = GcsCredentials::FromEnv; - let config = GcsOptions { - endpoint_url: None, - anonymous: false, - allow_http: false, - }; - - let store = ObjectStorage::new_gcs( - bucket, - Some(listing_url.prefix().as_ref().to_string()), - Some(credentials), - Some(config), - ) - .await - .unwrap(); - let repo = Repository::create(None, Arc::new(store), HashMap::new()) + let store = Arc::new( + ObjectStorage::new_gcs( + bucket, + Some(listing_url.prefix().as_ref().to_string()), + Some(credentials), + None, + ) .await - .unwrap(); + .unwrap() + ); + + let repo = match Repository::open(None, store.clone(), HashMap::new()).await { + Ok(repo) => repo, + Err(_) => { + Repository::create(None, store, HashMap::new()) + .await + .unwrap() + } + }; let session = repo.writable_session("main").await.unwrap(); Arc::new(AsyncIcechunkStore::new(session)) } async fn cleanup(&self) { - - let list_request = ListObjectsRequest { - bucket: self.bucket.clone(), - prefix: Some(self.prefix.clone()), - ..Default::default() - }; - - let objects = self.client.list_objects(&list_request).await.unwrap(); - - for obj in objects.items.unwrap_or_default() { - let delete_request = DeleteObjectRequest { - bucket: self.bucket.clone(), - object: obj.name, - ..Default::default() - }; - let _ = self.client.delete_object(&delete_request).await; - } + // Cleanup is handled by the TestFixture Drop implementation + // which uses the icechunk store to clean up resources } fn bucket(&self) -> &str { - &self.bucket + &self._bucket } fn prefix(&self) -> &str { - &self.prefix + &self._prefix } } diff --git a/src/table/config.rs b/src/table/config.rs index d80b9d7..0f4cf19 100644 --- a/src/table/config.rs +++ b/src/table/config.rs @@ -176,6 +176,26 @@ impl ZarrTableUrl { .await .map_err(|e| DataFusionError::External(Box::new(e)))? } + "gs" => { + use icechunk::config::GcsCredentials; + + let bucket = table_url + .object_store() + .as_str() + .replace("gs://", "") + .trim_end_matches("/") + .to_string(); + let credentials = GcsCredentials::FromEnv; + + ObjectStorage::new_gcs( + bucket, + Some(table_url.prefix().as_ref().to_string()), + Some(credentials), + None, + ) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))? + } _ => { return Err(DataFusionError::Execution(format!( "Unsupported table url scheme {} for icechunk repos",