
Commit 4af7cf5

feat: new setting fuse_parquet_read_batch_size

Controls the batch size used when deserializing data blocks of fuse tables stored in the parquet format. The default value of this setting is 8192.

1 parent ba2d1d3 commit 4af7cf5
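For intuition: with a block of `num_rows` rows read as a single row group, the batched read path below yields roughly ceil(num_rows / batch_size) record batches instead of a single one. A quick illustrative Rust sketch of that arithmetic (the helper name is illustrative, not from the patch):

fn num_batches(num_rows: usize, batch_size: usize) -> usize {
    // ceil(num_rows / batch_size): roughly how many record batches the
    // reader yields when fuse_parquet_read_batch_size = batch_size.
    num_rows.div_ceil(batch_size)
}

fn main() {
    assert_eq!(num_batches(1_000_000, 8192), 123); // default batch size
    assert_eq!(num_batches(8192, 8192), 1); // exactly one batch
}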

10 files changed (+289 -164 lines)

src/query/settings/src/settings_default.rs (+7)

@@ -1242,6 +1242,13 @@ impl DefaultSettings {
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
             }),
+            ("fuse_parquet_read_batch_size", DefaultSettingValue {
+                value: UserSettingValue::UInt64(8192),
+                desc: "The batch size while deserializing fuse table with parquet storage format",
+                mode: SettingMode::Both,
+                scope: SettingScope::Both,
+                range: Some(SettingRange::Numeric(0..=1_000_000)),
+            }),
         ]);
 
         Ok(Arc::new(DefaultSettings {

src/query/settings/src/settings_getter_setter.rs (+4)

@@ -924,4 +924,8 @@ impl Settings {
     pub fn get_enable_use_vacuum2_to_purge_transient_table_data(&self) -> Result<bool> {
         Ok(self.try_get_u64("use_vacuum2_to_purge_transient_table_data")? == 1)
     }
+
+    pub fn get_fuse_parquet_read_batch_size(&self) -> Result<usize> {
+        Ok(self.try_get_u64("fuse_parquet_read_batch_size")? as usize)
+    }
 }
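The new getter follows the crate's existing pattern: settings are stored as `u64` and each typed accessor converts on read. A minimal self-contained sketch of that shape, using simplified stand-in types rather than Databend's real `Settings`:

use std::collections::HashMap;

// Simplified stand-in for the Settings type: raw u64 values behind
// string keys, with a typed getter that casts to usize on the way out.
struct Settings {
    values: HashMap<&'static str, u64>,
}

impl Settings {
    fn try_get_u64(&self, key: &str) -> Result<u64, String> {
        self.values
            .get(key)
            .copied()
            .ok_or_else(|| format!("unknown setting: {key}"))
    }

    fn get_fuse_parquet_read_batch_size(&self) -> Result<usize, String> {
        // Mirrors the getter above: fetch the raw u64, cast to usize.
        Ok(self.try_get_u64("fuse_parquet_read_batch_size")? as usize)
    }
}

fn main() -> Result<(), String> {
    let settings = Settings {
        values: HashMap::from([("fuse_parquet_read_batch_size", 8192u64)]),
    };
    assert_eq!(settings.get_fuse_parquet_read_batch_size()?, 8192);
    Ok(())
}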

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs (+8 -1)

@@ -97,7 +97,7 @@ impl AggIndexReader {
         self.index_id
     }
 
-    pub(super) fn apply_agg_info(&self, block: DataBlock) -> Result<DataBlock> {
+    pub(super) fn apply_agg_info_to_block(&self, block: DataBlock) -> Result<DataBlock> {
         let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);
 
         // 1. Filter the block if there is a filter.
@@ -145,4 +145,11 @@ impl AggIndexReader {
             )),
         ))
     }
+
+    pub(super) fn apply_agg_info(&self, block: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
+        block
+            .into_iter()
+            .map(|block| self.apply_agg_info_to_block(block))
+            .collect::<Result<_>>()
+    }
 }
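The renamed `apply_agg_info_to_block` keeps the per-block logic, and the new `apply_agg_info` lifts it over a `Vec` by collecting an iterator of `Result`s, so the first failure short-circuits the whole batch. A toy sketch of that map-then-collect pattern, with `i64` standing in for `DataBlock`:

// Fallible per-element transform, standing in for apply_agg_info_to_block.
fn apply_to_block(block: i64) -> Result<i64, String> {
    if block < 0 {
        Err("negative block".to_string())
    } else {
        Ok(block * 2)
    }
}

// Lifted over a Vec: Iterator<Item = Result<T, E>> collects into
// Result<Vec<T>, E>, stopping at the first Err.
fn apply_all(blocks: Vec<i64>) -> Result<Vec<i64>, String> {
    blocks.into_iter().map(apply_to_block).collect()
}

fn main() {
    assert_eq!(apply_all(vec![1, 2, 3]), Ok(vec![2, 4, 6]));
    assert!(apply_all(vec![1, -2, 3]).is_err());
}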

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs (+5 -5)

@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::vec;
 
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
@@ -138,7 +139,7 @@ impl AggIndexReader {
         }
     }
 
-    pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<DataBlock> {
+    pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result<Vec<DataBlock>> {
         let mut all_columns_arrays = vec![];
 
         for (index, column_node) in self.reader.project_column_nodes.iter().enumerate() {
@@ -148,9 +149,9 @@ impl AggIndexReader {
             all_columns_arrays.push(arrays);
         }
         if all_columns_arrays.is_empty() {
-            return Ok(DataBlock::empty_with_schema(Arc::new(
+            return Ok(vec![DataBlock::empty_with_schema(Arc::new(
                 self.reader.data_schema(),
-            )));
+            ))]);
         }
         debug_assert!(all_columns_arrays
             .iter()
@@ -166,7 +167,6 @@ impl AggIndexReader {
             let block = DataBlock::new_from_columns(columns);
             blocks.push(block);
         }
-        let block = DataBlock::concat(&blocks)?;
-        self.apply_agg_info(block)
+        self.apply_agg_info(blocks)
     }
 }

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs (+4 -2)

@@ -113,15 +113,17 @@ impl AggIndexReader {
         &self,
         part: PartInfoPtr,
         data: BlockReadResult,
-    ) -> Result<DataBlock> {
+        batch_size: usize,
+    ) -> Result<Vec<DataBlock>> {
         let columns_chunks = data.columns_chunks()?;
         let part = FuseBlockPartInfo::from_part(&part)?;
-        let block = self.reader.deserialize_parquet_chunks(
+        let block = self.reader.deserialize_parquet_to_blocks(
             part.nums_rows,
             &part.columns_meta,
             columns_chunks,
             &part.compression,
             &part.location,
+            batch_size,
         )?;
 
         self.apply_agg_info(block)

src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs (+12 -6)

@@ -19,6 +19,7 @@ use arrow_schema::Schema;
 use databend_common_expression::ColumnId;
 use databend_common_expression::TableSchema;
 use databend_storages_common_table_meta::meta::Compression;
+use itertools::Itertools;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
 use parquet::arrow::parquet_to_arrow_field_levels;
 use parquet::arrow::ArrowSchemaConverter;
@@ -34,7 +35,8 @@ pub fn column_chunks_to_record_batch(
     num_rows: usize,
     column_chunks: &HashMap<ColumnId, DataItem>,
     compression: &Compression,
-) -> databend_common_exception::Result<RecordBatch> {
+    batch_size: usize,
+) -> databend_common_exception::Result<Vec<RecordBatch>> {
     let arrow_schema = Schema::from(original_schema);
     let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
 
@@ -66,13 +68,17 @@ pub fn column_chunks_to_record_batch(
         ProjectionMask::leaves(&parquet_schema, projection_mask),
         Some(arrow_schema.fields()),
     )?;
-    let mut record_reader = ParquetRecordBatchReader::try_new_with_row_groups(
+    let record_reader = ParquetRecordBatchReader::try_new_with_row_groups(
         &field_levels,
         row_group.as_ref(),
-        num_rows,
+        batch_size,
         None,
     )?;
-    let record = record_reader.next().unwrap()?;
-    assert!(record_reader.next().is_none());
-    Ok(record)
+
+    let records: Vec<_> = record_reader.try_collect()?;
+    assert_eq!(
+        num_rows,
+        records.iter().map(|r| r.num_rows()).sum::<usize>()
+    );
+    Ok(records)
 }
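`try_collect` is the itertools extension that folds an `Iterator<Item = Result<T, E>>` into `Result<Vec<T>, E>`. A minimal sketch of the same collect-then-check shape, with a plain iterator of per-batch row counts standing in for `ParquetRecordBatchReader` (assumes the itertools crate as a dependency):

use itertools::Itertools;

fn main() -> Result<(), String> {
    let num_rows = 20usize; // total rows in the block
    let batch_size = 8usize; // the new setting's value

    // Stand-in for the record reader: yields per-batch row counts
    // (8, 8, 4 here), each wrapped in Result like the real reader.
    let record_reader = (0..num_rows)
        .step_by(batch_size)
        .map(|start| Ok::<usize, String>(batch_size.min(num_rows - start)));

    // try_collect stops at the first Err; otherwise we get a Vec.
    let records: Vec<usize> = record_reader.try_collect()?;

    // Same invariant the patch asserts: batch sizes must sum to num_rows.
    assert_eq!(num_rows, records.iter().sum::<usize>());
    Ok(())
}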

src/query/storages/fuse/src/io/read/block/parquet/mod.rs (+90 -52)

@@ -35,6 +35,7 @@ mod adapter;
 mod deserialize;
 
 pub use adapter::RowGroupImplBuilder;
+use databend_common_exception::Result;
 pub use deserialize::column_chunks_to_record_batch;
 
 use crate::io::read::block::block_reader_merge_io::DataItem;
@@ -48,17 +49,41 @@ impl BlockReader {
         column_chunks: HashMap<ColumnId, DataItem>,
         compression: &Compression,
         block_path: &str,
-    ) -> databend_common_exception::Result<DataBlock> {
+    ) -> Result<DataBlock> {
+        let mut blocks = self.deserialize_parquet_to_blocks(
+            num_rows,
+            column_metas,
+            column_chunks,
+            compression,
+            block_path,
+            num_rows,
+        )?;
+        // Defensive check: using `num_rows` as batch_size, expects only one block
+        assert_eq!(blocks.len(), 1);
+        Ok(blocks.pop().unwrap())
+    }
+
+    pub(crate) fn deserialize_parquet_to_blocks(
+        &self,
+        num_rows: usize,
+        column_metas: &HashMap<ColumnId, ColumnMeta>,
+        column_chunks: HashMap<ColumnId, DataItem>,
+        compression: &Compression,
+        block_path: &str,
+        batch_size: usize,
+    ) -> Result<Vec<DataBlock>> {
         if column_chunks.is_empty() {
-            return self.build_default_values_block(num_rows);
+            return Ok(vec![self.build_default_values_block(num_rows)?]);
         }
-        let record_batch = column_chunks_to_record_batch(
+
+        let record_batches = column_chunks_to_record_batch(
             &self.original_schema,
             num_rows,
             &column_chunks,
             compression,
+            batch_size,
         )?;
-        let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
+
         let name_paths = column_name_paths(&self.projection, &self.original_schema);
 
         let array_cache = if self.put_cache {
@@ -67,58 +92,71 @@ impl BlockReader {
             None
         };
 
-        for ((i, field), column_node) in self
-            .projected_schema
-            .fields
-            .iter()
-            .enumerate()
-            .zip(self.project_column_nodes.iter())
-        {
-            let data_type = field.data_type().into();
-
-            // NOTE, there is something tricky here:
-            // - `column_chunks` always contains data of leaf columns
-            // - here we may processing a nested type field
-            // - But, even if the field being processed is a field with multiple leaf columns
-            //   `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
-            //   even if we are getting data from `column_chunks` using a non-leaf
-            //   `column_id` of `projected_schema.fields`
-            //
-            //   [^1]: Except in the current block, there is no data stored for the
-            //         corresponding field, and a default value has been declared for
-            //         the corresponding field.
-            //
-            //   Yes, it is too obscure, we need to polish it later.
-
-            let value = match column_chunks.get(&field.column_id) {
-                Some(DataItem::RawData(data)) => {
-                    // get the deserialized arrow array, which may be a nested array
-                    let arrow_array = column_by_name(&record_batch, &name_paths[i]);
-                    if !column_node.is_nested {
-                        if let Some(cache) = &array_cache {
-                            let meta = column_metas.get(&field.column_id).unwrap();
-                            let (offset, len) = meta.offset_length();
-                            let key =
-                                TableDataCacheKey::new(block_path, field.column_id, offset, len);
-                            cache.insert(key.into(), (arrow_array.clone(), data.len()));
+        let mut blocks = Vec::with_capacity(record_batches.len());
+
+        for record_batch in record_batches {
+            let num_rows_record_batch = record_batch.num_rows();
+            let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
+            for ((i, field), column_node) in self
+                .projected_schema
+                .fields
+                .iter()
+                .enumerate()
+                .zip(self.project_column_nodes.iter())
+            {
+                let data_type = field.data_type().into();
+
+                // NOTE, there is something tricky here:
+                // - `column_chunks` always contains data of leaf columns
+                // - here we may processing a nested type field
+                // - But, even if the field being processed is a field with multiple leaf columns
+                //   `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
+                //   even if we are getting data from `column_chunks` using a non-leaf
+                //   `column_id` of `projected_schema.fields`
+                //
+                //   [^1]: Except in the current block, there is no data stored for the
+                //         corresponding field, and a default value has been declared for
+                //         the corresponding field.
+                //
+                //   Yes, it is too obscure, we need to polish it later.
+
+                let value = match column_chunks.get(&field.column_id) {
+                    Some(DataItem::RawData(data)) => {
+                        // get the deserialized arrow array, which may be a nested array
+                        let arrow_array = column_by_name(&record_batch, &name_paths[i]);
+                        if !column_node.is_nested {
+                            if let Some(cache) = &array_cache {
+                                let meta = column_metas.get(&field.column_id).unwrap();
+                                let (offset, len) = meta.offset_length();
+                                let key = TableDataCacheKey::new(
+                                    block_path,
+                                    field.column_id,
+                                    offset,
+                                    len,
+                                );
+                                cache.insert(key.into(), (arrow_array.clone(), data.len()));
+                            }
                         }
+                        Value::from_arrow_rs(arrow_array, &data_type)?
                     }
-                    Value::from_arrow_rs(arrow_array, &data_type)?
-                }
-                Some(DataItem::ColumnArray(cached)) => {
-                    if column_node.is_nested {
-                        // a defensive check, should never happen
-                        return Err(ErrorCode::StorageOther(
-                            "unexpected nested field: nested leaf field hits cached",
-                        ));
+                    Some(DataItem::ColumnArray(cached)) => {
+                        // TODO this is NOT correct!
+                        if column_node.is_nested {
+                            // a defensive check, should never happen
+                            return Err(ErrorCode::StorageOther(
+                                "unexpected nested field: nested leaf field hits cached",
+                            ));
+                        }
+                        Value::from_arrow_rs(cached.0.clone(), &data_type)?
                     }
-                    Value::from_arrow_rs(cached.0.clone(), &data_type)?
-                }
-                None => Value::Scalar(self.default_vals[i].clone()),
-            };
-            columns.push(BlockEntry::new(data_type, value));
+                    None => Value::Scalar(self.default_vals[i].clone()),
+                };
+                columns.push(BlockEntry::new(data_type, value));
+            }
+            blocks.push(DataBlock::new(columns, num_rows_record_batch));
         }
-        Ok(DataBlock::new(columns, num_rows))
+
+        Ok(blocks)
     }
 }
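With this refactor, `deserialize_parquet_chunks` becomes a thin wrapper over the batched path: it passes `batch_size = num_rows`, so exactly one block can come back, which the defensive `assert_eq!` enforces. A toy model of that relationship (hypothetical helper names; per-batch row counts stand in for real blocks):

// Stand-in for deserialize_parquet_to_blocks: split num_rows rows into
// batches of at most batch_size rows, returning each batch's row count.
fn to_blocks(num_rows: usize, batch_size: usize) -> Vec<usize> {
    (0..num_rows)
        .step_by(batch_size.max(1))
        .map(|start| batch_size.min(num_rows - start))
        .collect()
}

// Stand-in for deserialize_parquet_chunks: batch_size = num_rows
// guarantees a single batch, mirroring the wrapper's assert.
fn to_single_block(num_rows: usize) -> usize {
    let mut blocks = to_blocks(num_rows, num_rows);
    assert_eq!(blocks.len(), 1);
    blocks.pop().unwrap()
}

fn main() {
    assert_eq!(to_blocks(8192 * 2 + 10, 8192), vec![8192, 8192, 10]);
    assert_eq!(to_single_block(100), 100);
}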
