From d11cd2aabe9b42fda263d35b2ed2240b09a73c99 Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Tue, 6 May 2025 23:04:35 +0800
Subject: [PATCH 1/9] feat: drop unused rows as early as possible during row fetch

---
 .../operations/read/parquet_rows_fetcher.rs   | 54 ++++++++++++-------
 1 file changed, 34 insertions(+), 20 deletions(-)

diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index 174decf0b4735..f81d5f3dbc431 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -25,6 +25,7 @@ use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table::Table;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::BlockRowIndex;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchemaRef;
 use databend_common_storage::ColumnNodes;
@@ -77,18 +78,23 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
         let num_rows = row_ids.len();
         let mut part_set = HashSet::new();
         let mut row_set = Vec::with_capacity(num_rows);
+        let mut block_row_indices = HashMap::new();
         for row_id in row_ids {
             let (prefix, idx) = split_row_id(*row_id);
             part_set.insert(prefix);
             row_set.push((prefix, idx));
+            block_row_indices
+                .entry(prefix)
+                .or_insert(Vec::new())
+                .push((0u32, idx as u32, 1usize));
         }
 
         // Read blocks in `prefix` order.
         let part_set = part_set.into_iter().sorted().collect::<Vec<_>>();
-        let idx_map = part_set
+        let mut idx_map = part_set
             .iter()
             .enumerate()
-            .map(|(i, p)| (*p, i))
+            .map(|(i, p)| (*p, (i, 0)))
             .collect::<HashMap<_, _>>();
         // parts_per_thread = num_parts / max_threads
         // remain = num_parts % max_threads
         // task distribution:
         //   Part number of each task   |   Task number
         // ------------------------------------------------------
         //   parts_per_thread + 1       |   remain
         //   parts_per_thread           |   max_threads - remain
@@ -116,10 +122,15 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
                 .iter()
                 .map(|idx| self.part_map[idx].clone())
                 .collect::<Vec<_>>();
+            let block_row_indices = part_set[begin..end]
+                .iter()
+                .map(|idx| block_row_indices.remove(idx).unwrap())
+                .collect::<Vec<_>>();
             tasks.push(Self::fetch_blocks(
                 self.reader.clone(),
                 parts,
                 self.settings,
+                block_row_indices,
             ));
             begin = end;
         }
@@ -140,9 +151,11 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
 
         // Take result rows from blocks.
         let indices = row_set
             .iter()
-            .map(|(prefix, row_idx)| {
-                let block_idx = idx_map[prefix];
-                (block_idx as u32, *row_idx as u32, 1_usize)
+            .map(|(prefix, _)| {
+                let (block_idx, row_idx_in_block) = idx_map.get_mut(prefix).unwrap();
+                let row_idx = *row_idx_in_block;
+                *row_idx_in_block += 1;
+                (*block_idx as u32, row_idx as u32, 1_usize)
             })
             .collect::<Vec<_>>();
 
@@ -242,34 +255,35 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
         reader: Arc<BlockReader>,
         parts: Vec<PartInfoPtr>,
         settings: ReadSettings,
+        block_row_indices: Vec<Vec<BlockRowIndex>>,
     ) -> Result<Vec<DataBlock>> {
-        let mut chunks = Vec::with_capacity(parts.len());
+        let mut blocks = Vec::with_capacity(parts.len());
         if BLOCKING_IO {
-            for part in parts.iter() {
+            for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
                 let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?;
-                chunks.push(chunk);
+                let block = Self::build_block(&reader, part, chunk)?;
+                let block =
+                    DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
+                blocks.push(block);
             }
         } else {
-            for part in parts.iter() {
-                let part = FuseBlockPartInfo::from_part(part)?;
+            for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
+                let fuse_part = FuseBlockPartInfo::from_part(part)?;
                 let chunk = reader
                     .read_columns_data_by_merge_io(
                         &settings,
-                        &part.location,
-                        &part.columns_meta,
+                        &fuse_part.location,
+                        &fuse_part.columns_meta,
                         &None,
                     )
                     .await?;
-                chunks.push(chunk);
+                let block = Self::build_block(&reader, part, chunk)?;
+                let block =
+                    DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
+                blocks.push(block);
             }
         }
-        let fetched_blocks = chunks
-            .into_iter()
-            .zip(parts.iter())
-            .map(|(chunk, part)| Self::build_block(&reader, part, chunk))
-            .collect::<Result<Vec<_>>>()?;
-
-        Ok(fetched_blocks)
+        Ok(blocks)
     }
 
     fn build_block(
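Note: patch 1 moves the row-level `take_blocks` trim into `fetch_blocks` itself, so each block is cut down to the requested rows as soon as it is decoded instead of being buffered whole. A minimal, self-contained sketch of the grouping step (the 32/32 bit split in `split_row_id` is an assumption for illustration; Databend's real encoding lives in `databend_common_catalog::plan`):

```rust
use std::collections::{HashMap, HashSet};

// Mirrors BlockRowIndex: (block index, row index within block, repeat count).
type BlockRowIndex = (u32, u32, usize);

// Assumed layout: high bits identify the block ("prefix"), low bits the row.
fn split_row_id(row_id: u64) -> (u64, u64) {
    (row_id >> 32, row_id & u64::from(u32::MAX))
}

// Group requested row ids by block prefix. The block index in each triplet
// is 0 because the trimming take_blocks call runs on one block at a time.
fn group_by_block(row_ids: &[u64]) -> (HashSet<u64>, HashMap<u64, Vec<BlockRowIndex>>) {
    let mut part_set = HashSet::new();
    let mut block_row_indices: HashMap<u64, Vec<BlockRowIndex>> = HashMap::new();
    for row_id in row_ids {
        let (prefix, idx) = split_row_id(*row_id);
        part_set.insert(prefix);
        block_row_indices
            .entry(prefix)
            .or_default()
            .push((0, idx as u32, 1));
    }
    (part_set, block_row_indices)
}

fn main() {
    let (parts, indices) = group_by_block(&[(1 << 32) | 5, (1 << 32) | 9, (2 << 32) | 3]);
    assert_eq!(parts.len(), 2);
    assert_eq!(indices[&1], vec![(0, 5, 1), (0, 9, 1)]);
}
```

Because each fetched block now holds only the requested rows, in request order, `idx_map` changes from a plain block index to a `(block_idx, next_row)` cursor: the final gather reads row 0, 1, 2, … of each trimmed block rather than the original row offsets.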
From 9cbd40003deec7fa181662d35cdbb1bb346478af Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Wed, 7 May 2025 10:54:38 +0800
Subject: [PATCH 2/9] clear cache

---
 .../storages/fuse/src/operations/read/parquet_rows_fetcher.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index f81d5f3dbc431..d19d0555f6ba1 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -69,6 +69,7 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
 
     fn clear_cache(&mut self) {
         self.part_map.clear();
+        self.segment_blocks_cache.clear();
     }
 
     #[async_backtrace::framed]
From 367acb7c1f9e469000f4e8f7b14e27a8477a067e Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Wed, 7 May 2025 12:30:29 +0800
Subject: [PATCH 3/9] limit max fetch size

---
 src/query/storages/fuse/src/fuse_part.rs      |   6 +
 .../operations/read/parquet_rows_fetcher.rs   | 133 +++++++++++-------
 2 files changed, 91 insertions(+), 48 deletions(-)

diff --git a/src/query/storages/fuse/src/fuse_part.rs b/src/query/storages/fuse/src/fuse_part.rs
index 702807ae63f13..3270af99fa663 100644
--- a/src/query/storages/fuse/src/fuse_part.rs
+++ b/src/query/storages/fuse/src/fuse_part.rs
@@ -121,6 +121,12 @@ impl FuseBlockPartInfo {
             .map(|meta| meta.page_size)
             .unwrap_or(self.nums_rows)
     }
+
+    pub fn in_memory_size(&self) -> Option<u64> {
+        self.columns_stat
+            .as_ref()
+            .map(|stats| stats.iter().map(|(_, stat)| stat.in_memory_size).sum())
+    }
 }
 
 /// Fuse table lazy partition information.
diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index d19d0555f6ba1..5f94a604f87ba 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -97,58 +97,33 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
             .enumerate()
             .map(|(i, p)| (*p, (i, 0)))
             .collect::<HashMap<_, _>>();
-        // parts_per_thread = num_parts / max_threads
-        // remain = num_parts % max_threads
-        // task distribution:
-        //   Part number of each task   |   Task number
-        // ------------------------------------------------------
-        //   parts_per_thread + 1       |   remain
-        //   parts_per_thread           |   max_threads - remain
-        let num_parts = part_set.len();
-        let mut tasks = Vec::with_capacity(self.max_threads);
-        // Fetch blocks in parallel.
-        let part_size = num_parts / self.max_threads;
-        let remainder = num_parts % self.max_threads;
-        let mut begin = 0;
-        for i in 0..self.max_threads {
-            let end = if i < remainder {
-                begin + part_size + 1
+
+        const MAX_FETCH_SIZE: u64 = 256 * 1024 * 1024;
+        let mut blocks = Vec::new();
+        for batch in part_set.iter().batching(|it| {
+            let mut chunk = Vec::new();
+            let mut fetch_size = 0;
+            for part in it {
+                let fuse_part = FuseBlockPartInfo::from_part(&self.part_map[part]).unwrap();
+                let in_memory_size = fuse_part.in_memory_size().unwrap_or_default();
+                fetch_size += in_memory_size;
+                chunk.push(*part);
+                if fetch_size > MAX_FETCH_SIZE {
+                    return Some(chunk);
+                }
+            }
+            if chunk.is_empty() {
+                None
             } else {
-                begin + part_size
-            };
-            if begin == end {
-                break;
+                Some(chunk)
             }
-            let parts = part_set[begin..end]
-                .iter()
-                .map(|idx| self.part_map[idx].clone())
-                .collect::<Vec<_>>();
-            let block_row_indices = part_set[begin..end]
-                .iter()
-                .map(|idx| block_row_indices.remove(idx).unwrap())
-                .collect::<Vec<_>>();
-            tasks.push(Self::fetch_blocks(
-                self.reader.clone(),
-                parts,
-                self.settings,
-                block_row_indices,
-            ));
-            begin = end;
+        }) {
+            let fetch_blocks = self
+                .fetch_blocks_in_parallel(&batch, &mut block_row_indices)
+                .await?;
+            blocks.extend(fetch_blocks);
         }
 
-        let num_task = tasks.len();
-        let blocks = execute_futures_in_parallel(
-            tasks,
-            num_task,
-            num_task * 2,
-            "parqeut rows fetch".to_string(),
-        )
-        .await?
-        .into_iter()
-        .collect::<Result<Vec<Vec<DataBlock>>>>()?
-        .into_iter()
-        .flatten()
-        .collect::<Vec<_>>();
-
         // Take result rows from blocks.
         let indices = row_set
             .iter()
@@ -287,6 +262,68 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
         Ok(blocks)
     }
 
+    #[async_backtrace::framed]
+    async fn fetch_blocks_in_parallel(
+        &self,
+        part_set: &[u64],
+        block_row_indices: &mut HashMap<u64, Vec<BlockRowIndex>>,
+    ) -> Result<Vec<DataBlock>> {
+        // parts_per_thread = num_parts / max_threads
+        // remain = num_parts % max_threads
+        // task distribution:
+        //   Part number of each task   |   Task number
+        // ------------------------------------------------------
+        //   parts_per_thread + 1       |   remain
+        //   parts_per_thread           |   max_threads - remain
+        let num_parts = part_set.len();
+        let mut tasks = Vec::with_capacity(self.max_threads);
+        // Fetch blocks in parallel.
+        let part_size = num_parts / self.max_threads;
+        let remainder = num_parts % self.max_threads;
+        let mut begin = 0;
+        for i in 0..self.max_threads {
+            let end = if i < remainder {
+                begin + part_size + 1
+            } else {
+                begin + part_size
+            };
+            if begin == end {
+                break;
+            }
+            let parts = part_set[begin..end]
+                .iter()
+                .map(|idx| self.part_map[idx].clone())
+                .collect::<Vec<_>>();
+            let block_row_indices = part_set[begin..end]
+                .iter()
+                .map(|idx| block_row_indices.remove(idx).unwrap())
+                .collect::<Vec<_>>();
+            tasks.push(Self::fetch_blocks(
+                self.reader.clone(),
+                parts,
+                self.settings,
+                block_row_indices,
+            ));
+            begin = end;
+        }
+
+        let num_task = tasks.len();
+        let blocks = execute_futures_in_parallel(
+            tasks,
+            num_task,
+            num_task * 2,
+            "parqeut rows fetch".to_string(),
+        )
+        .await?
+        .into_iter()
+        .collect::<Result<Vec<Vec<DataBlock>>>>()?
+        .into_iter()
+        .flatten()
+        .collect::<Vec<_>>();
+
+        Ok(blocks)
+    }
+
     fn build_block(
         reader: &BlockReader,
         part: &PartInfoPtr,
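Note: patch 3 caps how much data one round of parallel fetching can pin in memory. `Itertools::batching` pulls parts from the shared iterator until the estimated in-memory size crosses 256 MiB, emits that chunk, and the loop fetches it before starting the next. A self-contained sketch with plain byte counts standing in for `FuseBlockPartInfo::in_memory_size` (the sizes are made up):

```rust
use itertools::Itertools;

const MAX_FETCH_SIZE: u64 = 256 * 1024 * 1024;

fn main() {
    // Estimated in-memory sizes of four parts, in bytes.
    let part_sizes: Vec<u64> = vec![100 << 20, 90 << 20, 80 << 20, 200 << 20];
    let batches: Vec<Vec<u64>> = part_sizes
        .iter()
        .batching(|it| {
            let mut chunk = Vec::new();
            let mut fetch_size = 0u64;
            for size in it {
                fetch_size += *size;
                chunk.push(*size);
                // Close the batch once the cap is crossed; pushing first
                // guarantees an oversized part still forms a 1-element batch.
                if fetch_size > MAX_FETCH_SIZE {
                    return Some(chunk);
                }
            }
            // Iterator exhausted: emit the trailing partial batch, if any.
            if chunk.is_empty() { None } else { Some(chunk) }
        })
        .collect();
    // 100 + 90 + 80 MiB > 256 MiB closes the first batch; 200 MiB remains.
    assert_eq!(batches.len(), 2);
}
```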
From b9923ffa357577bad9d2518fba5bb47f3482b128 Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Wed, 7 May 2025 16:21:15 +0800
Subject: [PATCH 4/9] Revert "limit max fetch size"

This reverts commit 367acb7c1f9e469000f4e8f7b14e27a8477a067e.

---
 src/query/storages/fuse/src/fuse_part.rs      |   6 -
 .../operations/read/parquet_rows_fetcher.rs   | 133 +++++++-----------
 2 files changed, 48 insertions(+), 91 deletions(-)

diff --git a/src/query/storages/fuse/src/fuse_part.rs b/src/query/storages/fuse/src/fuse_part.rs
index 3270af99fa663..702807ae63f13 100644
--- a/src/query/storages/fuse/src/fuse_part.rs
+++ b/src/query/storages/fuse/src/fuse_part.rs
@@ -121,12 +121,6 @@ impl FuseBlockPartInfo {
             .map(|meta| meta.page_size)
             .unwrap_or(self.nums_rows)
     }
-
-    pub fn in_memory_size(&self) -> Option<u64> {
-        self.columns_stat
-            .as_ref()
-            .map(|stats| stats.iter().map(|(_, stat)| stat.in_memory_size).sum())
-    }
 }
 
 /// Fuse table lazy partition information.
diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index 5f94a604f87ba..d19d0555f6ba1 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -97,33 +97,58 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
             .enumerate()
             .map(|(i, p)| (*p, (i, 0)))
             .collect::<HashMap<_, _>>();
-
-        const MAX_FETCH_SIZE: u64 = 256 * 1024 * 1024;
-        let mut blocks = Vec::new();
-        for batch in part_set.iter().batching(|it| {
-            let mut chunk = Vec::new();
-            let mut fetch_size = 0;
-            for part in it {
-                let fuse_part = FuseBlockPartInfo::from_part(&self.part_map[part]).unwrap();
-                let in_memory_size = fuse_part.in_memory_size().unwrap_or_default();
-                fetch_size += in_memory_size;
-                chunk.push(*part);
-                if fetch_size > MAX_FETCH_SIZE {
-                    return Some(chunk);
-                }
-            }
-            if chunk.is_empty() {
-                None
+        // parts_per_thread = num_parts / max_threads
+        // remain = num_parts % max_threads
+        // task distribution:
+        //   Part number of each task   |   Task number
+        // ------------------------------------------------------
+        //   parts_per_thread + 1       |   remain
+        //   parts_per_thread           |   max_threads - remain
+        let num_parts = part_set.len();
+        let mut tasks = Vec::with_capacity(self.max_threads);
+        // Fetch blocks in parallel.
+        let part_size = num_parts / self.max_threads;
+        let remainder = num_parts % self.max_threads;
+        let mut begin = 0;
+        for i in 0..self.max_threads {
+            let end = if i < remainder {
+                begin + part_size + 1
             } else {
-                Some(chunk)
+                begin + part_size
+            };
+            if begin == end {
+                break;
             }
-        }) {
-            let fetch_blocks = self
-                .fetch_blocks_in_parallel(&batch, &mut block_row_indices)
-                .await?;
-            blocks.extend(fetch_blocks);
+            let parts = part_set[begin..end]
+                .iter()
+                .map(|idx| self.part_map[idx].clone())
+                .collect::<Vec<_>>();
+            let block_row_indices = part_set[begin..end]
+                .iter()
+                .map(|idx| block_row_indices.remove(idx).unwrap())
+                .collect::<Vec<_>>();
+            tasks.push(Self::fetch_blocks(
+                self.reader.clone(),
+                parts,
+                self.settings,
+                block_row_indices,
+            ));
+            begin = end;
         }
 
+        let num_task = tasks.len();
+        let blocks = execute_futures_in_parallel(
+            tasks,
+            num_task,
+            num_task * 2,
+            "parqeut rows fetch".to_string(),
+        )
+        .await?
+        .into_iter()
+        .collect::<Result<Vec<Vec<DataBlock>>>>()?
+        .into_iter()
+        .flatten()
+        .collect::<Vec<_>>();
+
         // Take result rows from blocks.
         let indices = row_set
             .iter()
@@ -262,68 +287,6 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
         Ok(blocks)
     }
 
-    #[async_backtrace::framed]
-    async fn fetch_blocks_in_parallel(
-        &self,
-        part_set: &[u64],
-        block_row_indices: &mut HashMap<u64, Vec<BlockRowIndex>>,
-    ) -> Result<Vec<DataBlock>> {
-        // parts_per_thread = num_parts / max_threads
-        // remain = num_parts % max_threads
-        // task distribution:
-        //   Part number of each task   |   Task number
-        // ------------------------------------------------------
-        //   parts_per_thread + 1       |   remain
-        //   parts_per_thread           |   max_threads - remain
-        let num_parts = part_set.len();
-        let mut tasks = Vec::with_capacity(self.max_threads);
-        // Fetch blocks in parallel.
-        let part_size = num_parts / self.max_threads;
-        let remainder = num_parts % self.max_threads;
-        let mut begin = 0;
-        for i in 0..self.max_threads {
-            let end = if i < remainder {
-                begin + part_size + 1
-            } else {
-                begin + part_size
-            };
-            if begin == end {
-                break;
-            }
-            let parts = part_set[begin..end]
-                .iter()
-                .map(|idx| self.part_map[idx].clone())
-                .collect::<Vec<_>>();
-            let block_row_indices = part_set[begin..end]
-                .iter()
-                .map(|idx| block_row_indices.remove(idx).unwrap())
-                .collect::<Vec<_>>();
-            tasks.push(Self::fetch_blocks(
-                self.reader.clone(),
-                parts,
-                self.settings,
-                block_row_indices,
-            ));
-            begin = end;
-        }
-
-        let num_task = tasks.len();
-        let blocks = execute_futures_in_parallel(
-            tasks,
-            num_task,
-            num_task * 2,
-            "parqeut rows fetch".to_string(),
-        )
-        .await?
-        .into_iter()
-        .collect::<Result<Vec<Vec<DataBlock>>>>()?
-        .into_iter()
-        .flatten()
-        .collect::<Vec<_>>();
-
-        Ok(blocks)
-    }
-
     fn build_block(
         reader: &BlockReader,
         part: &PartInfoPtr,
From 313cef7e8acb9823ef302f919ecf0ae0e18b3693 Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Wed, 7 May 2025 16:21:43 +0800
Subject: [PATCH 5/9] Revert "feat: drop unused rows as early as possible
 during row fetch"

This reverts commit d11cd2aabe9b42fda263d35b2ed2240b09a73c99.

---
 .../operations/read/parquet_rows_fetcher.rs   | 54 +++++++------------
 1 file changed, 20 insertions(+), 34 deletions(-)

diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index d19d0555f6ba1..886bfa74a119d 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -25,7 +25,6 @@ use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table::Table;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
-use databend_common_expression::BlockRowIndex;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchemaRef;
 use databend_common_storage::ColumnNodes;
@@ -79,23 +78,18 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
         let num_rows = row_ids.len();
         let mut part_set = HashSet::new();
         let mut row_set = Vec::with_capacity(num_rows);
-        let mut block_row_indices = HashMap::new();
         for row_id in row_ids {
             let (prefix, idx) = split_row_id(*row_id);
             part_set.insert(prefix);
             row_set.push((prefix, idx));
-            block_row_indices
-                .entry(prefix)
-                .or_insert(Vec::new())
-                .push((0u32, idx as u32, 1usize));
         }
 
         // Read blocks in `prefix` order.
         let part_set = part_set.into_iter().sorted().collect::<Vec<_>>();
-        let mut idx_map = part_set
+        let idx_map = part_set
             .iter()
             .enumerate()
-            .map(|(i, p)| (*p, (i, 0)))
+            .map(|(i, p)| (*p, i))
             .collect::<HashMap<_, _>>();
         // parts_per_thread = num_parts / max_threads
         // remain = num_parts % max_threads
         // task distribution:
         //   Part number of each task   |   Task number
         // ------------------------------------------------------
         //   parts_per_thread + 1       |   remain
         //   parts_per_thread           |   max_threads - remain
@@ -123,15 +117,10 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
                 .iter()
                 .map(|idx| self.part_map[idx].clone())
                 .collect::<Vec<_>>();
-            let block_row_indices = part_set[begin..end]
-                .iter()
-                .map(|idx| block_row_indices.remove(idx).unwrap())
-                .collect::<Vec<_>>();
             tasks.push(Self::fetch_blocks(
                 self.reader.clone(),
                 parts,
                 self.settings,
-                block_row_indices,
             ));
             begin = end;
         }
@@ -152,11 +141,9 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
 
         // Take result rows from blocks.
         let indices = row_set
             .iter()
-            .map(|(prefix, _)| {
-                let (block_idx, row_idx_in_block) = idx_map.get_mut(prefix).unwrap();
-                let row_idx = *row_idx_in_block;
-                *row_idx_in_block += 1;
-                (*block_idx as u32, row_idx as u32, 1_usize)
+            .map(|(prefix, row_idx)| {
+                let block_idx = idx_map[prefix];
+                (block_idx as u32, *row_idx as u32, 1_usize)
             })
             .collect::<Vec<_>>();
 
@@ -256,35 +243,34 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
         reader: Arc<BlockReader>,
         parts: Vec<PartInfoPtr>,
         settings: ReadSettings,
-        block_row_indices: Vec<Vec<BlockRowIndex>>,
     ) -> Result<Vec<DataBlock>> {
-        let mut blocks = Vec::with_capacity(parts.len());
+        let mut chunks = Vec::with_capacity(parts.len());
         if BLOCKING_IO {
-            for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
+            for part in parts.iter() {
                 let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?;
-                let block = Self::build_block(&reader, part, chunk)?;
-                let block =
-                    DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
-                blocks.push(block);
+                chunks.push(chunk);
             }
         } else {
-            for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
-                let fuse_part = FuseBlockPartInfo::from_part(part)?;
+            for part in parts.iter() {
+                let part = FuseBlockPartInfo::from_part(part)?;
                 let chunk = reader
                     .read_columns_data_by_merge_io(
                         &settings,
-                        &fuse_part.location,
-                        &fuse_part.columns_meta,
+                        &part.location,
+                        &part.columns_meta,
                         &None,
                     )
                     .await?;
-                let block = Self::build_block(&reader, part, chunk)?;
-                let block =
-                    DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
-                blocks.push(block);
+                chunks.push(chunk);
             }
         }
-        Ok(blocks)
+        let fetched_blocks = chunks
+            .into_iter()
+            .zip(parts.iter())
+            .map(|(chunk, part)| Self::build_block(&reader, part, chunk))
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(fetched_blocks)
     }
 
     fn build_block(
From 47a78cb396b2a2ab0bf877836b4fac18259ab9e0 Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Wed, 7 May 2025 16:42:22 +0800
Subject: [PATCH 6/9] limit the concurrency of row fetch

---
 .../pipelines/builders/builder_row_fetch.rs   | 15 ++++++-
 .../src/operations/read/fuse_rows_fetcher.rs  |  8 ++++
 .../operations/read/parquet_rows_fetcher.rs   | 43 +++++++++++++------
 3 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/src/query/service/src/pipelines/builders/builder_row_fetch.rs b/src/query/service/src/pipelines/builders/builder_row_fetch.rs
index c96e00114bb9f..f3f3177901d99 100644
--- a/src/query/service/src/pipelines/builders/builder_row_fetch.rs
+++ b/src/query/service/src/pipelines/builders/builder_row_fetch.rs
@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
+use databend_common_base::runtime::Runtime;
 use databend_common_exception::Result;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
@@ -21,18 +24,28 @@ use databend_common_pipeline_transforms::processors::create_dummy_item;
 use databend_common_sql::executor::physical_plans::RowFetch;
 use databend_common_sql::executor::PhysicalPlan;
 use databend_common_storages_fuse::operations::row_fetch_processor;
+use databend_common_storages_fuse::TableContext;
+use tokio::sync::Semaphore;
 
 use crate::pipelines::PipelineBuilder;
-
 impl PipelineBuilder {
     pub(crate) fn build_row_fetch(&mut self, row_fetch: &RowFetch) -> Result<()> {
         self.build_pipeline(&row_fetch.input)?;
+        let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
+        let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
+        let row_fetch_runtime = Arc::new(Runtime::with_worker_threads(
+            max_threads,
+            Some("row-fetch-worker".to_owned()),
+        )?);
+        let row_fetch_semaphore = Arc::new(Semaphore::new(max_io_requests));
         let processor = row_fetch_processor(
             self.ctx.clone(),
             row_fetch.row_id_col_offset,
             &row_fetch.source,
             row_fetch.cols_to_fetch.clone(),
             row_fetch.need_wrap_nullable,
+            row_fetch_semaphore,
+            row_fetch_runtime,
         )?;
         if !matches!(&*row_fetch.input, PhysicalPlan::MutationSplit(_)) {
             self.main_pipeline.add_transform(processor)?;
diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
index 1632cbe4eaa84..cc75db80e83d2 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
@@ -15,6 +15,8 @@
 use std::collections::HashSet;
 use std::sync::Arc;
 
+use databend_common_base::base::tokio::sync::Semaphore;
+use databend_common_base::runtime::Runtime;
 use databend_common_catalog::plan::split_row_id;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::Projection;
@@ -49,6 +51,8 @@ pub fn row_fetch_processor(
     source: &DataSourcePlan,
     projection: Projection,
     need_wrap_nullable: bool,
+    semaphore: Arc<Semaphore>,
+    runtime: Arc<Runtime>,
 ) -> Result<RowFetcher> {
     let table = ctx.build_table_from_source_plan(source)?;
     let fuse_table = table
@@ -108,6 +112,8 @@ pub fn row_fetch_processor(
                         block_reader.clone(),
                         read_settings,
                         max_threads,
+                        semaphore.clone(),
+                        runtime.clone(),
                     ),
                     need_wrap_nullable,
                 )
@@ -123,6 +129,8 @@ pub fn row_fetch_processor(
                         block_reader.clone(),
                         read_settings,
                         max_threads,
+                        semaphore.clone(),
+                        runtime.clone(),
                     ),
                     need_wrap_nullable,
                 )
diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index 886bfa74a119d..a59afc439a9fd 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -16,7 +16,8 @@ use std::collections::HashMap;
 use std::collections::HashSet;
 use std::sync::Arc;
 
-use databend_common_base::runtime::execute_futures_in_parallel;
+use databend_common_base::base::tokio::sync::Semaphore;
+use databend_common_base::runtime::Runtime;
 use databend_common_catalog::plan::block_idx_in_segment;
 use databend_common_catalog::plan::split_prefix;
 use databend_common_catalog::plan::split_row_id;
@@ -32,6 +33,7 @@ use databend_storages_common_cache::LoadParams;
 use databend_storages_common_io::ReadSettings;
 use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::TableSnapshot;
+use futures_util::future;
 use itertools::Itertools;
 
 use super::fuse_rows_fetcher::RowsFetcher;
@@ -56,6 +58,9 @@ pub(super) struct ParquetRowsFetcher<const BLOCKING_IO: bool> {
 
     // To control the parallelism of fetching blocks.
     max_threads: usize,
+
+    semaphore: Arc<Semaphore>,
+    runtime: Arc<Runtime>,
 }
 
 #[async_trait::async_trait]
@@ -125,19 +130,25 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
             begin = end;
         }
 
-        let num_task = tasks.len();
-        let blocks = execute_futures_in_parallel(
-            tasks,
-            num_task,
-            num_task * 2,
-            "parqeut rows fetch".to_string(),
-        )
-        .await?
-        .into_iter()
-        .collect::<Result<Vec<Vec<DataBlock>>>>()?
-        .into_iter()
-        .flatten()
-        .collect::<Vec<_>>();
+        let tasks = tasks.into_iter().map(|v| {
+            |permit| async {
+                let r = v.await;
+                drop(permit);
+                r
+            }
+        });
+        let join_handlers = self
+            .runtime
+            .try_spawn_batch_with_owned_semaphore(self.semaphore.clone(), tasks)
+            .await?;
+
+        let joint = future::try_join_all(join_handlers).await?;
+        let blocks = joint
+            .into_iter()
+            .collect::<Result<Vec<Vec<DataBlock>>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
 
         // Take result rows from blocks.
         let indices = row_set
@@ -171,6 +182,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
         reader: Arc<BlockReader>,
         settings: ReadSettings,
         max_threads: usize,
+        semaphore: Arc<Semaphore>,
+        runtime: Arc<Runtime>,
     ) -> Self {
         let schema = table.schema();
         let segment_reader =
@@ -186,6 +199,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
             part_map: HashMap::new(),
             segment_blocks_cache: HashMap::new(),
             max_threads,
+            semaphore,
+            runtime,
         }
     }
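Note: `try_spawn_batch_with_owned_semaphore` is a Databend runtime helper, but the pattern patch 6 relies on — every spawned fetch holds an owned semaphore permit until it finishes, so at most `max_storage_io_requests` reads are in flight — can be sketched with plain tokio (the limit of 8 and the task body are illustrative assumptions):

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;

// Stand-in for reading one part's columns by merge IO.
async fn fetch_one(part: u64) -> u64 {
    part
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(8)); // at most 8 fetches in flight
    let mut handles = Vec::new();
    for part in 0..32u64 {
        // An owned permit can move into the spawned task; `acquire_owned`
        // waits here once all 8 permits are taken.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let block = fetch_one(part).await;
            drop(permit); // free the slot as soon as this fetch completes
            block
        }));
    }
    for handle in handles {
        let _block = handle.await.unwrap();
    }
}
```

Bounding in-flight fetches this way limits memory and IO pressure without per-batch bookkeeping, which is presumably why the size-capped batching from patch 3 could be reverted in patch 4.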
From c4c24b2408a38ff8b11bc17e23d794788a89c158 Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Thu, 8 May 2025 15:18:48 +0800
Subject: [PATCH 7/9] Revert "Revert "feat: drop unused rows as early as
 possible during row fetch""

This reverts commit 313cef7e8acb9823ef302f919ecf0ae0e18b3693.

---
 .../operations/read/parquet_rows_fetcher.rs   | 54 ++++++++++++-------
 1 file changed, 34 insertions(+), 20 deletions(-)

diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index a59afc439a9fd..56eafb882c219 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -26,6 +26,7 @@ use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table::Table;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::BlockRowIndex;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchemaRef;
 use databend_common_storage::ColumnNodes;
@@ -83,18 +84,23 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
         let num_rows = row_ids.len();
         let mut part_set = HashSet::new();
         let mut row_set = Vec::with_capacity(num_rows);
+        let mut block_row_indices = HashMap::new();
         for row_id in row_ids {
             let (prefix, idx) = split_row_id(*row_id);
             part_set.insert(prefix);
             row_set.push((prefix, idx));
+            block_row_indices
+                .entry(prefix)
+                .or_insert(Vec::new())
+                .push((0u32, idx as u32, 1usize));
         }
 
         // Read blocks in `prefix` order.
         let part_set = part_set.into_iter().sorted().collect::<Vec<_>>();
-        let idx_map = part_set
+        let mut idx_map = part_set
             .iter()
             .enumerate()
-            .map(|(i, p)| (*p, i))
+            .map(|(i, p)| (*p, (i, 0)))
             .collect::<HashMap<_, _>>();
         // parts_per_thread = num_parts / max_threads
         // remain = num_parts % max_threads
         // task distribution:
         //   Part number of each task   |   Task number
         // ------------------------------------------------------
         //   parts_per_thread + 1       |   remain
         //   parts_per_thread           |   max_threads - remain
@@ -122,10 +128,15 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
                 .iter()
                 .map(|idx| self.part_map[idx].clone())
                 .collect::<Vec<_>>();
+            let block_row_indices = part_set[begin..end]
+                .iter()
+                .map(|idx| block_row_indices.remove(idx).unwrap())
+                .collect::<Vec<_>>();
             tasks.push(Self::fetch_blocks(
                 self.reader.clone(),
                 parts,
                 self.settings,
+                block_row_indices,
             ));
             begin = end;
         }
@@ -152,9 +163,11 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
 
         // Take result rows from blocks.
         let indices = row_set
             .iter()
-            .map(|(prefix, row_idx)| {
-                let block_idx = idx_map[prefix];
-                (block_idx as u32, *row_idx as u32, 1_usize)
+            .map(|(prefix, _)| {
+                let (block_idx, row_idx_in_block) = idx_map.get_mut(prefix).unwrap();
+                let row_idx = *row_idx_in_block;
+                *row_idx_in_block += 1;
+                (*block_idx as u32, row_idx as u32, 1_usize)
             })
             .collect::<Vec<_>>();
 
@@ -258,34 +271,35 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
         reader: Arc<BlockReader>,
         parts: Vec<PartInfoPtr>,
         settings: ReadSettings,
+        block_row_indices: Vec<Vec<BlockRowIndex>>,
     ) -> Result<Vec<DataBlock>> {
-        let mut chunks = Vec::with_capacity(parts.len());
+        let mut blocks = Vec::with_capacity(parts.len());
         if BLOCKING_IO {
-            for part in parts.iter() {
+            for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
                 let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?;
-                chunks.push(chunk);
+                let block = Self::build_block(&reader, part, chunk)?;
+                let block =
+                    DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
+                blocks.push(block);
             }
         } else {
-            for part in parts.iter() {
-                let part = FuseBlockPartInfo::from_part(part)?;
+            for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
+                let fuse_part = FuseBlockPartInfo::from_part(part)?;
                 let chunk = reader
                     .read_columns_data_by_merge_io(
                         &settings,
-                        &part.location,
-                        &part.columns_meta,
+                        &fuse_part.location,
+                        &fuse_part.columns_meta,
                         &None,
                     )
                     .await?;
-                chunks.push(chunk);
+                let block = Self::build_block(&reader, part, chunk)?;
+                let block =
+                    DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
+                blocks.push(block);
             }
         }
-        let fetched_blocks = chunks
-            .into_iter()
-            .zip(parts.iter())
-            .map(|(chunk, part)| Self::build_block(&reader, part, chunk))
-            .collect::<Result<Vec<_>>>()?;
-
-        Ok(fetched_blocks)
+        Ok(blocks)
    }
 
     fn build_block(
From e5aa1aef30482ff10c8b5957e743aa069cdc410e Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Thu, 8 May 2025 15:45:33 +0800
Subject: [PATCH 8/9] simplify

---
 .../src/operations/read/fuse_rows_fetcher.rs  |   2 -
 .../operations/read/parquet_rows_fetcher.rs   | 104 +++++-------------
 2 files changed, 29 insertions(+), 77 deletions(-)

diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
index cc75db80e83d2..ce98ca5b36d5c 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
@@ -111,7 +111,6 @@ pub fn row_fetch_processor(
                         projection.clone(),
                         block_reader.clone(),
                         read_settings,
-                        max_threads,
                         semaphore.clone(),
                         runtime.clone(),
                     ),
@@ -128,7+127,6 @@ pub fn row_fetch_processor(
                         projection.clone(),
                         block_reader.clone(),
                         read_settings,
-                        max_threads,
                         semaphore.clone(),
                         runtime.clone(),
                     ),
diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index 56eafb882c219..b19a8bc37abbd 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -57,9 +57,6 @@ pub(super) struct ParquetRowsFetcher<const BLOCKING_IO: bool> {
     part_map: HashMap<u64, PartInfoPtr>,
     segment_blocks_cache: HashMap<u64, Vec<Arc<BlockMeta>>>,
 
-    // To control the parallelism of fetching blocks.
-    max_threads: usize,
-
     semaphore: Arc<Semaphore>,
     runtime: Arc<Runtime>,
 }
@@ -102,64 +99,31 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
             .enumerate()
             .map(|(i, p)| (*p, (i, 0)))
             .collect::<HashMap<_, _>>();
-        // parts_per_thread = num_parts / max_threads
-        // remain = num_parts % max_threads
-        // task distribution:
-        //   Part number of each task   |   Task number
-        // ------------------------------------------------------
-        //   parts_per_thread + 1       |   remain
-        //   parts_per_thread           |   max_threads - remain
-        let num_parts = part_set.len();
-        let mut tasks = Vec::with_capacity(self.max_threads);
-        // Fetch blocks in parallel.
-        let part_size = num_parts / self.max_threads;
-        let remainder = num_parts % self.max_threads;
-        let mut begin = 0;
-        for i in 0..self.max_threads {
-            let end = if i < remainder {
-                begin + part_size + 1
-            } else {
-                begin + part_size
-            };
-            if begin == end {
-                break;
-            }
-            let parts = part_set[begin..end]
-                .iter()
-                .map(|idx| self.part_map[idx].clone())
-                .collect::<Vec<_>>();
-            let block_row_indices = part_set[begin..end]
-                .iter()
-                .map(|idx| block_row_indices.remove(idx).unwrap())
-                .collect::<Vec<_>>();
-            tasks.push(Self::fetch_blocks(
+
+        let mut tasks = Vec::with_capacity(part_set.len());
+        for part in &part_set {
+            tasks.push(Self::fetch_block(
                 self.reader.clone(),
-                parts,
+                self.part_map[part].clone(),
                 self.settings,
-                block_row_indices,
+                block_row_indices[part].clone(),
             ));
-            begin = end;
         }
 
         let tasks = tasks.into_iter().map(|v| {
             |permit| async {
                 let r = v.await;
                 drop(permit);
                 r
             }
         });
         let join_handlers = self
             .runtime
             .try_spawn_batch_with_owned_semaphore(self.semaphore.clone(), tasks)
             .await?;
 
         let joint = future::try_join_all(join_handlers).await?;
-        let blocks = joint
-            .into_iter()
-            .collect::<Result<Vec<Vec<DataBlock>>>>()?
-            .into_iter()
-            .flatten()
-            .collect::<Vec<_>>();
+        let blocks = joint.into_iter().collect::<Result<Vec<DataBlock>>>()?;
         // Take result rows from blocks.
         let indices = row_set
             .iter()
@@ -195,7 +159,6 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
         projection: Projection,
         reader: Arc<BlockReader>,
         settings: ReadSettings,
-        max_threads: usize,
         semaphore: Arc<Semaphore>,
         runtime: Arc<Runtime>,
     ) -> Self {
@@ -212,7 +175,6 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
             part_map: HashMap::new(),
             segment_blocks_cache: HashMap::new(),
-            max_threads,
             semaphore,
             runtime,
         }
     }
@@ -267,39 +229,31 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
     }
 
     #[async_backtrace::framed]
-    async fn fetch_blocks(
+    async fn fetch_block(
         reader: Arc<BlockReader>,
-        parts: Vec<PartInfoPtr>,
+        part: PartInfoPtr,
         settings: ReadSettings,
-        block_row_indices: Vec<Vec<BlockRowIndex>>,
-    ) -> Result<Vec<DataBlock>> {
-        let mut blocks = Vec::with_capacity(parts.len());
-        if BLOCKING_IO {
-            for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
-                let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?;
-                let block = Self::build_block(&reader, part, chunk)?;
-                let block =
-                    DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
-                blocks.push(block);
-            }
+        block_row_indices: Vec<BlockRowIndex>,
+    ) -> Result<DataBlock> {
+        let chunk = if BLOCKING_IO {
+            reader.sync_read_columns_data_by_merge_io(&settings, &part, &None)?
         } else {
-            for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
-                let fuse_part = FuseBlockPartInfo::from_part(part)?;
-                let chunk = reader
-                    .read_columns_data_by_merge_io(
-                        &settings,
-                        &fuse_part.location,
-                        &fuse_part.columns_meta,
-                        &None,
-                    )
-                    .await?;
-                let block = Self::build_block(&reader, part, chunk)?;
-                let block =
-                    DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
-                blocks.push(block);
-            }
-        }
-        Ok(blocks)
+            let fuse_part = FuseBlockPartInfo::from_part(&part)?;
+            reader
+                .read_columns_data_by_merge_io(
+                    &settings,
+                    &fuse_part.location,
+                    &fuse_part.columns_meta,
+                    &None,
+                )
+                .await?
+        };
+        let block = Self::build_block(&reader, &part, chunk)?;
+        Ok(DataBlock::take_blocks(
+            &[block],
+            &block_row_indices,
+            block_row_indices.len(),
+        ))
     }
 
     fn build_block(
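Note: after patch 8 each task fetches exactly one part and immediately compacts it. The trim relies on `DataBlock::take_blocks` gathering rows by `(block, row, repeat)` triplets; a toy model over `Vec<i32>` stand-in "blocks" shows the semantics being relied on (the types are illustrative, not Databend's):

```rust
// (block index, row index, repeat count), as built during row-id grouping.
type BlockRowIndex = (u32, u32, usize);

// Toy version of DataBlock::take_blocks over plain vectors.
fn take_blocks(blocks: &[Vec<i32>], indices: &[BlockRowIndex]) -> Vec<i32> {
    let mut out = Vec::with_capacity(indices.len());
    for &(block, row, times) in indices {
        for _ in 0..times {
            out.push(blocks[block as usize][row as usize]);
        }
    }
    out
}

fn main() {
    // One fetched block of eight rows; the fetcher asked for rows 3 and 7.
    let block = vec![10, 11, 12, 13, 14, 15, 16, 17];
    let trimmed = take_blocks(&[block], &[(0, 3, 1), (0, 7, 1)]);
    assert_eq!(trimmed, vec![13, 17]); // all other rows are dropped early
}
```

With one part per task, the manual parts-per-thread distribution becomes unnecessary: the semaphore already decides how many of these single-part tasks run at once.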
From b9913811845c24eac3a9d51997364fad2aa73f6e Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Fri, 9 May 2025 10:00:55 +0800
Subject: [PATCH 9/9] use global io runtime

---
 .../service/src/pipelines/builders/builder_row_fetch.rs | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/src/query/service/src/pipelines/builders/builder_row_fetch.rs b/src/query/service/src/pipelines/builders/builder_row_fetch.rs
index f3f3177901d99..5b90720d5c9e6 100644
--- a/src/query/service/src/pipelines/builders/builder_row_fetch.rs
+++ b/src/query/service/src/pipelines/builders/builder_row_fetch.rs
@@ -14,7 +14,7 @@
 
 use std::sync::Arc;
 
-use databend_common_base::runtime::Runtime;
+use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_exception::Result;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
@@ -31,12 +31,8 @@ impl PipelineBuilder {
     pub(crate) fn build_row_fetch(&mut self, row_fetch: &RowFetch) -> Result<()> {
         self.build_pipeline(&row_fetch.input)?;
-        let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
         let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
-        let row_fetch_runtime = Arc::new(Runtime::with_worker_threads(
-            max_threads,
-            Some("row-fetch-worker".to_owned()),
-        )?);
+        let row_fetch_runtime = GlobalIORuntime::instance();
         let row_fetch_semaphore = Arc::new(Semaphore::new(max_io_requests));
         let processor = row_fetch_processor(
             self.ctx.clone(),
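Note: patch 9 stops building a dedicated "row-fetch-worker" runtime per pipeline and reuses the process-wide IO runtime; the semaphore alone still bounds concurrency. The shared-runtime idea can be sketched as follows (a stand-in for illustration, not Databend's `GlobalIORuntime`):

```rust
use std::sync::OnceLock;

use tokio::runtime::Runtime;

// One lazily initialized, process-wide runtime handed out by reference,
// instead of a fresh worker pool per query pipeline.
fn global_io_runtime() -> &'static Runtime {
    static RT: OnceLock<Runtime> = OnceLock::new();
    RT.get_or_init(|| Runtime::new().expect("failed to build the IO runtime"))
}

fn main() {
    // Every caller shares the same worker threads; concurrency is bounded
    // elsewhere (in the patch, by the row-fetch semaphore sized to
    // max_storage_io_requests).
    let answer = global_io_runtime().block_on(async { 42 });
    assert_eq!(answer, 42);
}
```

This avoids spawning `max_threads` extra worker threads for every plan containing a RowFetch, at the cost of sharing the global IO pool with other storage reads.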