diff --git a/crates/arrow-zarr/benches/gcs_bench.rs b/crates/arrow-zarr/benches/gcs_bench.rs
new file mode 100644
index 0000000..efa6769
--- /dev/null
+++ b/crates/arrow-zarr/benches/gcs_bench.rs
@@ -0,0 +1,96 @@
+mod shared;
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::datasource::listing::ListingTableUrl;
+use icechunk::config::GcsCredentials;
+use icechunk::{ObjectStorage, Repository};
+use zarrs_icechunk::AsyncIcechunkStore;
+
+use shared::{CloudStorageBenchBackend, TestFixture, run_benchmark_group};
+
+// ============================================================================
+// GCS Backend Implementation
+// ============================================================================
+
+/// Benchmark backend for an icechunk repository stored in a GCS bucket.
+///
+/// It keeps the trait's default no-op `cleanup`: the repository is left in the bucket and
+/// re-opened by the next run.
+struct GCSBenchBackend {
+    bucket: String,
+    prefix: String,
+}
+
+impl GCSBenchBackend {
+    async fn new(bucket: String, prefix: String) -> Self {
+        Self { bucket, prefix }
+    }
+}
+
+#[async_trait::async_trait]
+impl CloudStorageBenchBackend for GCSBenchBackend {
+    // Opens the repository at `url`, creating it when it cannot be opened, and wraps a
+    // writable session on `main` in an icechunk store.
+    async fn create_icechunk_store(url: &str) -> Arc<AsyncIcechunkStore> {
+        let listing_url = ListingTableUrl::parse(url).unwrap();
+        let bucket = listing_url
+            .object_store()
+            .as_str()
+            .replace("gs://", "")
+            .trim_end_matches("/")
+            .to_string();
+
+        let credentials = GcsCredentials::FromEnv;
+
+        let store = Arc::new(
+            ObjectStorage::new_gcs(
+                bucket,
+                Some(listing_url.prefix().as_ref().to_string()),
+                Some(credentials),
+                None,
+            )
+            .await
+            .unwrap()
+        );
+
+        let repo = match Repository::open(None, store.clone(), HashMap::new()).await {
+            Ok(repo) => repo,
+            Err(_) => {
+                Repository::create(None, store, HashMap::new())
+                    .await
+                    .unwrap()
+            }
+        };
+        let session = repo.writable_session("main").await.unwrap();
+
+        Arc::new(AsyncIcechunkStore::new(session))
+    }
+
+    fn bucket(&self) -> &str {
+        &self.bucket
+    }
+
+    fn prefix(&self) -> &str {
+        &self.prefix
+    }
+}
+
+
+/// Prepares the GCS fixture and runs the shared benchmark group against it.
+fn gcs_benchmark_group(c: &mut Criterion) {
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    let url = "gs://zarr-unit-tests/test_data_gcs";
+
+    let fixture = rt.block_on(async {
+        let backend = GCSBenchBackend::new("zarr-unit-tests".into(), "test_data_gcs".into()).await;
+        TestFixture::new(backend, url).await
+    });
+
+    run_benchmark_group(fixture.get_session(), c, "gcs_benchmarks");
+}
+
+criterion_group!(gcs_benches, gcs_benchmark_group);
+criterion_main!(gcs_benches);
diff --git a/crates/arrow-zarr/benches/s3_bench.rs b/crates/arrow-zarr/benches/s3_bench.rs
index 7070593..b2539da 100644
--- a/crates/arrow-zarr/benches/s3_bench.rs
+++ b/crates/arrow-zarr/benches/s3_bench.rs
@@ -1,252 +1,131 @@
+mod shared;
+
 use std::collections::HashMap;
 use std::env;
-use std::hint::black_box;
 use std::sync::Arc;
 
-use arrow_zarr::table::ZarrTableFactory;
 use aws_config::{self, BehaviorVersion};
 use aws_sdk_s3::types::{Delete, ObjectIdentifier};
-use aws_sdk_s3::Client;
+use aws_sdk_s3::Client as S3Client;
 use criterion::{criterion_group, criterion_main, Criterion};
 use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::execution::SessionStateBuilder;
-use datafusion::prelude::SessionContext;
 use icechunk::config::{S3Credentials, S3Options};
 use icechunk::{ObjectStorage, Repository};
-use ndarray::{Array, Array2};
-use zarrs::array::{codec, ArrayBuilder, DataType, FillValue};
-use zarrs::array_subset::ArraySubset;
 use zarrs_icechunk::AsyncIcechunkStore;
-use zarrs_storage::AsyncReadableWritableListableStorageTraits;
-
-async fn create_s3_icechunk(url: &str) -> Arc<AsyncIcechunkStore> {
-    let listing_url = ListingTableUrl::parse(url).unwrap();
-    let bucket = listing_url
-        .object_store()
-        .as_str()
-        .replace("s3://", "")
-        .trim_end_matches("/")
-        .to_string();
-
-    let credentials = S3Credentials::FromEnv;
-    let config = S3Options {
-        region: env::var("AWS_DEFAULT_REGION").ok(),
-        endpoint_url: None,
-        anonymous: false,
-        allow_http: false,
-        force_path_style: false,
-        network_stream_timeout_seconds: None,
-        requester_pays: false,
-    };
-
-    let store = ObjectStorage::new_s3(
-        bucket,
-        Some(listing_url.prefix().as_ref().to_string()),
-        Some(credentials),
-        Some(config),
-    )
-    .await
-    .unwrap();
-
-    let repo = Repository::create(None, Arc::new(store), HashMap::new())
-        .await
-        .unwrap();
-    let session = repo.writable_session("main").await.unwrap();
-    Arc::new(AsyncIcechunkStore::new(session))
-}
-
-fn get_lz4_compressor() -> codec::BloscCodec {
-    codec::BloscCodec::new(
-        codec::bytes_to_bytes::blosc::BloscCompressor::LZ4,
-        5.try_into().unwrap(),
-        Some(0),
-        codec::bytes_to_bytes::blosc::BloscShuffleMode::NoShuffle,
-        Some(1),
-    )
-    .unwrap()
-}
+use shared::{CloudStorageBenchBackend, TestFixture, run_benchmark_group};
 
-async fn write_data_to_store(
-    store: Arc<dyn AsyncReadableWritableListableStorageTraits>,
-    start_var_idx: usize,
-    prefix: &str,
-) {
-    let n = 512;
-    let fill_value: i64 = 0;
-    let mut array_builder = ArrayBuilder::new(
-        vec![n, n],
-        [8, 8],
-        DataType::Int64,
-        FillValue::from(fill_value),
-    );
-
-    let mut builder_ref = &mut array_builder;
-    let codec = get_lz4_compressor();
-    builder_ref = builder_ref.bytes_to_bytes_codecs(vec![Arc::new(codec)]);
-
-    let prefix = if prefix.is_empty() {
-        prefix
-    } else {
-        &format!("/{}", prefix)
-    };
-    for var_idx in start_var_idx..(start_var_idx + 8) {
-        let arr = builder_ref
-            .build(store.clone(), &format!("{}/var{}", prefix, var_idx))
-            .unwrap();
-        arr.async_store_metadata().await.unwrap();
-
-        let arr_data: Array2<i64> = Array::from_vec((0..(n * n) as i64).step_by(1).collect())
-            .into_shape_with_order((n as usize, n as usize))
-            .unwrap();
-        arr.async_store_array_subset_ndarray(
-            ArraySubset::new_with_ranges(&[0..n, 0..n]).start(),
-            arr_data,
-        )
-        .await
-        .unwrap();
-    }
-}
-
-struct S3TestFixture {
+struct S3BenchBackend {
     bucket: String,
     prefix: String,
-    client: Client,
-    session: SessionContext,
+    client: S3Client,
 }
 
-impl S3TestFixture {
-    fn new() -> Self {
-        let url = "s3://zarr-unit-tests/test_data_1";
-        let rt = tokio::runtime::Runtime::new().unwrap();
-
-        let (client, session) = rt.block_on(async {
-            let store = create_s3_icechunk(url).await;
-            write_data_to_store(store.clone(), 1, "").await;
-            let _ = store
-                .session()
-                .write()
-                .await
-                .commit("some test data", None)
-                .await
-                .unwrap();
-
-            let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
-            let client = Client::new(&config);
-
-            let mut state = SessionStateBuilder::new().build();
-
-            state
-                .table_factories_mut()
-                .insert("ICECHUNK_REPO".into(), Arc::new(ZarrTableFactory {}));
-            let session = SessionContext::new_with_state(state.clone());
-
-            let query = format!(
-                "
-                CREATE EXTERNAL TABLE zarr_table
-                STORED AS ICECHUNK_REPO LOCATION '{}'
-                ",
-                url
-            );
-            session.sql(&query).await.unwrap();
-
-            (client, session)
-        });
-
+impl S3BenchBackend {
+    async fn new(bucket: String, prefix: String) -> Self {
+        let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
+        let client = S3Client::new(&config);
         Self {
-            bucket: "zarr-unit-tests".into(),
-            prefix: "test_data_1".into(),
+            bucket,
+            prefix,
             client,
-            session,
         }
     }
+}
+
+#[async_trait::async_trait]
+impl CloudStorageBenchBackend for S3BenchBackend {
+    async fn create_icechunk_store(url: &str) -> Arc<AsyncIcechunkStore> {
+        let listing_url = ListingTableUrl::parse(url).unwrap();
+        let bucket = listing_url
+            .object_store()
+            .as_str()
+            .replace("s3://", "")
+            .trim_end_matches("/")
+            .to_string();
+
+        let credentials = S3Credentials::FromEnv;
+        let config = S3Options {
+            region: env::var("AWS_DEFAULT_REGION").ok(),
+            endpoint_url: None,
+            anonymous: false,
+            allow_http: false,
+            force_path_style: false,
+            network_stream_timeout_seconds: None,
+            requester_pays: false,
+        };
+
+        let store = ObjectStorage::new_s3(
+            bucket,
+            Some(listing_url.prefix().as_ref().to_string()),
+            Some(credentials),
+            Some(config),
+        )
+        .await
+        .unwrap();
+
+        let repo = Repository::create(None, Arc::new(store), HashMap::new())
+            .await
+            .unwrap();
+        let session = repo.writable_session("main").await.unwrap();
 
-    fn get_session(&self) -> &SessionContext {
-        &self.session
+        Arc::new(AsyncIcechunkStore::new(session))
     }
-}
 
-impl Drop for S3TestFixture {
-    fn drop(&mut self) {
-        let rt = tokio::runtime::Runtime::new().unwrap();
+    async fn cleanup(&self) {
+        let objects = self
+            .client
+            .list_objects_v2()
+            .bucket(self.bucket.clone())
+            .prefix(self.prefix.clone())
+            .send()
+            .await
+            .unwrap();
 
-        rt.block_on(async {
-            let objects = self
-                .client
-                .list_objects_v2()
+        let to_delete: Vec<_> = objects
+            .contents()
+            .iter()
+            .filter_map(|obj| {
+                obj.key()
+                    .map(|k| ObjectIdentifier::builder().key(k).build().unwrap())
+            })
+            .collect();
+
+        if !to_delete.is_empty() {
+            let delete = Delete::builder()
+                .set_objects(Some(to_delete))
+                .build()
+                .unwrap();
+            self.client
+                .delete_objects()
                 .bucket(self.bucket.clone())
-                .prefix(self.prefix.clone())
+                .delete(delete)
                 .send()
                 .await
                 .unwrap();
+        }
+    }
 
-            let to_delete: Vec<_> = objects
-                .contents()
-                .iter()
-                .filter_map(|obj| {
-                    obj.key()
-                        .map(|k| ObjectIdentifier::builder().key(k).build().unwrap())
-                })
-                .collect();
-
-            if !to_delete.is_empty() {
-                let delete = Delete::builder()
-                    .set_objects(Some(to_delete))
-                    .build()
-                    .unwrap();
-                self.client
-                    .delete_objects()
-                    .bucket(self.bucket.clone())
-                    .delete(delete)
-                    .send()
-                    .await
-                    .unwrap();
-            }
-        })
+    fn bucket(&self) -> &str {
+        &self.bucket
     }
-}
 
-async fn run_query(query: &str, session: SessionContext) {
-    let df = session.sql(query).await.unwrap();
-    let _ = df.collect().await.unwrap();
+    fn prefix(&self) -> &str {
+        &self.prefix
+    }
 }
 
 
-fn benchmark_query(c: &mut Criterion) {
+fn s3_benchmark_group(c: &mut Criterion) {
     let rt = tokio::runtime::Runtime::new().unwrap();
-    let s3_fixture = S3TestFixture::new();
-
-    let mut group = c.benchmark_group("my_group");
-    group.sample_size(20);
-
-    let session = s3_fixture.get_session().clone();
-    let query = "
-        SELECT t1.*, t2.*
-        FROM zarr_table as t1
-        JOIN zarr_table as t2
-        ON t1.var1 % 12 = 0
-        AND t1.var1 < t2.var1 + 1
-        AND t1.var1 >= t2.var1 - 1
-    ";
-
-    group.bench_function("benchmark 1", |b| {
-        b.to_async(&rt)
-            .iter(|| async { run_query(black_box(query), black_box(session.clone())).await })
+    let url = "s3://zarr-unit-tests/test_data_s3";
+
+    let fixture = rt.block_on(async {
+        let backend = S3BenchBackend::new("zarr-unit-tests".into(), "test_data_s3".into()).await;
+        TestFixture::new(backend, url).await
     });
 
-    let query = "
-        SELECT *
-        FROM zarr_table
-
-        UNION ALL
-
-        SELECT *
-        FROM zarr_table
-    ";
-    group.bench_function("benchmark 2", |b| {
-        b.to_async(&rt)
-            .iter(|| async { run_query(black_box(query), black_box(session.clone())).await })
-    });
+    run_benchmark_group(fixture.get_session(), c, "s3_benchmarks");
 }
 
-criterion_group!(benches, benchmark_query);
-criterion_main!(benches);
+criterion_group!(s3_benches, s3_benchmark_group);
+criterion_main!(s3_benches);
diff --git a/crates/arrow-zarr/benches/shared.rs b/crates/arrow-zarr/benches/shared.rs
new file mode 100644
index 0000000..7d1b120
--- /dev/null
+++ b/crates/arrow-zarr/benches/shared.rs
@@ -0,0 +1,184 @@
+#![cfg(feature = "datafusion")] // feature gate for datafusion
+use std::hint::black_box;
+use std::sync::Arc;
+
+use arrow_zarr::table::ZarrTableFactory;
+use criterion::Criterion;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::prelude::SessionContext;
+use ndarray::{Array, Array2};
+use zarrs::array::{codec, ArrayBuilder, DataType, FillValue};
+use zarrs::array_subset::ArraySubset;
+use zarrs_icechunk::AsyncIcechunkStore;
+use zarrs_storage::AsyncReadableWritableListableStorageTraits;
+
+/// Builds the Blosc/LZ4 codec (no shuffling) used to compress the benchmark arrays.
+pub fn get_lz4_compressor() -> codec::BloscCodec {
+    codec::BloscCodec::new(
+        codec::bytes_to_bytes::blosc::BloscCompressor::LZ4,
+        5.try_into().unwrap(),
+        Some(0),
+        codec::bytes_to_bytes::blosc::BloscShuffleMode::NoShuffle,
+        Some(1),
+    )
+    .unwrap()
+}
+
+/// Writes eight 512x512 `Int64` arrays, `var{start_var_idx}` onwards, into `store`.
+///
+/// Each array uses 8x8 chunks and holds the values `0..512 * 512`. A non-empty `prefix` is
+/// used as the parent path of the arrays.
+pub async fn write_data_to_store(
+    store: Arc<dyn AsyncReadableWritableListableStorageTraits>,
+    start_var_idx: usize,
+    prefix: &str,
+) {
+    let n = 512;
+    let fill_value: i64 = 0;
+    let mut array_builder = ArrayBuilder::new(
+        vec![n, n],
+        [8, 8],
+        DataType::Int64,
+        FillValue::from(fill_value),
+    );
+
+    let mut builder_ref = &mut array_builder;
+    let codec = get_lz4_compressor();
+    builder_ref = builder_ref.bytes_to_bytes_codecs(vec![Arc::new(codec)]);
+
+    let prefix = if prefix.is_empty() {
+        prefix
+    } else {
+        &format!("/{}", prefix)
+    };
+
+    for var_idx in start_var_idx..(start_var_idx + 8) {
+        let arr = builder_ref
+            .build(store.clone(), &format!("{}/var{}", prefix, var_idx))
+            .unwrap();
+        arr.async_store_metadata().await.unwrap();
+
+        let arr_data: Array2<i64> = Array::from_vec((0..(n * n) as i64).step_by(1).collect())
+            .into_shape_with_order((n as usize, n as usize))
+            .unwrap();
+        arr.async_store_array_subset_ndarray(
+            ArraySubset::new_with_ranges(&[0..n, 0..n]).start(),
+            arr_data,
+        )
+        .await
+        .unwrap();
+    }
+}
+
+/// Plans `query` on `session` and collects (then discards) every result batch.
+async fn run_query(query: &str, session: SessionContext) {
+    let df = session.sql(query).await.unwrap();
+    let _ = df.collect().await.unwrap();
+}
+
+/// Registers the join and union benchmarks against `zarr_table` in a group named `group_name`.
+pub fn run_benchmark_group(session: &SessionContext, c: &mut Criterion, group_name: &str) {
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    let mut group = c.benchmark_group(group_name);
+    group.sample_size(20);
+
+    let session = session.clone();
+
+    let query1 = "
+        SELECT t1.*, t2.*
+        FROM zarr_table as t1
+        JOIN zarr_table as t2
+        ON t1.var1 % 12 = 0
+        AND t1.var1 < t2.var1 + 1
+        AND t1.var1 >= t2.var1 - 1
+    ";
+
+    group.bench_function("join_benchmark", |b| {
+        b.to_async(&rt)
+            .iter(|| async { run_query(black_box(query1), black_box(session.clone())).await })
+    });
+
+    let query2 = "
+        SELECT *
+        FROM zarr_table
+
+        UNION ALL
+
+        SELECT *
+        FROM zarr_table
+    ";
+
+    group.bench_function("union_benchmark", |b| {
+        b.to_async(&rt)
+            .iter(|| async { run_query(black_box(query2), black_box(session.clone())).await })
+    });
+
+    group.finish();
+}
+
+
+/// Storage-provider hooks needed to benchmark an icechunk repository in a cloud bucket.
+#[async_trait::async_trait]
+pub trait CloudStorageBenchBackend: Send + Sync {
+    // must be implemented for different cloud storage providers to enable benchmarking
+    async fn create_icechunk_store(url: &str) -> Arc<AsyncIcechunkStore>;
+    fn bucket(&self) -> &str;
+    fn prefix(&self) -> &str;
+
+    // Removes the objects written by the benchmark. Defaults to a no-op so that backends
+    // which re-open an existing repository on the next run need not implement it.
+    async fn cleanup(&self) {}
+}
+
+/// Benchmark fixture: owns the storage backend and the session with `zarr_table` registered.
+pub struct TestFixture<B: CloudStorageBenchBackend> {
+    backend: B,
+    session: SessionContext,
+}
+
+impl<B: CloudStorageBenchBackend> TestFixture<B> {
+    /// Writes and commits the test arrays to the repository at `url`, then registers that
+    /// repository as the external table `zarr_table` in a fresh session.
+    pub async fn new(backend: B, url: &str) -> Self {
+        let store: Arc<AsyncIcechunkStore> = B::create_icechunk_store(url).await;
+        write_data_to_store(store.clone(), 1, "").await;
+        let session = store.session();
+        let mut writer = session.write().await;
+        writer
+            .commit("Test data for benchmarking", None)
+            .await
+            .unwrap();
+
+        let mut state = SessionStateBuilder::new().build();
+        state
+            .table_factories_mut()
+            .insert("ICECHUNK_REPO".into(), Arc::new(ZarrTableFactory {}));
+        let session = SessionContext::new_with_state(state.clone());
+
+        let query = format!(
+            "
+            CREATE EXTERNAL TABLE zarr_table
+            STORED AS ICECHUNK_REPO LOCATION '{}'
+            ",
+            url
+        );
+        let _ = session.sql(&query).await.unwrap();
+
+        Self { backend, session }
+    }
+
+    /// Returns the session in which `zarr_table` is registered.
+    pub fn get_session(&self) -> &SessionContext {
+        &self.session
+    }
+}
+
+impl<B: CloudStorageBenchBackend> Drop for TestFixture<B> {
+    fn drop(&mut self) {
+        // `Drop` is synchronous, so the async cleanup runs on a dedicated runtime. The fixture
+        // must therefore be dropped outside of any running Tokio runtime.
+        let rt = tokio::runtime::Runtime::new().unwrap();
+        rt.block_on(self.backend.cleanup());
+    }
+}
+
diff --git a/crates/arrow-zarr/src/table/config.rs b/crates/arrow-zarr/src/table/config.rs
index d80b9d7..0f4cf19 100644
--- a/crates/arrow-zarr/src/table/config.rs
+++ b/crates/arrow-zarr/src/table/config.rs
@@ -176,6 +176,26 @@ impl ZarrTableUrl {
                     .await
                     .map_err(|e| DataFusionError::External(Box::new(e)))?
                 }
+                "gs" => {
+                    use icechunk::config::GcsCredentials;
+
+                    let bucket = table_url
+                        .object_store()
+                        .as_str()
+                        .replace("gs://", "")
+                        .trim_end_matches("/")
+                        .to_string();
+                    let credentials = GcsCredentials::FromEnv;
+
+                    ObjectStorage::new_gcs(
+                        bucket,
+                        Some(table_url.prefix().as_ref().to_string()),
+                        Some(credentials),
+                        None,
+                    )
+                    .await
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?
+                }
                 _ => {
                     return Err(DataFusionError::Execution(format!(
                         "Unsupported table url scheme {} for icechunk repos",