
Commit 8c25161

limit max fetch size
1 parent 9cbd400 commit 8c25161

2 files changed: +83 −50

src/query/storages/fuse/src/fuse_part.rs (+6)

```diff
@@ -121,6 +121,12 @@ impl FuseBlockPartInfo {
             .map(|meta| meta.page_size)
             .unwrap_or(self.nums_rows)
     }
+
+    pub fn in_memory_size(&self) -> Option<u64> {
+        self.columns_stat
+            .as_ref()
+            .map(|stats| stats.iter().map(|(_, stat)| stat.in_memory_size).sum())
+    }
 }
 
 /// Fuse table lazy partition information.
```
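The new helper sums each column's reported in-memory size from the part's column statistics, returning None when no statistics are attached. A minimal standalone sketch of the same summation, using a hypothetical `ColumnStat` stand-in rather than the crate's real column-statistics type:

```rust
use std::collections::HashMap;

// Hypothetical per-column statistics; a stand-in for the crate's real type.
struct ColumnStat {
    in_memory_size: u64,
}

// Mirrors what the new method computes: the total in-memory size across all
// columns, or None when the part carries no column statistics.
fn in_memory_size(columns_stat: &Option<HashMap<u32, ColumnStat>>) -> Option<u64> {
    columns_stat
        .as_ref()
        .map(|stats| stats.values().map(|stat| stat.in_memory_size).sum())
}

fn main() {
    let stats = HashMap::from([
        (0u32, ColumnStat { in_memory_size: 1024 }),
        (1u32, ColumnStat { in_memory_size: 2048 }),
    ]);
    assert_eq!(in_memory_size(&Some(stats)), Some(3072));
    assert_eq!(in_memory_size(&None), None);
}
```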

src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs (+77 −50)

```diff
@@ -97,58 +97,27 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
             .enumerate()
             .map(|(i, p)| (*p, (i, 0)))
             .collect::<HashMap<_, _>>();
-        // parts_per_thread = num_parts / max_threads
-        // remain = num_parts % max_threads
-        // task distribution:
-        //   Part number of each task   |   Task number
-        // ------------------------------------------------------
-        //    parts_per_thread + 1      |    remain
-        //    parts_per_thread          |    max_threads - remain
-        let num_parts = part_set.len();
-        let mut tasks = Vec::with_capacity(self.max_threads);
-        // Fetch blocks in parallel.
-        let part_size = num_parts / self.max_threads;
-        let remainder = num_parts % self.max_threads;
-        let mut begin = 0;
-        for i in 0..self.max_threads {
-            let end = if i < remainder {
-                begin + part_size + 1
-            } else {
-                begin + part_size
-            };
-            if begin == end {
-                break;
+
+        const MAX_FETCH_SIZE: u64 = 256 * 1024 * 1024;
+        let mut blocks = Vec::new();
+        for batch in part_set.iter().batching(|it| {
+            let mut chunk = Vec::new();
+            let mut fetch_size = 0;
+            while let Some(part) = it.next() {
+                let fuse_part = FuseBlockPartInfo::from_part(&self.part_map[part]).unwrap();
+                let in_memory_size = fuse_part.in_memory_size().unwrap_or_default();
+                if fetch_size + in_memory_size > MAX_FETCH_SIZE && !chunk.is_empty() {
+                    return Some(chunk);
+                }
+                fetch_size += in_memory_size;
+                chunk.push(*part);
             }
-            let parts = part_set[begin..end]
-                .iter()
-                .map(|idx| self.part_map[idx].clone())
-                .collect::<Vec<_>>();
-            let block_row_indices = part_set[begin..end]
-                .iter()
-                .map(|idx| block_row_indices.remove(idx).unwrap())
-                .collect::<Vec<_>>();
-            tasks.push(Self::fetch_blocks(
-                self.reader.clone(),
-                parts,
-                self.settings,
-                block_row_indices,
-            ));
-            begin = end;
+            if chunk.is_empty() { None } else { Some(chunk) }
+        }) {
+            let fetch_blocks = self.fetch_blocks_in_parallel(&batch, &mut block_row_indices).await?;
+            blocks.extend(fetch_blocks);
         }
-
-        let num_task = tasks.len();
-        let blocks = execute_futures_in_parallel(
-            tasks,
-            num_task,
-            num_task * 2,
-            "parqeut rows fetch".to_string(),
-        )
-        .await?
-        .into_iter()
-        .collect::<Result<Vec<_>>>()?
-        .into_iter()
-        .flatten()
-        .collect::<Vec<_>>();
+
         // Take result rows from blocks.
         let indices = row_set
             .iter()
```
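This hunk replaces the eager all-at-once split with size-capped batches built via itertools' `batching` adapter, so each round of parallel fetching covers at most roughly 256 MiB of in-memory part data. Below is a minimal standalone sketch of that batching pattern, using placeholder `(part_id, size)` pairs instead of `FuseBlockPartInfo`; unlike the commit's closure, this sketch peeks at the next part, so the element that would overflow the cap simply starts the next batch:

```rust
use itertools::Itertools;

fn main() {
    // Hypothetical (part_id, in_memory_size in bytes) pairs; stand-ins for real part metadata.
    let parts: Vec<(u64, u64)> = vec![(0, 120), (1, 90), (2, 200), (3, 30), (4, 300)];
    // Tiny cap for illustration; the commit uses 256 * 1024 * 1024 (256 MiB).
    const MAX_FETCH_SIZE: u64 = 256;

    let batches: Vec<Vec<u64>> = parts
        .iter()
        .peekable()
        .batching(|it| {
            let mut chunk = Vec::new();
            let mut fetch_size = 0u64;
            // Greedily take parts until the next one would push the batch over the cap.
            while let Some(&&(part, size)) = it.peek() {
                if fetch_size + size > MAX_FETCH_SIZE && !chunk.is_empty() {
                    break;
                }
                fetch_size += size;
                chunk.push(part);
                it.next();
            }
            // Returning None ends the outer iteration once all parts are consumed.
            if chunk.is_empty() { None } else { Some(chunk) }
        })
        .collect();

    // Parts 0 and 1 fit together; part 2 would overflow, so it starts the next batch;
    // part 4 exceeds the cap on its own but is still fetched as a singleton batch.
    assert_eq!(batches, vec![vec![0, 1], vec![2, 3], vec![4]]);
}
```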
```diff
@@ -287,6 +256,64 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
         Ok(blocks)
     }
 
+    #[async_backtrace::framed]
+    async fn fetch_blocks_in_parallel(&self, part_set: &[u64], block_row_indices: &mut HashMap<u64, Vec<BlockRowIndex>>) -> Result<Vec<DataBlock>> {
+        // parts_per_thread = num_parts / max_threads
+        // remain = num_parts % max_threads
+        // task distribution:
+        //   Part number of each task   |   Task number
+        // ------------------------------------------------------
+        //    parts_per_thread + 1      |    remain
+        //    parts_per_thread          |    max_threads - remain
+        let num_parts = part_set.len();
+        let mut tasks = Vec::with_capacity(self.max_threads);
+        // Fetch blocks in parallel.
+        let part_size = num_parts / self.max_threads;
+        let remainder = num_parts % self.max_threads;
+        let mut begin = 0;
+        for i in 0..self.max_threads {
+            let end = if i < remainder {
+                begin + part_size + 1
+            } else {
+                begin + part_size
+            };
+            if begin == end {
+                break;
+            }
+            let parts = part_set[begin..end]
+                .iter()
+                .map(|idx| self.part_map[idx].clone())
+                .collect::<Vec<_>>();
+            let block_row_indices = part_set[begin..end]
+                .iter()
+                .map(|idx| block_row_indices.remove(idx).unwrap())
+                .collect::<Vec<_>>();
+            tasks.push(Self::fetch_blocks(
+                self.reader.clone(),
+                parts,
+                self.settings,
+                block_row_indices,
+            ));
+            begin = end;
+        }
+
+        let num_task = tasks.len();
+        let blocks = execute_futures_in_parallel(
+            tasks,
+            num_task,
+            num_task * 2,
+            "parqeut rows fetch".to_string(),
+        )
+        .await?
+        .into_iter()
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .flatten()
+        .collect::<Vec<_>>();
+
+        Ok(blocks)
+    }
+
     fn build_block(
         reader: &BlockReader,
         part: &PartInfoPtr,
```
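The relocated method keeps the original per-thread task distribution described in its comments: `remainder` tasks receive `part_size + 1` parts and the rest receive `part_size`. To make that split concrete, here is a standalone sketch of the same arithmetic; `split_ranges` is an illustrative helper, not part of the crate:

```rust
// Compute the [begin, end) ranges each task would cover: `remainder` tasks get
// `part_size + 1` parts, the remaining tasks get `part_size`.
fn split_ranges(num_parts: usize, max_threads: usize) -> Vec<(usize, usize)> {
    let part_size = num_parts / max_threads;
    let remainder = num_parts % max_threads;
    let mut ranges = Vec::with_capacity(max_threads);
    let mut begin = 0;
    for i in 0..max_threads {
        let end = if i < remainder {
            begin + part_size + 1
        } else {
            begin + part_size
        };
        // Once a range would be empty, every later one would be too.
        if begin == end {
            break;
        }
        ranges.push((begin, end));
        begin = end;
    }
    ranges
}

fn main() {
    // 10 parts over 4 threads: two tasks of 3 parts, two tasks of 2.
    assert_eq!(split_ranges(10, 4), vec![(0, 3), (3, 6), (6, 8), (8, 10)]);
    // 2 parts over 4 threads: only two non-empty tasks are spawned.
    assert_eq!(split_ranges(2, 4), vec![(0, 1), (1, 2)]);
}
```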
