diff --git a/src/query/service/src/pipelines/builders/builder_row_fetch.rs b/src/query/service/src/pipelines/builders/builder_row_fetch.rs index c96e00114bb9f..5b90720d5c9e6 100644 --- a/src/query/service/src/pipelines/builders/builder_row_fetch.rs +++ b/src/query/service/src/pipelines/builders/builder_row_fetch.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use databend_common_base::runtime::GlobalIORuntime; use databend_common_exception::Result; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; @@ -21,18 +24,24 @@ use databend_common_pipeline_transforms::processors::create_dummy_item; use databend_common_sql::executor::physical_plans::RowFetch; use databend_common_sql::executor::PhysicalPlan; use databend_common_storages_fuse::operations::row_fetch_processor; +use databend_common_storages_fuse::TableContext; +use tokio::sync::Semaphore; use crate::pipelines::PipelineBuilder; - impl PipelineBuilder { pub(crate) fn build_row_fetch(&mut self, row_fetch: &RowFetch) -> Result<()> { self.build_pipeline(&row_fetch.input)?; + let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? 
as usize; + let row_fetch_runtime = GlobalIORuntime::instance(); + let row_fetch_semaphore = Arc::new(Semaphore::new(max_io_requests)); let processor = row_fetch_processor( self.ctx.clone(), row_fetch.row_id_col_offset, &row_fetch.source, row_fetch.cols_to_fetch.clone(), row_fetch.need_wrap_nullable, + row_fetch_semaphore, + row_fetch_runtime, )?; if !matches!(&*row_fetch.input, PhysicalPlan::MutationSplit(_)) { self.main_pipeline.add_transform(processor)?; diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs index 1632cbe4eaa84..ce98ca5b36d5c 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs @@ -15,6 +15,8 @@ use std::collections::HashSet; use std::sync::Arc; +use databend_common_base::base::tokio::sync::Semaphore; +use databend_common_base::runtime::Runtime; use databend_common_catalog::plan::split_row_id; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::Projection; @@ -49,6 +51,8 @@ pub fn row_fetch_processor( source: &DataSourcePlan, projection: Projection, need_wrap_nullable: bool, + semaphore: Arc<Semaphore>, + runtime: Arc<Runtime>, ) -> Result { let table = ctx.build_table_from_source_plan(source)?; let fuse_table = table @@ -107,7 +111,8 @@ pub fn row_fetch_processor( projection.clone(), block_reader.clone(), read_settings, - max_threads, + semaphore.clone(), + runtime.clone(), ), need_wrap_nullable, ) @@ -122,7 +127,8 @@ pub fn row_fetch_processor( projection.clone(), block_reader.clone(), read_settings, - max_threads, + semaphore.clone(), + runtime.clone(), ), need_wrap_nullable, ) diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs index 174decf0b4735..b19a8bc37abbd 100644 ---
b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs @@ -16,7 +16,8 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; -use databend_common_base::runtime::execute_futures_in_parallel; +use databend_common_base::base::tokio::sync::Semaphore; +use databend_common_base::runtime::Runtime; use databend_common_catalog::plan::block_idx_in_segment; use databend_common_catalog::plan::split_prefix; use databend_common_catalog::plan::split_row_id; @@ -25,6 +26,7 @@ use databend_common_catalog::plan::Projection; use databend_common_catalog::table::Table; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::BlockRowIndex; use databend_common_expression::DataBlock; use databend_common_expression::TableSchemaRef; use databend_common_storage::ColumnNodes; @@ -32,6 +34,7 @@ use databend_storages_common_cache::LoadParams; use databend_storages_common_io::ReadSettings; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::TableSnapshot; +use futures_util::future; use itertools::Itertools; use super::fuse_rows_fetcher::RowsFetcher; @@ -54,8 +57,8 @@ pub(super) struct ParquetRowsFetcher<const BLOCKING_IO: bool> { part_map: HashMap<u64, PartInfoPtr>, segment_blocks_cache: HashMap<u64, Arc<Vec<Arc<BlockMeta>>>>, - // To control the parallelism of fetching blocks.
- max_threads: usize, + semaphore: Arc<Semaphore>, + runtime: Arc<Runtime>, } #[async_trait::async_trait] @@ -68,6 +71,7 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> { fn clear_cache(&mut self) { self.part_map.clear(); + self.segment_blocks_cache.clear(); } #[async_backtrace::framed] @@ -77,72 +81,57 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> { let num_rows = row_ids.len(); let mut part_set = HashSet::new(); let mut row_set = Vec::with_capacity(num_rows); + let mut block_row_indices = HashMap::new(); for row_id in row_ids { let (prefix, idx) = split_row_id(*row_id); part_set.insert(prefix); row_set.push((prefix, idx)); + block_row_indices .entry(prefix) .or_insert(Vec::new()) .push((0u32, idx as u32, 1usize)); } // Read blocks in `prefix` order. let part_set = part_set.into_iter().sorted().collect::<Vec<_>>(); - let idx_map = part_set + let mut idx_map = part_set .iter() .enumerate() - .map(|(i, p)| (*p, i)) + .map(|(i, p)| (*p, (i, 0))) .collect::<HashMap<_, _>>(); - // parts_per_thread = num_parts / max_threads - // remain = num_parts % max_threads - // task distribution: - // Part number of each task | Task number - // ------------------------------------------------------ - // parts_per_thread + 1 | remain - // parts_per_thread | max_threads - remain - let num_parts = part_set.len(); - let mut tasks = Vec::with_capacity(self.max_threads); - // Fetch blocks in parallel.
- let part_size = num_parts / self.max_threads; - let remainder = num_parts % self.max_threads; - let mut begin = 0; - for i in 0..self.max_threads { - let end = if i < remainder { - begin + part_size + 1 - } else { - begin + part_size - }; - if begin == end { - break; - } - let parts = part_set[begin..end] - .iter() - .map(|idx| self.part_map[idx].clone()) - .collect::<Vec<_>>(); - tasks.push(Self::fetch_blocks( + + let mut tasks = Vec::with_capacity(part_set.len()); + for part in &part_set { + tasks.push(Self::fetch_block( self.reader.clone(), - parts, + self.part_map[part].clone(), self.settings, + block_row_indices[part].clone(), )); - begin = end; } - let num_task = tasks.len(); - let blocks = execute_futures_in_parallel( - tasks, - num_task, - num_task * 2, - "parqeut rows fetch".to_string(), - ) - .await? - .into_iter() - .collect::<Result<Vec<_>>>()? - .into_iter() - .flatten() - .collect::<Vec<_>>(); + let tasks = tasks.into_iter().map(|v| { + |permit| async { + let r = v.await; + drop(permit); + r + } + }); + let join_handlers = self + .runtime + .try_spawn_batch_with_owned_semaphore(self.semaphore.clone(), tasks) + .await?; + + let joint = future::try_join_all(join_handlers).await?; + let blocks = joint.into_iter().collect::<Result<Vec<_>>>()?; // Take result rows from blocks.
let indices = row_set .iter() - .map(|(prefix, row_idx)| { - let block_idx = idx_map[prefix]; - (block_idx as u32, *row_idx as u32, 1_usize) + .map(|(prefix, _)| { + let (block_idx, row_idx_in_block) = idx_map.get_mut(prefix).unwrap(); + let row_idx = *row_idx_in_block; + *row_idx_in_block += 1; + (*block_idx as u32, row_idx as u32, 1_usize) }) .collect::<Vec<_>>(); @@ -169,7 +158,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> { projection: Projection, reader: Arc<BlockReader>, settings: ReadSettings, - max_threads: usize, + semaphore: Arc<Semaphore>, + runtime: Arc<Runtime>, ) -> Self { let schema = table.schema(); let segment_reader = @@ -184,7 +174,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> { settings, part_map: HashMap::new(), segment_blocks_cache: HashMap::new(), - max_threads, + semaphore, + runtime, } } @@ -238,38 +229,31 @@ } #[async_backtrace::framed] - async fn fetch_blocks( + async fn fetch_block( reader: Arc<BlockReader>, - parts: Vec<PartInfoPtr>, + part: PartInfoPtr, settings: ReadSettings, - ) -> Result<Vec<DataBlock>> { - let mut chunks = Vec::with_capacity(parts.len()); - if BLOCKING_IO { - for part in parts.iter() { - let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?; - chunks.push(chunk); - } + block_row_indices: Vec<BlockRowIndex>, + ) -> Result<DataBlock> { + let chunk = if BLOCKING_IO { + reader.sync_read_columns_data_by_merge_io(&settings, &part, &None)? } else { - for part in parts.iter() { - let part = FuseBlockPartInfo::from_part(part)?; - let chunk = reader - .read_columns_data_by_merge_io( - &settings, - &part.location, - &part.columns_meta, - &None, - ) - .await?; - chunks.push(chunk); - } - } - let fetched_blocks = chunks - .into_iter() - .zip(parts.iter()) - .map(|(chunk, part)| Self::build_block(&reader, part, chunk)) - .collect::<Result<Vec<_>>>()?; - - Ok(fetched_blocks) + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + reader + .read_columns_data_by_merge_io( + &settings, + &fuse_part.location, + &fuse_part.columns_meta, + &None, + ) + .await?
+ }; + let block = Self::build_block(&reader, &part, chunk)?; + Ok(DataBlock::take_blocks( + &[block], + &block_row_indices, + block_row_indices.len(), + )) } fn build_block(