
fix: avoid excessive concurrency in row fetch #17885


Merged: 9 commits, May 9, 2025
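
What the change does, in brief: instead of splitting the parts to fetch into max_threads chunks and driving them with execute_futures_in_parallel, the parquet rows fetcher now spawns one fetch task per part on the global IO runtime (GlobalIORuntime) and caps concurrency with a Semaphore sized from the max_storage_io_requests setting. The sketch below illustrates that bounding pattern in plain tokio; it is a simplified stand-in under assumed names (fetch_one and fetch_all_bounded are hypothetical), not databend's Runtime::try_spawn_batch_with_owned_semaphore API.

use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical stand-in for reading one part's column data.
async fn fetch_one(part: String) -> String {
    format!("block for {part}")
}

// Minimal sketch of the bounding pattern, assuming plain tokio rather than
// databend's runtime API: every task must hold a permit, so at most
// `max_io_requests` fetches run concurrently, however many parts are touched.
async fn fetch_all_bounded(parts: Vec<String>, max_io_requests: usize) -> Vec<String> {
    let semaphore = Arc::new(Semaphore::new(max_io_requests));
    let handles: Vec<_> = parts
        .into_iter()
        .map(|part| {
            let semaphore = semaphore.clone();
            tokio::spawn(async move {
                // The permit is dropped when the task finishes, freeing a slot.
                let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
                fetch_one(part).await
            })
        })
        .collect();

    let mut blocks = Vec::with_capacity(handles.len());
    for handle in handles {
        blocks.push(handle.await.expect("fetch task panicked"));
    }
    blocks
}

This would be called from within a tokio runtime (for example a #[tokio::main] entry point); the PR itself uses the shared global IO runtime instead of spawning onto the caller's runtime.
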
src/query/service/src/pipelines/builders/builder_row_fetch.rs (11 changes: 10 additions & 1 deletion)
@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::runtime::GlobalIORuntime;
use databend_common_exception::Result;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
@@ -21,18 +24,24 @@ use databend_common_pipeline_transforms::processors::create_dummy_item;
use databend_common_sql::executor::physical_plans::RowFetch;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_storages_fuse::operations::row_fetch_processor;
use databend_common_storages_fuse::TableContext;
use tokio::sync::Semaphore;

use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
pub(crate) fn build_row_fetch(&mut self, row_fetch: &RowFetch) -> Result<()> {
self.build_pipeline(&row_fetch.input)?;
let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
let row_fetch_runtime = GlobalIORuntime::instance();
let row_fetch_semaphore = Arc::new(Semaphore::new(max_io_requests));
let processor = row_fetch_processor(
self.ctx.clone(),
row_fetch.row_id_col_offset,
&row_fetch.source,
row_fetch.cols_to_fetch.clone(),
row_fetch.need_wrap_nullable,
row_fetch_semaphore,
row_fetch_runtime,
)?;
if !matches!(&*row_fetch.input, PhysicalPlan::MutationSplit(_)) {
self.main_pipeline.add_transform(processor)?;
src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs (10 changes: 8 additions & 2 deletions)
@@ -15,6 +15,8 @@
use std::collections::HashSet;
use std::sync::Arc;

use databend_common_base::base::tokio::sync::Semaphore;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::plan::split_row_id;
use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::Projection;
@@ -49,6 +51,8 @@ pub fn row_fetch_processor(
source: &DataSourcePlan,
projection: Projection,
need_wrap_nullable: bool,
semaphore: Arc<Semaphore>,
runtime: Arc<Runtime>,
) -> Result<RowFetcher> {
let table = ctx.build_table_from_source_plan(source)?;
let fuse_table = table
@@ -107,7 +111,8 @@
projection.clone(),
block_reader.clone(),
read_settings,
max_threads,
semaphore.clone(),
runtime.clone(),
),
need_wrap_nullable,
)
@@ -122,7 +127,8 @@
projection.clone(),
block_reader.clone(),
read_settings,
max_threads,
semaphore.clone(),
runtime.clone(),
),
need_wrap_nullable,
)
src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs (146 changes: 65 additions & 81 deletions)
@@ -16,7 +16,8 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use databend_common_base::runtime::execute_futures_in_parallel;
use databend_common_base::base::tokio::sync::Semaphore;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::plan::block_idx_in_segment;
use databend_common_catalog::plan::split_prefix;
use databend_common_catalog::plan::split_row_id;
@@ -25,13 +26,15 @@ use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::Table;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockRowIndex;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchemaRef;
use databend_common_storage::ColumnNodes;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_io::ReadSettings;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::TableSnapshot;
use futures_util::future;
use itertools::Itertools;

use super::fuse_rows_fetcher::RowsFetcher;
@@ -54,8 +57,8 @@ pub(super) struct ParquetRowsFetcher<const BLOCKING_IO: bool> {
part_map: HashMap<u64, PartInfoPtr>,
segment_blocks_cache: HashMap<u64, Vec<Arc<BlockMeta>>>,

// To control the parallelism of fetching blocks.
max_threads: usize,
semaphore: Arc<Semaphore>,
runtime: Arc<Runtime>,
}

#[async_trait::async_trait]
@@ -68,6 +71,7 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {

fn clear_cache(&mut self) {
self.part_map.clear();
self.segment_blocks_cache.clear();
}

#[async_backtrace::framed]
@@ -77,72 +81,57 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
let num_rows = row_ids.len();
let mut part_set = HashSet::new();
let mut row_set = Vec::with_capacity(num_rows);
let mut block_row_indices = HashMap::new();
for row_id in row_ids {
let (prefix, idx) = split_row_id(*row_id);
part_set.insert(prefix);
row_set.push((prefix, idx));
block_row_indices
.entry(prefix)
.or_insert(Vec::new())
.push((0u32, idx as u32, 1usize));
}

// Read blocks in `prefix` order.
let part_set = part_set.into_iter().sorted().collect::<Vec<_>>();
let idx_map = part_set
let mut idx_map = part_set
.iter()
.enumerate()
.map(|(i, p)| (*p, i))
.map(|(i, p)| (*p, (i, 0)))
.collect::<HashMap<_, _>>();
// parts_per_thread = num_parts / max_threads
// remain = num_parts % max_threads
// task distribution:
// Part number of each task | Task number
// ------------------------------------------------------
// parts_per_thread + 1 | remain
// parts_per_thread | max_threads - remain
let num_parts = part_set.len();
let mut tasks = Vec::with_capacity(self.max_threads);
// Fetch blocks in parallel.
let part_size = num_parts / self.max_threads;
let remainder = num_parts % self.max_threads;
let mut begin = 0;
for i in 0..self.max_threads {
let end = if i < remainder {
begin + part_size + 1
} else {
begin + part_size
};
if begin == end {
break;
}
let parts = part_set[begin..end]
.iter()
.map(|idx| self.part_map[idx].clone())
.collect::<Vec<_>>();
tasks.push(Self::fetch_blocks(

let mut tasks = Vec::with_capacity(part_set.len());
for part in &part_set {
tasks.push(Self::fetch_block(
self.reader.clone(),
parts,
self.part_map[part].clone(),
self.settings,
block_row_indices[part].clone(),
));
begin = end;
}

let num_task = tasks.len();
let blocks = execute_futures_in_parallel(
tasks,
num_task,
num_task * 2,
"parqeut rows fetch".to_string(),
)
.await?
.into_iter()
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect::<Vec<_>>();
let tasks = tasks.into_iter().map(|v| {
|permit| async {
let r = v.await;
drop(permit);
r
}
});
let join_handlers = self
.runtime
.try_spawn_batch_with_owned_semaphore(self.semaphore.clone(), tasks)
.await?;

let joint = future::try_join_all(join_handlers).await?;
let blocks = joint.into_iter().collect::<Result<Vec<_>>>()?;
// Take result rows from blocks.
let indices = row_set
.iter()
.map(|(prefix, row_idx)| {
let block_idx = idx_map[prefix];
(block_idx as u32, *row_idx as u32, 1_usize)
.map(|(prefix, _)| {
let (block_idx, row_idx_in_block) = idx_map.get_mut(prefix).unwrap();
let row_idx = *row_idx_in_block;
*row_idx_in_block += 1;
(*block_idx as u32, row_idx as u32, 1_usize)
})
.collect::<Vec<_>>();

@@ -169,7 +158,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
projection: Projection,
reader: Arc<BlockReader>,
settings: ReadSettings,
max_threads: usize,
semaphore: Arc<Semaphore>,
runtime: Arc<Runtime>,
) -> Self {
let schema = table.schema();
let segment_reader =
@@ -184,7 +174,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
settings,
part_map: HashMap::new(),
segment_blocks_cache: HashMap::new(),
max_threads,
semaphore,
runtime,
}
}

@@ -238,38 +229,31 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
}

#[async_backtrace::framed]
async fn fetch_blocks(
async fn fetch_block(
reader: Arc<BlockReader>,
parts: Vec<PartInfoPtr>,
part: PartInfoPtr,
settings: ReadSettings,
) -> Result<Vec<DataBlock>> {
let mut chunks = Vec::with_capacity(parts.len());
if BLOCKING_IO {
for part in parts.iter() {
let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?;
chunks.push(chunk);
}
block_row_indices: Vec<BlockRowIndex>,
) -> Result<DataBlock> {
let chunk = if BLOCKING_IO {
reader.sync_read_columns_data_by_merge_io(&settings, &part, &None)?
} else {
for part in parts.iter() {
let part = FuseBlockPartInfo::from_part(part)?;
let chunk = reader
.read_columns_data_by_merge_io(
&settings,
&part.location,
&part.columns_meta,
&None,
)
.await?;
chunks.push(chunk);
}
}
let fetched_blocks = chunks
.into_iter()
.zip(parts.iter())
.map(|(chunk, part)| Self::build_block(&reader, part, chunk))
.collect::<Result<Vec<_>>>()?;

Ok(fetched_blocks)
let fuse_part = FuseBlockPartInfo::from_part(&part)?;
reader
.read_columns_data_by_merge_io(
&settings,
&fuse_part.location,
&fuse_part.columns_meta,
&None,
)
.await?
};
let block = Self::build_block(&reader, &part, chunk)?;
Ok(DataBlock::take_blocks(
&[block],
&block_row_indices,
block_row_indices.len(),
))
}

fn build_block(
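
A note on row ordering in the new fetch_rows: each part is reduced to just its requested rows inside fetch_block (via DataBlock::take_blocks with that part's block_row_indices), so the final indices vector maps the k-th request that hits a block to the k-th row of that block's result, restoring the original request order. Below is a self-contained sketch of that mapping with made-up row ids; it is illustrative only, not databend code.

use std::collections::HashMap;

// Standalone illustration of how fetch_rows reassembles rows in their
// original request order after each part has been reduced to only its
// requested rows.
fn main() {
    // Row requests in arrival order: (block prefix, row index inside block).
    let row_set: Vec<(u64, u64)> = vec![(7, 3), (2, 5), (7, 1), (2, 0)];

    // Blocks are fetched in sorted prefix order: [2, 7].
    let part_set: Vec<u64> = {
        let mut s: Vec<u64> = row_set.iter().map(|(p, _)| *p).collect();
        s.sort();
        s.dedup();
        s
    };

    // prefix -> (block position in fetch order, rows already consumed).
    let mut idx_map: HashMap<u64, (usize, u32)> = part_set
        .iter()
        .enumerate()
        .map(|(i, p)| (*p, (i, 0u32)))
        .collect();

    // The k-th request touching a block maps to the k-th row of that
    // block's (already filtered) fetch result.
    let indices: Vec<(u32, u32, usize)> = row_set
        .iter()
        .map(|(prefix, _)| {
            let (block_idx, row_in_block) = idx_map.get_mut(prefix).unwrap();
            let row = *row_in_block;
            *row_in_block += 1;
            (*block_idx as u32, row, 1usize)
        })
        .collect();

    // Block 7 sits at position 1 of the fetched blocks, block 2 at position 0.
    let expected: Vec<(u32, u32, usize)> = vec![(1, 0, 1), (0, 0, 1), (1, 1, 1), (0, 1, 1)];
    assert_eq!(indices, expected);
    println!("{indices:?}");
}
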