Skip to content

Commit 367acb7

Browse files
committed
limit max fetch size
1 parent 9cbd400 commit 367acb7

File tree

2 files changed

+91
-48
lines changed

2 files changed

+91
-48
lines changed

src/query/storages/fuse/src/fuse_part.rs

+6
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ impl FuseBlockPartInfo {
121121
.map(|meta| meta.page_size)
122122
.unwrap_or(self.nums_rows)
123123
}
124+
125+
pub fn in_memory_size(&self) -> Option<u64> {
126+
self.columns_stat
127+
.as_ref()
128+
.map(|stats| stats.iter().map(|(_, stat)| stat.in_memory_size).sum())
129+
}
124130
}
125131

126132
/// Fuse table lazy partition information.

src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs

+85-48
Original file line numberDiff line numberDiff line change
@@ -97,58 +97,33 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
9797
.enumerate()
9898
.map(|(i, p)| (*p, (i, 0)))
9999
.collect::<HashMap<_, _>>();
100-
// parts_per_thread = num_parts / max_threads
101-
// remain = num_parts % max_threads
102-
// task distribution:
103-
// Part number of each task | Task number
104-
// ------------------------------------------------------
105-
// parts_per_thread + 1 | remain
106-
// parts_per_thread | max_threads - remain
107-
let num_parts = part_set.len();
108-
let mut tasks = Vec::with_capacity(self.max_threads);
109-
// Fetch blocks in parallel.
110-
let part_size = num_parts / self.max_threads;
111-
let remainder = num_parts % self.max_threads;
112-
let mut begin = 0;
113-
for i in 0..self.max_threads {
114-
let end = if i < remainder {
115-
begin + part_size + 1
100+
101+
const MAX_FETCH_SIZE: u64 = 256 * 1024 * 1024;
102+
let mut blocks = Vec::new();
103+
for batch in part_set.iter().batching(|it| {
104+
let mut chunk = Vec::new();
105+
let mut fetch_size = 0;
106+
for part in it {
107+
let fuse_part = FuseBlockPartInfo::from_part(&self.part_map[part]).unwrap();
108+
let in_memory_size = fuse_part.in_memory_size().unwrap_or_default();
109+
fetch_size += in_memory_size;
110+
chunk.push(*part);
111+
if fetch_size > MAX_FETCH_SIZE {
112+
return Some(chunk);
113+
}
114+
}
115+
if chunk.is_empty() {
116+
None
116117
} else {
117-
begin + part_size
118-
};
119-
if begin == end {
120-
break;
118+
Some(chunk)
121119
}
122-
let parts = part_set[begin..end]
123-
.iter()
124-
.map(|idx| self.part_map[idx].clone())
125-
.collect::<Vec<_>>();
126-
let block_row_indices = part_set[begin..end]
127-
.iter()
128-
.map(|idx| block_row_indices.remove(idx).unwrap())
129-
.collect::<Vec<_>>();
130-
tasks.push(Self::fetch_blocks(
131-
self.reader.clone(),
132-
parts,
133-
self.settings,
134-
block_row_indices,
135-
));
136-
begin = end;
120+
}) {
121+
let fetch_blocks = self
122+
.fetch_blocks_in_parallel(&batch, &mut block_row_indices)
123+
.await?;
124+
blocks.extend(fetch_blocks);
137125
}
138126

139-
let num_task = tasks.len();
140-
let blocks = execute_futures_in_parallel(
141-
tasks,
142-
num_task,
143-
num_task * 2,
144-
"parqeut rows fetch".to_string(),
145-
)
146-
.await?
147-
.into_iter()
148-
.collect::<Result<Vec<_>>>()?
149-
.into_iter()
150-
.flatten()
151-
.collect::<Vec<_>>();
152127
// Take result rows from blocks.
153128
let indices = row_set
154129
.iter()
@@ -287,6 +262,68 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
287262
Ok(blocks)
288263
}
289264

265+
#[async_backtrace::framed]
266+
async fn fetch_blocks_in_parallel(
267+
&self,
268+
part_set: &[u64],
269+
block_row_indices: &mut HashMap<u64, Vec<BlockRowIndex>>,
270+
) -> Result<Vec<DataBlock>> {
271+
// parts_per_thread = num_parts / max_threads
272+
// remain = num_parts % max_threads
273+
// task distribution:
274+
// Part number of each task | Task number
275+
// ------------------------------------------------------
276+
// parts_per_thread + 1 | remain
277+
// parts_per_thread | max_threads - remain
278+
let num_parts = part_set.len();
279+
let mut tasks = Vec::with_capacity(self.max_threads);
280+
// Fetch blocks in parallel.
281+
let part_size = num_parts / self.max_threads;
282+
let remainder = num_parts % self.max_threads;
283+
let mut begin = 0;
284+
for i in 0..self.max_threads {
285+
let end = if i < remainder {
286+
begin + part_size + 1
287+
} else {
288+
begin + part_size
289+
};
290+
if begin == end {
291+
break;
292+
}
293+
let parts = part_set[begin..end]
294+
.iter()
295+
.map(|idx| self.part_map[idx].clone())
296+
.collect::<Vec<_>>();
297+
let block_row_indices = part_set[begin..end]
298+
.iter()
299+
.map(|idx| block_row_indices.remove(idx).unwrap())
300+
.collect::<Vec<_>>();
301+
tasks.push(Self::fetch_blocks(
302+
self.reader.clone(),
303+
parts,
304+
self.settings,
305+
block_row_indices,
306+
));
307+
begin = end;
308+
}
309+
310+
let num_task = tasks.len();
311+
let blocks = execute_futures_in_parallel(
312+
tasks,
313+
num_task,
314+
num_task * 2,
315+
"parqeut rows fetch".to_string(),
316+
)
317+
.await?
318+
.into_iter()
319+
.collect::<Result<Vec<_>>>()?
320+
.into_iter()
321+
.flatten()
322+
.collect::<Vec<_>>();
323+
324+
Ok(blocks)
325+
}
326+
290327
fn build_block(
291328
reader: &BlockReader,
292329
part: &PartInfoPtr,

0 commit comments

Comments
 (0)