Skip to content

Commit 313cef7

Browse files
committed
Revert "feat: drop unused rows as early as possible during row fetch"
This reverts commit d11cd2a.
1 parent b9923ff commit 313cef7

File tree

1 file changed

+20
-34
lines changed

1 file changed

+20
-34
lines changed

src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs

+20-34
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,6 @@ use databend_common_catalog::plan::Projection;
2525
use databend_common_catalog::table::Table;
2626
use databend_common_exception::ErrorCode;
2727
use databend_common_exception::Result;
28-
use databend_common_expression::BlockRowIndex;
2928
use databend_common_expression::DataBlock;
3029
use databend_common_expression::TableSchemaRef;
3130
use databend_common_storage::ColumnNodes;
@@ -79,23 +78,18 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
7978
let num_rows = row_ids.len();
8079
let mut part_set = HashSet::new();
8180
let mut row_set = Vec::with_capacity(num_rows);
82-
let mut block_row_indices = HashMap::new();
8381
for row_id in row_ids {
8482
let (prefix, idx) = split_row_id(*row_id);
8583
part_set.insert(prefix);
8684
row_set.push((prefix, idx));
87-
block_row_indices
88-
.entry(prefix)
89-
.or_insert(Vec::new())
90-
.push((0u32, idx as u32, 1usize));
9185
}
9286

9387
// Read blocks in `prefix` order.
9488
let part_set = part_set.into_iter().sorted().collect::<Vec<_>>();
95-
let mut idx_map = part_set
89+
let idx_map = part_set
9690
.iter()
9791
.enumerate()
98-
.map(|(i, p)| (*p, (i, 0)))
92+
.map(|(i, p)| (*p, i))
9993
.collect::<HashMap<_, _>>();
10094
// parts_per_thread = num_parts / max_threads
10195
// remain = num_parts % max_threads
@@ -123,15 +117,10 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
123117
.iter()
124118
.map(|idx| self.part_map[idx].clone())
125119
.collect::<Vec<_>>();
126-
let block_row_indices = part_set[begin..end]
127-
.iter()
128-
.map(|idx| block_row_indices.remove(idx).unwrap())
129-
.collect::<Vec<_>>();
130120
tasks.push(Self::fetch_blocks(
131121
self.reader.clone(),
132122
parts,
133123
self.settings,
134-
block_row_indices,
135124
));
136125
begin = end;
137126
}
@@ -152,11 +141,9 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
152141
// Take result rows from blocks.
153142
let indices = row_set
154143
.iter()
155-
.map(|(prefix, _)| {
156-
let (block_idx, row_idx_in_block) = idx_map.get_mut(prefix).unwrap();
157-
let row_idx = *row_idx_in_block;
158-
*row_idx_in_block += 1;
159-
(*block_idx as u32, row_idx as u32, 1_usize)
144+
.map(|(prefix, row_idx)| {
145+
let block_idx = idx_map[prefix];
146+
(block_idx as u32, *row_idx as u32, 1_usize)
160147
})
161148
.collect::<Vec<_>>();
162149

@@ -256,35 +243,34 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
256243
reader: Arc<BlockReader>,
257244
parts: Vec<PartInfoPtr>,
258245
settings: ReadSettings,
259-
block_row_indices: Vec<Vec<BlockRowIndex>>,
260246
) -> Result<Vec<DataBlock>> {
261-
let mut blocks = Vec::with_capacity(parts.len());
247+
let mut chunks = Vec::with_capacity(parts.len());
262248
if BLOCKING_IO {
263-
for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
249+
for part in parts.iter() {
264250
let chunk = reader.sync_read_columns_data_by_merge_io(&settings, part, &None)?;
265-
let block = Self::build_block(&reader, part, chunk)?;
266-
let block =
267-
DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
268-
blocks.push(block);
251+
chunks.push(chunk);
269252
}
270253
} else {
271-
for (part, block_row_indices) in parts.iter().zip(block_row_indices.iter()) {
272-
let fuse_part = FuseBlockPartInfo::from_part(part)?;
254+
for part in parts.iter() {
255+
let part = FuseBlockPartInfo::from_part(part)?;
273256
let chunk = reader
274257
.read_columns_data_by_merge_io(
275258
&settings,
276-
&fuse_part.location,
277-
&fuse_part.columns_meta,
259+
&part.location,
260+
&part.columns_meta,
278261
&None,
279262
)
280263
.await?;
281-
let block = Self::build_block(&reader, part, chunk)?;
282-
let block =
283-
DataBlock::take_blocks(&[block], block_row_indices, block_row_indices.len());
284-
blocks.push(block);
264+
chunks.push(chunk);
285265
}
286266
}
287-
Ok(blocks)
267+
let fetched_blocks = chunks
268+
.into_iter()
269+
.zip(parts.iter())
270+
.map(|(chunk, part)| Self::build_block(&reader, part, chunk))
271+
.collect::<Result<Vec<_>>>()?;
272+
273+
Ok(fetched_blocks)
288274
}
289275

290276
fn build_block(

0 commit comments

Comments (0)