From e284993ddcfcaf95ccf8717103963ae1d04df2fd Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Wed, 9 Apr 2025 11:35:43 +0200 Subject: [PATCH 1/7] (fix): cache partial decoder --- src/lib.rs | 44 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index cae087c..308eca1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,8 +2,10 @@ #![allow(clippy::module_name_repetitions)] use std::borrow::Cow; +use std::collections::hash_map::Entry::{Occupied, Vacant}; +use std::collections::HashMap; use std::ptr::NonNull; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use numpy::npyffi::PyArrayObject; use numpy::{PyArrayDescrMethods, PyUntypedArray, PyUntypedArrayMethods}; @@ -14,12 +16,16 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; use unsafe_cell_slice::UnsafeCellSlice; -use zarrs::array::codec::{ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder}; +use zarrs::array::codec::{ + ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, +}; use zarrs::array::{ - copy_fill_value_into, update_array_bytes, ArrayBytes, ArraySize, CodecChain, FillValue, + copy_fill_value_into, update_array_bytes, ArrayBytes, ArraySize, ChunkRepresentation, + CodecChain, FillValue, }; use zarrs::array_subset::ArraySubset; use zarrs::metadata::v3::MetadataV3; +use zarrs::storage::StoreKey; mod chunk_item; mod concurrency; @@ -46,6 +52,7 @@ pub struct CodecPipelineImpl { pub(crate) chunk_concurrent_minimum: usize, pub(crate) chunk_concurrent_maximum: usize, pub(crate) num_threads: usize, + partial_decoder_cache: Arc>>>, } impl CodecPipelineImpl { @@ -245,6 +252,7 @@ impl CodecPipelineImpl { chunk_concurrent_minimum, chunk_concurrent_maximum, num_threads, + partial_decoder_cache: Arc::new(Mutex::new(HashMap::new().into())), }) } @@ -308,18 +316,34 @@ impl CodecPipelineImpl { } } } else { - let input_handle = Arc::new(self.stores.decoder(&item)?); - let partial_decoder = self - .codec_chain - .clone() - .partial_decoder(input_handle, item.representation(), &codec_options) - .map_py_err::()?; + let key = item.key().clone(); + let partial_decoder: PyResult> = match self + .partial_decoder_cache + .lock() + .map_py_err::()? + .entry(key) + { + Occupied(e) => Ok(e.get().clone()), + Vacant(e) => { + let input_handle = self.stores.decoder(&item)?; + let partial_decoder = self + .codec_chain + .clone() + .partial_decoder( + Arc::new(input_handle), + item.representation(), + &codec_options, + ) + .map_py_err::()?; + Ok(e.insert(partial_decoder).clone()) + } + }; unsafe { // SAFETY: // - output is an array with output_shape elements of the item.representation data type, // - item.subset is within the bounds of output_shape. // - item.chunk_subset has the same number of elements as item.subset. - partial_decoder.partial_decode_into( + partial_decoder?.partial_decode_into( &item.chunk_subset, &output, &output_shape, From 452741bd92252aca40c37f6928cd6422bf8a07f4 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Wed, 9 Apr 2025 16:24:41 +0200 Subject: [PATCH 2/7] (refactor): local caching --- src/lib.rs | 74 ++++++++++++++++++++++++++++++---------------------- src/utils.rs | 7 +++++ 2 files changed, 50 insertions(+), 31 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 308eca1..bb6af42 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,11 +2,11 @@ #![allow(clippy::module_name_repetitions)] use std::borrow::Cow; -use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; use std::ptr::NonNull; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use chunk_item::WithSubset; use numpy::npyffi::PyArrayObject; use numpy::{PyArrayDescrMethods, PyUntypedArray, PyUntypedArrayMethods}; use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; @@ -16,12 +16,12 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; use unsafe_cell_slice::UnsafeCellSlice; +use utils::is_whole_chunk; use zarrs::array::codec::{ ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, }; use zarrs::array::{ - copy_fill_value_into, update_array_bytes, ArrayBytes, ArraySize, ChunkRepresentation, - CodecChain, FillValue, + copy_fill_value_into, update_array_bytes, ArrayBytes, ArraySize, CodecChain, FillValue, }; use zarrs::array_subset::ArraySubset; use zarrs::metadata::v3::MetadataV3; @@ -52,7 +52,6 @@ pub struct CodecPipelineImpl { pub(crate) chunk_concurrent_minimum: usize, pub(crate) chunk_concurrent_maximum: usize, pub(crate) num_threads: usize, - partial_decoder_cache: Arc>>>, } impl CodecPipelineImpl { @@ -252,7 +251,6 @@ impl CodecPipelineImpl { chunk_concurrent_minimum, chunk_concurrent_maximum, num_threads, - partial_decoder_cache: Arc::new(Mutex::new(HashMap::new().into())), }) } @@ -273,15 +271,43 @@ impl CodecPipelineImpl { return Ok(()); }; + // Assemble partial decoders ahead of time and in parallel + let mut item_map: HashMap = HashMap::new().into(); + chunk_descriptions + .iter() + .filter(|item| !(is_whole_chunk(item))) + .for_each(|item| { + item_map.insert(item.key().clone(), item); + }); + let mut partial_decoder_cache: HashMap> = + HashMap::new().into(); + if item_map.len() > 0 { + let key_decoder_pairs = item_map + .into_par_iter() + .map(|(key, item)| { + let input_handle = self.stores.decoder(item)?; + let partial_decoder = self + .codec_chain + .clone() + .partial_decoder( + Arc::new(input_handle), + item.representation(), + &codec_options, + ) + .map_py_err::()?; + Ok((key.clone(), partial_decoder)) + }) + .collect::>>()?; + partial_decoder_cache.extend(key_decoder_pairs); + } + py.allow_threads(move || { // FIXME: the `decode_into` methods only support fixed length data types. // For variable length data types, need a codepath with non `_into` methods. // Collect all the subsets and copy into value on the Python side? let update_chunk_subset = |item: chunk_item::WithSubset| { // See zarrs::array::Array::retrieve_chunk_subset_into - if item.chunk_subset.start().iter().all(|&o| o == 0) - && item.chunk_subset.shape() == item.representation().shape_u64() - { + if is_whole_chunk(&item) { // See zarrs::array::Array::retrieve_chunk_into if let Some(chunk_encoded) = self.stores.get(&item)? { // Decode the encoded data into the output buffer @@ -316,28 +342,14 @@ impl CodecPipelineImpl { } } } else { - let key = item.key().clone(); - let partial_decoder: PyResult> = match self - .partial_decoder_cache - .lock() - .map_py_err::()? - .entry(key) - { - Occupied(e) => Ok(e.get().clone()), - Vacant(e) => { - let input_handle = self.stores.decoder(&item)?; - let partial_decoder = self - .codec_chain - .clone() - .partial_decoder( - Arc::new(input_handle), - item.representation(), - &codec_options, - ) - .map_py_err::()?; - Ok(e.insert(partial_decoder).clone()) - } - }; + let key = item.key(); + let partial_decoder: PyResult<&Arc> = + match partial_decoder_cache.get(key) { + Some(e) => Ok(e), + None => Err(PyRuntimeError::new_err(format!( + "Partial decoder not found for key: {key}" + ))), + }; unsafe { // SAFETY: // - output is an array with output_shape elements of the item.representation data type, diff --git a/src/utils.rs b/src/utils.rs index 5855b54..b33b4b1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -3,6 +3,8 @@ use std::fmt::Display; use numpy::{PyUntypedArray, PyUntypedArrayMethods}; use pyo3::{Bound, PyErr, PyResult, PyTypeInfo}; +use crate::{ChunksItem, WithSubset}; + pub(crate) trait PyErrExt { fn map_py_err(self) -> PyResult; } @@ -29,3 +31,8 @@ impl PyUntypedArrayExt for Bound<'_, PyUntypedArray> { }) } } + +pub fn is_whole_chunk(item: &WithSubset) -> bool { + item.chunk_subset.start().iter().all(|&o| o == 0) + && item.chunk_subset.shape() == item.representation().shape_u64() +} From 24478f74e5dc34d954096df0b3b259bd5d7b9045 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Wed, 9 Apr 2025 18:06:14 +0200 Subject: [PATCH 3/7] (refactor): only use one hashmap --- Cargo.toml | 1 + src/lib.rs | 17 ++++++++--------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 26c459f..94c33bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ opendal = { version = "0.51.0", features = ["services-http"] } tokio = { version = "1.41.1", features = ["rt-multi-thread"] } zarrs_opendal = "0.5.0" zarrs_metadata = "0.3.3" # require recent zarr-python compatibility fixes (remove with zarrs 0.20) +itertools = "0.9.0" [profile.release] lto = true diff --git a/src/lib.rs b/src/lib.rs index bb6af42..56803a8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ use std::ptr::NonNull; use std::sync::Arc; use chunk_item::WithSubset; +use itertools::Itertools; use numpy::npyffi::PyArrayObject; use numpy::{PyArrayDescrMethods, PyUntypedArray, PyUntypedArrayMethods}; use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; @@ -272,19 +273,17 @@ impl CodecPipelineImpl { }; // Assemble partial decoders ahead of time and in parallel - let mut item_map: HashMap = HashMap::new().into(); - chunk_descriptions + let partial_chunk_descriptions = chunk_descriptions .iter() + .unique_by(|item| item.key()) .filter(|item| !(is_whole_chunk(item))) - .for_each(|item| { - item_map.insert(item.key().clone(), item); - }); + .collect::>(); let mut partial_decoder_cache: HashMap> = HashMap::new().into(); - if item_map.len() > 0 { - let key_decoder_pairs = item_map + if partial_chunk_descriptions.len() > 0 { + let key_decoder_pairs = partial_chunk_descriptions .into_par_iter() - .map(|(key, item)| { + .map(|item| { let input_handle = self.stores.decoder(item)?; let partial_decoder = self .codec_chain @@ -295,7 +294,7 @@ impl CodecPipelineImpl { &codec_options, ) .map_py_err::()?; - Ok((key.clone(), partial_decoder)) + Ok((item.key().clone(), partial_decoder)) }) .collect::>>()?; partial_decoder_cache.extend(key_decoder_pairs); From 8fb36e9768c5686c030143ffea7afe5c7c92fb46 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Wed, 9 Apr 2025 18:08:12 +0200 Subject: [PATCH 4/7] (refactor): only do unique on filtered object --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 56803a8..af987ba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -275,8 +275,8 @@ impl CodecPipelineImpl { // Assemble partial decoders ahead of time and in parallel let partial_chunk_descriptions = chunk_descriptions .iter() - .unique_by(|item| item.key()) .filter(|item| !(is_whole_chunk(item))) + .unique_by(|item| item.key()) .collect::>(); let mut partial_decoder_cache: HashMap> = HashMap::new().into(); From 226ff735320139f8f80e0f38acb28085ba5a431a Mon Sep 17 00:00:00 2001 From: Ilan Gold Date: Thu, 10 Apr 2025 09:38:07 +0200 Subject: [PATCH 5/7] Update src/lib.rs Co-authored-by: Lachlan Deakin --- src/lib.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index af987ba..3fadf2f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -281,9 +281,11 @@ impl CodecPipelineImpl { let mut partial_decoder_cache: HashMap> = HashMap::new().into(); if partial_chunk_descriptions.len() > 0 { - let key_decoder_pairs = partial_chunk_descriptions - .into_par_iter() - .map(|item| { + let key_decoder_pairs = iter_concurrent_limit!( + chunk_concurrent_limit, + partial_chunk_descriptions, + map, + |item| { let input_handle = self.stores.decoder(item)?; let partial_decoder = self .codec_chain @@ -295,8 +297,9 @@ impl CodecPipelineImpl { ) .map_py_err::()?; Ok((item.key().clone(), partial_decoder)) - }) - .collect::>>()?; + } + ) + .collect::>>()?; partial_decoder_cache.extend(key_decoder_pairs); } From a40104d3b20882e986de27deb21c1aa9f75960e0 Mon Sep 17 00:00:00 2001 From: Ilan Gold Date: Thu, 10 Apr 2025 09:38:34 +0200 Subject: [PATCH 6/7] Update src/lib.rs Co-authored-by: Lachlan Deakin --- src/lib.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3fadf2f..0526674 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -345,13 +345,9 @@ impl CodecPipelineImpl { } } else { let key = item.key(); - let partial_decoder: PyResult<&Arc> = - match partial_decoder_cache.get(key) { - Some(e) => Ok(e), - None => Err(PyRuntimeError::new_err(format!( - "Partial decoder not found for key: {key}" - ))), - }; + let partial_decoder = partial_decoder_cache.get(key).ok_or_else(|| { + PyRuntimeError::new_err(format!("Partial decoder not found for key: {key}")) + })?; unsafe { // SAFETY: // - output is an array with output_shape elements of the item.representation data type, From c6af851d6cf3d47c4cd35c1c1bc3897ba82a5893 Mon Sep 17 00:00:00 2001 From: Ilan Gold Date: Thu, 10 Apr 2025 09:38:40 +0200 Subject: [PATCH 7/7] Update src/lib.rs Co-authored-by: Lachlan Deakin --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 0526674..a38be9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -353,7 +353,7 @@ impl CodecPipelineImpl { // - output is an array with output_shape elements of the item.representation data type, // - item.subset is within the bounds of output_shape. // - item.chunk_subset has the same number of elements as item.subset. - partial_decoder?.partial_decode_into( + partial_decoder.partial_decode_into( &item.chunk_subset, &output, &output_shape,