Skip to content

Commit a297715

Browse files
committed
fix: array cache population
1 parent bd8d3c6 commit a297715

File tree

2 files changed

+28
-13
lines changed

2 files changed

+28
-13
lines changed

src/query/storages/common/cache/src/providers/table_data_cache.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ struct CacheItem {
3636
value: Bytes,
3737
}
3838

39-
#[derive(Clone)]
39+
#[derive(Clone, Eq, PartialEq, Hash)]
4040
pub struct TableDataCacheKey {
4141
cache_key: String,
4242
}

src/query/storages/fuse/src/io/read/block/parquet/mod.rs

+27-12
Original file line number | Diff line number | Diff line change
@@ -94,6 +94,7 @@ impl BlockReader {
9494
};
9595

9696
let mut blocks = Vec::with_capacity(record_batches.len());
97+
let mut array_cache_buffer = HashMap::with_capacity(record_batches.len());
9798

9899
let mut offset = 0;
99100
for record_batch in record_batches {
@@ -126,18 +127,17 @@ impl BlockReader {
126127
Some(DataItem::RawData(data)) => {
127128
// get the deserialized arrow array, which may be a nested array
128129
let arrow_array = column_by_name(&record_batch, &name_paths[i]);
129-
if !column_node.is_nested {
130-
if let Some(cache) = &array_cache {
131-
let meta = column_metas.get(&field.column_id).unwrap();
132-
let (offset, len) = meta.offset_length();
133-
let key = TableDataCacheKey::new(
134-
block_path,
135-
field.column_id,
136-
offset,
137-
len,
138-
);
139-
cache.insert(key.into(), (arrow_array.clone(), data.len()));
140-
}
130+
if !column_node.is_nested && array_cache.is_some() {
131+
let meta = column_metas.get(&field.column_id).unwrap();
132+
let (offset, len) = meta.offset_length();
133+
let key =
134+
TableDataCacheKey::new(block_path, field.column_id, offset, len);
135+
array_cache_buffer
136+
.entry(key)
137+
.and_modify(|v: &mut Vec<_>| {
138+
v.push((arrow_array.clone(), data.len()))
139+
})
140+
.or_insert(vec![(arrow_array.clone(), data.len())]);
141141
}
142142
Value::from_arrow_rs(arrow_array, &data_type)?
143143
}
@@ -160,6 +160,21 @@ impl BlockReader {
160160
blocks.push(DataBlock::new(columns, num_rows_record_batch));
161161
}
162162

163+
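// Each record batch buffered one array per cache key above; concatenate the buffered arrays and insert a single combined array (with the summed data length) per key.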
164+
if let Some(array_cache) = &array_cache {
165+
for (key, items) in array_cache_buffer {
166+
let mut arrays = Vec::with_capacity(items.len());
167+
let mut len = 0;
168+
for (array, size) in &items {
169+
arrays.push(array.as_ref());
170+
len += size;
171+
}
172+
use arrow::compute::concat;
173+
let result = concat(&arrays)?;
174+
array_cache.insert(key.into(), (result, len));
175+
}
176+
}
177+
163178
Ok(blocks)
164179
}
165180
}

0 commit comments

Comments
 (0)