Skip to content

Commit 47a78cb

Browse files
committed
limit the concurrency of row fetch
1 parent 313cef7 commit 47a78cb

File tree

3 files changed

+51
-15
lines changed

3 files changed

+51
-15
lines changed

src/query/service/src/pipelines/builders/builder_row_fetch.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
17+
use databend_common_base::runtime::Runtime;
1518
use databend_common_exception::Result;
1619
use databend_common_pipeline_core::processors::InputPort;
1720
use databend_common_pipeline_core::processors::OutputPort;
@@ -21,18 +24,28 @@ use databend_common_pipeline_transforms::processors::create_dummy_item;
2124
use databend_common_sql::executor::physical_plans::RowFetch;
2225
use databend_common_sql::executor::PhysicalPlan;
2326
use databend_common_storages_fuse::operations::row_fetch_processor;
27+
use databend_common_storages_fuse::TableContext;
28+
use tokio::sync::Semaphore;
2429

2530
use crate::pipelines::PipelineBuilder;
26-
2731
impl PipelineBuilder {
2832
pub(crate) fn build_row_fetch(&mut self, row_fetch: &RowFetch) -> Result<()> {
2933
self.build_pipeline(&row_fetch.input)?;
34+
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
35+
let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
36+
let row_fetch_runtime = Arc::new(Runtime::with_worker_threads(
37+
max_threads,
38+
Some("row-fetch-worker".to_owned()),
39+
)?);
40+
let row_fetch_semaphore = Arc::new(Semaphore::new(max_io_requests));
3041
let processor = row_fetch_processor(
3142
self.ctx.clone(),
3243
row_fetch.row_id_col_offset,
3344
&row_fetch.source,
3445
row_fetch.cols_to_fetch.clone(),
3546
row_fetch.need_wrap_nullable,
47+
row_fetch_semaphore,
48+
row_fetch_runtime,
3649
)?;
3750
if !matches!(&*row_fetch.input, PhysicalPlan::MutationSplit(_)) {
3851
self.main_pipeline.add_transform(processor)?;

src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs

+8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use std::collections::HashSet;
1616
use std::sync::Arc;
1717

18+
use databend_common_base::base::tokio::sync::Semaphore;
19+
use databend_common_base::runtime::Runtime;
1820
use databend_common_catalog::plan::split_row_id;
1921
use databend_common_catalog::plan::DataSourcePlan;
2022
use databend_common_catalog::plan::Projection;
@@ -49,6 +51,8 @@ pub fn row_fetch_processor(
4951
source: &DataSourcePlan,
5052
projection: Projection,
5153
need_wrap_nullable: bool,
54+
semaphore: Arc<Semaphore>,
55+
runtime: Arc<Runtime>,
5256
) -> Result<RowFetcher> {
5357
let table = ctx.build_table_from_source_plan(source)?;
5458
let fuse_table = table
@@ -108,6 +112,8 @@ pub fn row_fetch_processor(
108112
block_reader.clone(),
109113
read_settings,
110114
max_threads,
115+
semaphore.clone(),
116+
runtime.clone(),
111117
),
112118
need_wrap_nullable,
113119
)
@@ -123,6 +129,8 @@ pub fn row_fetch_processor(
123129
block_reader.clone(),
124130
read_settings,
125131
max_threads,
132+
semaphore.clone(),
133+
runtime.clone(),
126134
),
127135
need_wrap_nullable,
128136
)

src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs

+29-14
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use std::collections::HashMap;
1616
use std::collections::HashSet;
1717
use std::sync::Arc;
1818

19-
use databend_common_base::runtime::execute_futures_in_parallel;
19+
use databend_common_base::base::tokio::sync::Semaphore;
20+
use databend_common_base::runtime::Runtime;
2021
use databend_common_catalog::plan::block_idx_in_segment;
2122
use databend_common_catalog::plan::split_prefix;
2223
use databend_common_catalog::plan::split_row_id;
@@ -32,6 +33,7 @@ use databend_storages_common_cache::LoadParams;
3233
use databend_storages_common_io::ReadSettings;
3334
use databend_storages_common_table_meta::meta::BlockMeta;
3435
use databend_storages_common_table_meta::meta::TableSnapshot;
36+
use futures_util::future;
3537
use itertools::Itertools;
3638

3739
use super::fuse_rows_fetcher::RowsFetcher;
@@ -56,6 +58,9 @@ pub(super) struct ParquetRowsFetcher<const BLOCKING_IO: bool> {
5658

5759
// To control the parallelism of fetching blocks.
5860
max_threads: usize,
61+
62+
semaphore: Arc<Semaphore>,
63+
runtime: Arc<Runtime>,
5964
}
6065

6166
#[async_trait::async_trait]
@@ -125,19 +130,25 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
125130
begin = end;
126131
}
127132

128-
let num_task = tasks.len();
129-
let blocks = execute_futures_in_parallel(
130-
tasks,
131-
num_task,
132-
num_task * 2,
133-
"parqeut rows fetch".to_string(),
134-
)
135-
.await?
136-
.into_iter()
137-
.collect::<Result<Vec<_>>>()?
138-
.into_iter()
139-
.flatten()
140-
.collect::<Vec<_>>();
133+
let tasks = tasks.into_iter().map(|v| {
134+
|permit| async {
135+
let r = v.await;
136+
drop(permit);
137+
r
138+
}
139+
});
140+
let join_handlers = self
141+
.runtime
142+
.try_spawn_batch_with_owned_semaphore(self.semaphore.clone(), tasks)
143+
.await?;
144+
145+
let joint = future::try_join_all(join_handlers).await?;
146+
let blocks = joint
147+
.into_iter()
148+
.collect::<Result<Vec<_>>>()?
149+
.into_iter()
150+
.flatten()
151+
.collect::<Vec<_>>();
141152
// Take result rows from blocks.
142153
let indices = row_set
143154
.iter()
@@ -171,6 +182,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
171182
reader: Arc<BlockReader>,
172183
settings: ReadSettings,
173184
max_threads: usize,
185+
semaphore: Arc<Semaphore>,
186+
runtime: Arc<Runtime>,
174187
) -> Self {
175188
let schema = table.schema();
176189
let segment_reader =
@@ -186,6 +199,8 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
186199
part_map: HashMap::new(),
187200
segment_blocks_cache: HashMap::new(),
188201
max_threads,
202+
semaphore,
203+
runtime,
189204
}
190205
}
191206

0 commit comments

Comments (0)