@@ -103,7 +103,7 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
103
103
for batch in part_set. iter ( ) . batching ( |it| {
104
104
let mut chunk = Vec :: new ( ) ;
105
105
let mut fetch_size = 0 ;
106
- while let Some ( part) = it. next ( ) {
106
+ for part in it {
107
107
let fuse_part = FuseBlockPartInfo :: from_part ( & self . part_map [ part] ) . unwrap ( ) ;
108
108
let in_memory_size = fuse_part. in_memory_size ( ) . unwrap_or_default ( ) ;
109
109
if fetch_size + in_memory_size > MAX_FETCH_SIZE && !chunk. is_empty ( ) {
@@ -112,12 +112,18 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
112
112
fetch_size += in_memory_size;
113
113
chunk. push ( * part) ;
114
114
}
115
- if chunk. is_empty ( ) { None } else { Some ( chunk) }
116
- } ) {
117
- let fetch_blocks = self . fetch_blocks_in_parallel ( & batch, & mut block_row_indices) . await ?;
115
+ if chunk. is_empty ( ) {
116
+ None
117
+ } else {
118
+ Some ( chunk)
119
+ }
120
+ } ) {
121
+ let fetch_blocks = self
122
+ . fetch_blocks_in_parallel ( & batch, & mut block_row_indices)
123
+ . await ?;
118
124
blocks. extend ( fetch_blocks) ;
119
125
}
120
-
126
+
121
127
// Take result rows from blocks.
122
128
let indices = row_set
123
129
. iter ( )
@@ -257,7 +263,11 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
257
263
}
258
264
259
265
#[ async_backtrace:: framed]
260
- async fn fetch_blocks_in_parallel ( & self , part_set : & [ u64 ] , block_row_indices : & mut HashMap < u64 , Vec < BlockRowIndex > > ) ->Result < Vec < DataBlock > > {
266
+ async fn fetch_blocks_in_parallel (
267
+ & self ,
268
+ part_set : & [ u64 ] ,
269
+ block_row_indices : & mut HashMap < u64 , Vec < BlockRowIndex > > ,
270
+ ) -> Result < Vec < DataBlock > > {
261
271
// parts_per_thread = num_parts / max_threads
262
272
// remain = num_parts % max_threads
263
273
// task distribution:
0 commit comments