From d32bc162dd24b98143190e6a4ff3b28a568774ed Mon Sep 17 00:00:00 2001
From: dantengsky
Date: Tue, 1 Apr 2025 11:36:34 +0800
Subject: [PATCH 01/11] feat: new setting `fuse_parquet_read_batch_size`

It controls the batch size used while deserializing data blocks of fuse tables stored in the parquet format.

The default value of this setting is 8192.
---
 src/query/settings/src/settings_default.rs | 7 +
 .../settings/src/settings_getter_setter.rs | 4 +
 .../src/io/read/agg_index/agg_index_reader.rs | 9 +-
 .../read/agg_index/agg_index_reader_native.rs | 10 +-
 .../agg_index/agg_index_reader_parquet.rs | 6 +-
 .../src/io/read/block/parquet/deserialize.rs | 18 +-
 .../fuse/src/io/read/block/parquet/mod.rs | 142 ++++++++++------
 .../virtual_column_reader_parquet.rs | 3 +
 .../read/native_data_source_deserializer.rs | 12 +-
 .../read/parquet_data_source_deserializer.rs | 158 ++++++++++--------
 10 files changed, 229 insertions(+), 140 deletions(-)

diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index f70a446a63a52..f4dc4f24e08ea 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -1264,6 +1264,13 @@ impl DefaultSettings {
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(0..=1)),
         }),
+        ("fuse_parquet_read_batch_size", DefaultSettingValue {
+            value: UserSettingValue::UInt64(8192),
+            desc: "The batch size while deserializing fuse table with parquet storage format",
+            mode: SettingMode::Both,
+            scope: SettingScope::Both,
+            range: Some(SettingRange::Numeric(0..=1_000_000)),
+        }),
     ]);

     Ok(Arc::new(DefaultSettings {

diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index 7deb0fd812d9f..c405f0a3e3008 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -932,4 +932,8 @@ impl Settings {
     pub fn get_enable_block_stream_write(&self) -> Result<bool> {
         Ok(self.try_get_u64("enable_block_stream_write")? == 1)
     }
+
+    pub fn get_fuse_parquet_read_batch_size(&self) -> Result<usize> {
+        Ok(self.try_get_u64("fuse_parquet_read_batch_size")? as usize)
+    }
 }

diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs
index 124b0d4478493..3d7503781184c 100644
--- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs
+++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader.rs
@@ -97,7 +97,7 @@ impl AggIndexReader {
         self.index_id
     }

-    pub(super) fn apply_agg_info(&self, block: DataBlock) -> Result<DataBlock> {
+    pub(super) fn apply_agg_info_to_block(&self, block: DataBlock) -> Result<DataBlock> {
         let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);

         // 1. Filter the block if there is a filter.
@@ -145,4 +145,11 @@ impl AggIndexReader {
             )),
         ))
     }
+
+    pub(super) fn apply_agg_info(&self, block: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
+        block
+            .into_iter()
+            .map(|block| self.apply_agg_info_to_block(block))
+            .collect::<Result<Vec<_>>>()
+    }
 }

diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs
index 8a76a72e7242d..cfe8d4f615a35 100644
--- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs
+++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
use std::sync::Arc; +use std::vec; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -138,7 +139,7 @@ impl AggIndexReader { } } - pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result { + pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result> { let mut all_columns_arrays = vec![]; for (index, column_node) in self.reader.project_column_nodes.iter().enumerate() { @@ -148,9 +149,9 @@ impl AggIndexReader { all_columns_arrays.push(arrays); } if all_columns_arrays.is_empty() { - return Ok(DataBlock::empty_with_schema(Arc::new( + return Ok(vec![DataBlock::empty_with_schema(Arc::new( self.reader.data_schema(), - ))); + ))]); } debug_assert!(all_columns_arrays .iter() @@ -166,7 +167,6 @@ impl AggIndexReader { let block = DataBlock::new_from_columns(columns); blocks.push(block); } - let block = DataBlock::concat(&blocks)?; - self.apply_agg_info(block) + self.apply_agg_info(blocks) } } diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs index 9a7d5da04fe13..e8a087cdef5e3 100644 --- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs +++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs @@ -113,15 +113,17 @@ impl AggIndexReader { &self, part: PartInfoPtr, data: BlockReadResult, - ) -> Result { + batch_size: usize, + ) -> Result> { let columns_chunks = data.columns_chunks()?; let part = FuseBlockPartInfo::from_part(&part)?; - let block = self.reader.deserialize_parquet_chunks( + let block = self.reader.deserialize_parquet_to_blocks( part.nums_rows, &part.columns_meta, columns_chunks, &part.compression, &part.location, + batch_size, )?; self.apply_agg_info(block) diff --git a/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs b/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs index 22907c2d03210..c78c22c627cb7 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs @@ -19,6 +19,7 @@ use arrow_schema::Schema; use databend_common_expression::ColumnId; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::meta::Compression; +use itertools::Itertools; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use parquet::arrow::parquet_to_arrow_field_levels; use parquet::arrow::ArrowSchemaConverter; @@ -34,7 +35,8 @@ pub fn column_chunks_to_record_batch( num_rows: usize, column_chunks: &HashMap, compression: &Compression, -) -> databend_common_exception::Result { + batch_size: usize, +) -> databend_common_exception::Result> { let arrow_schema = Schema::from(original_schema); let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?; @@ -66,13 +68,17 @@ pub fn column_chunks_to_record_batch( ProjectionMask::leaves(&parquet_schema, projection_mask), Some(arrow_schema.fields()), )?; - let mut record_reader = ParquetRecordBatchReader::try_new_with_row_groups( + let record_reader = ParquetRecordBatchReader::try_new_with_row_groups( &field_levels, row_group.as_ref(), - num_rows, + batch_size, None, )?; - let record = record_reader.next().unwrap()?; - assert!(record_reader.next().is_none()); - Ok(record) + + let records: Vec<_> = record_reader.try_collect()?; + assert_eq!( + num_rows, + records.iter().map(|r| r.num_rows()).sum::() + ); + Ok(records) } diff --git 
a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs index bd7ec4a71dbaa..6588a3a275e0a 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs @@ -35,6 +35,7 @@ mod adapter; mod deserialize; pub use adapter::RowGroupImplBuilder; +use databend_common_exception::Result; pub use deserialize::column_chunks_to_record_batch; use crate::io::read::block::block_reader_merge_io::DataItem; @@ -48,17 +49,41 @@ impl BlockReader { column_chunks: HashMap, compression: &Compression, block_path: &str, - ) -> databend_common_exception::Result { + ) -> Result { + let mut blocks = self.deserialize_parquet_to_blocks( + num_rows, + column_metas, + column_chunks, + compression, + block_path, + num_rows, + )?; + // Defensive check: using `num_rows` as batch_size, expects only one block + assert_eq!(blocks.len(), 1); + Ok(blocks.pop().unwrap()) + } + + pub(crate) fn deserialize_parquet_to_blocks( + &self, + num_rows: usize, + column_metas: &HashMap, + column_chunks: HashMap, + compression: &Compression, + block_path: &str, + batch_size: usize, + ) -> Result> { if column_chunks.is_empty() { - return self.build_default_values_block(num_rows); + return Ok(vec![self.build_default_values_block(num_rows)?]); } - let record_batch = column_chunks_to_record_batch( + + let record_batches = column_chunks_to_record_batch( &self.original_schema, num_rows, &column_chunks, compression, + batch_size, )?; - let mut columns = Vec::with_capacity(self.projected_schema.fields.len()); + let name_paths = column_name_paths(&self.projection, &self.original_schema); let array_cache = if self.put_cache { @@ -67,58 +92,71 @@ impl BlockReader { None }; - for ((i, field), column_node) in self - .projected_schema - .fields - .iter() - .enumerate() - .zip(self.project_column_nodes.iter()) - { - let data_type = field.data_type().into(); - - // NOTE, there is something tricky here: - // - `column_chunks` always contains data of leaf columns - // - here we may processing a nested type field - // - But, even if the field being processed is a field with multiple leaf columns - // `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1], - // even if we are getting data from `column_chunks` using a non-leaf - // `column_id` of `projected_schema.fields` - // - // [^1]: Except in the current block, there is no data stored for the - // corresponding field, and a default value has been declared for - // the corresponding field. - // - // Yes, it is too obscure, we need to polish it later. 
- - let value = match column_chunks.get(&field.column_id) { - Some(DataItem::RawData(data)) => { - // get the deserialized arrow array, which may be a nested array - let arrow_array = column_by_name(&record_batch, &name_paths[i]); - if !column_node.is_nested { - if let Some(cache) = &array_cache { - let meta = column_metas.get(&field.column_id).unwrap(); - let (offset, len) = meta.offset_length(); - let key = - TableDataCacheKey::new(block_path, field.column_id, offset, len); - cache.insert(key.into(), (arrow_array.clone(), data.len())); + let mut blocks = Vec::with_capacity(record_batches.len()); + + for record_batch in record_batches { + let num_rows_record_batch = record_batch.num_rows(); + let mut columns = Vec::with_capacity(self.projected_schema.fields.len()); + for ((i, field), column_node) in self + .projected_schema + .fields + .iter() + .enumerate() + .zip(self.project_column_nodes.iter()) + { + let data_type = field.data_type().into(); + + // NOTE, there is something tricky here: + // - `column_chunks` always contains data of leaf columns + // - here we may processing a nested type field + // - But, even if the field being processed is a field with multiple leaf columns + // `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1], + // even if we are getting data from `column_chunks` using a non-leaf + // `column_id` of `projected_schema.fields` + // + // [^1]: Except in the current block, there is no data stored for the + // corresponding field, and a default value has been declared for + // the corresponding field. + // + // Yes, it is too obscure, we need to polish it later. + + let value = match column_chunks.get(&field.column_id) { + Some(DataItem::RawData(data)) => { + // get the deserialized arrow array, which may be a nested array + let arrow_array = column_by_name(&record_batch, &name_paths[i]); + if !column_node.is_nested { + if let Some(cache) = &array_cache { + let meta = column_metas.get(&field.column_id).unwrap(); + let (offset, len) = meta.offset_length(); + let key = TableDataCacheKey::new( + block_path, + field.column_id, + offset, + len, + ); + cache.insert(key.into(), (arrow_array.clone(), data.len())); + } } + Value::from_arrow_rs(arrow_array, &data_type)? } - Value::from_arrow_rs(arrow_array, &data_type)? - } - Some(DataItem::ColumnArray(cached)) => { - if column_node.is_nested { - // a defensive check, should never happen - return Err(ErrorCode::StorageOther( - "unexpected nested field: nested leaf field hits cached", - )); + Some(DataItem::ColumnArray(cached)) => { + // TODO this is NOT correct! + if column_node.is_nested { + // a defensive check, should never happen + return Err(ErrorCode::StorageOther( + "unexpected nested field: nested leaf field hits cached", + )); + } + Value::from_arrow_rs(cached.0.clone(), &data_type)? } - Value::from_arrow_rs(cached.0.clone(), &data_type)? 
- } - None => Value::Scalar(self.default_vals[i].clone()), - }; - columns.push(BlockEntry::new(data_type, value)); + None => Value::Scalar(self.default_vals[i].clone()), + }; + columns.push(BlockEntry::new(data_type, value)); + } + blocks.push(DataBlock::new(columns, num_rows_record_batch)); } - Ok(DataBlock::new(columns, num_rows)) + + Ok(blocks) } } diff --git a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs index 1eb508ad38297..bf262f4bfc1b9 100644 --- a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs +++ b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs @@ -15,6 +15,8 @@ use std::collections::HashSet; use std::sync::Arc; +use arrow_array::RecordBatch; +use databend_common_catalog::plan::VirtualColumnField; use databend_common_exception::Result; use databend_common_expression::eval_function; use databend_common_expression::types::DataType; @@ -22,6 +24,7 @@ use databend_common_expression::BlockEntry; use databend_common_expression::Column; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; +use databend_common_expression::FunctionContext; use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; use databend_common_expression::Value; diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index d4f525c5a8981..c4ece46437cb6 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -191,7 +191,7 @@ pub struct NativeDeserializeDataTransform { // Structures for driving the pipeline: input: Arc, output: Arc, - output_data: Option, + output_data: Vec, parts: VecDeque, columns: VecDeque, scan_progress: Arc, @@ -309,7 +309,7 @@ impl NativeDeserializeDataTransform { block_reader, input, output, - output_data: None, + output_data: vec![], parts: VecDeque::new(), columns: VecDeque::new(), prewhere_columns, @@ -353,7 +353,7 @@ impl NativeDeserializeDataTransform { }; self.scan_progress.incr(&progress_values); Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, data_block.memory_size()); - self.output_data = Some(data_block); + self.output_data = vec![data_block]; } /// Check if can skip the whole block by default values. 
@@ -846,7 +846,7 @@ impl Processor for NativeDeserializeDataTransform { return Ok(Event::NeedConsume); } - if let Some(data_block) = self.output_data.take() { + if let Some(data_block) = self.output_data.pop() { self.output.push_data(Ok(data_block)); return Ok(Event::NeedConsume); } @@ -891,8 +891,8 @@ impl Processor for NativeDeserializeDataTransform { let columns = match columns { NativeDataSource::AggIndex(data) => { let agg_index_reader = self.index_reader.as_ref().as_ref().unwrap(); - let block = agg_index_reader.deserialize_native_data(data)?; - self.output_data = Some(block); + let blocks = agg_index_reader.deserialize_native_data(data)?; + self.output_data = blocks; self.finish_partition(); return Ok(()); } diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index 1add3998f2b4c..e0218adb6ce13 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -64,7 +64,7 @@ pub struct DeserializeDataTransform { input: Arc, output: Arc, - output_data: Option, + output_data: Vec, src_schema: DataSchema, output_schema: DataSchema, parts: Vec, @@ -79,6 +79,8 @@ pub struct DeserializeDataTransform { need_reserve_block_info: bool, need_wait_runtime_filter: bool, runtime_filter_ready: Option>, + + batch_size: usize, } unsafe impl Send for DeserializeDataTransform {} @@ -114,6 +116,8 @@ impl DeserializeDataTransform { output_schema.remove_internal_fields(); let output_schema: DataSchema = (&output_schema).into(); let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index); + + let batch_size = ctx.get_settings().get_fuse_parquet_read_batch_size()? 
as usize; Ok(ProcessorPtr::create(Box::new(DeserializeDataTransform { ctx, table_index: plan.table_index, @@ -122,7 +126,7 @@ impl DeserializeDataTransform { block_reader, input, output, - output_data: None, + output_data: vec![], src_schema, output_schema, parts: vec![], @@ -134,10 +138,11 @@ impl DeserializeDataTransform { need_reserve_block_info, need_wait_runtime_filter, runtime_filter_ready: None, + batch_size, }))) } - fn runtime_filter(&mut self, data_block: DataBlock) -> Result> { + fn runtime_filter(&mut self, data_block: &DataBlock) -> Result> { // Check if already cached runtime filters if self.cached_runtime_filter.is_none() { let bloom_filters = self.ctx.get_bloom_runtime_filter_with_id(self.table_index); @@ -222,7 +227,7 @@ impl Processor for DeserializeDataTransform { return Ok(Event::NeedConsume); } - if let Some(data_block) = self.output_data.take() { + if let Some(data_block) = self.output_data.pop() { self.output.push_data(Ok(data_block)); return Ok(Event::NeedConsume); } @@ -264,48 +269,39 @@ impl Processor for DeserializeDataTransform { match read_res { ParquetDataSource::AggIndex((actual_part, data)) => { let agg_index_reader = self.index_reader.as_ref().as_ref().unwrap(); - let block = agg_index_reader.deserialize_parquet_data(actual_part, data)?; - - let progress_values = ProgressValues { - rows: block.num_rows(), - bytes: block.memory_size(), - }; - self.scan_progress.incr(&progress_values); - Profile::record_usize_profile( - ProfileStatisticsName::ScanBytes, - block.memory_size(), - ); - - self.output_data = Some(block); + let blocks = agg_index_reader.deserialize_parquet_data( + actual_part, + data, + self.batch_size, + )?; + + self.update_scan_metrics(blocks.as_slice()); + + self.output_data = blocks; } + ParquetDataSource::Normal((data, virtual_data)) => { let start = Instant::now(); let columns_chunks = data.columns_chunks()?; let part = FuseBlockPartInfo::from_part(&part)?; - let mut data_block = self.block_reader.deserialize_parquet_chunks( + let data_blocks = self.block_reader.deserialize_parquet_to_blocks( part.nums_rows, &part.columns_meta, columns_chunks, &part.compression, &part.location, + self.batch_size, )?; - let origin_num_rows = data_block.num_rows(); - - let mut filter = None; - if self.ctx.has_bloom_runtime_filters(self.table_index) { - if let Some(bitmap) = self.runtime_filter(data_block.clone())? { - data_block = data_block.filter_with_bitmap(&bitmap)?; - filter = Some(bitmap); - } - } - - // Add optional virtual columns - if let Some(virtual_reader) = self.virtual_reader.as_ref() { - data_block = virtual_reader - .deserialize_virtual_columns(data_block.clone(), virtual_data)?; - } + let mut virtual_columns_paster = + if let Some(virtual_column_reader) = self.virtual_reader.as_ref() { + let record_batches = virtual_column_reader + .try_create_paster(virtual_data, self.batch_size)?; + Some(record_batches) + } else { + None + }; // Perf. { @@ -314,42 +310,53 @@ impl Processor for DeserializeDataTransform { ); } - let progress_values = ProgressValues { - rows: data_block.num_rows(), - bytes: data_block.memory_size(), - }; - self.scan_progress.incr(&progress_values); - Profile::record_usize_profile( - ProfileStatisticsName::ScanBytes, - data_block.memory_size(), - ); - - let mut data_block = - data_block.resort(&self.src_schema, &self.output_schema)?; - - // Fill `BlockMetaIndex` as `DataBlock.meta` if query internal columns, - // `TransformAddInternalColumns` will generate internal columns using `BlockMetaIndex` in next pipeline. 
- let offsets = if self.block_reader.query_internal_columns() { - filter.as_ref().map(|bitmap| { - (0..origin_num_rows) - .filter(|i| unsafe { bitmap.get_bit_unchecked(*i) }) - .collect() - }) - } else { - None - }; - - data_block = add_data_block_meta( - data_block, - part, - offsets, - self.base_block_ids.clone(), - self.block_reader.update_stream_columns(), - self.block_reader.query_internal_columns(), - self.need_reserve_block_info, - )?; + self.update_scan_metrics(data_blocks.as_slice()); - self.output_data = Some(data_block); + let mut output_blocks = Vec::with_capacity(data_blocks.len()); + for mut data_block in data_blocks { + let origin_num_rows = data_block.num_rows(); + + let mut filter = None; + if self.ctx.has_bloom_runtime_filters(self.table_index) { + if let Some(bitmap) = self.runtime_filter(&data_block)? { + data_block = data_block.filter_with_bitmap(&bitmap)?; + filter = Some(bitmap); + } + } + + // Add optional virtual columns + if let Some(virtual_columns_paster) = &mut virtual_columns_paster { + data_block = virtual_columns_paster.paste_virtual_column(data_block)?; + } + + let mut data_block = + data_block.resort(&self.src_schema, &self.output_schema)?; + + // Fill `BlockMetaIndex` as `DataBlock.meta` if query internal columns, + // `TransformAddInternalColumns` will generate internal columns using `BlockMetaIndex` in next pipeline. + let offsets = if self.block_reader.query_internal_columns() { + filter.as_ref().map(|bitmap| { + (0..origin_num_rows) + .filter(|i| unsafe { bitmap.get_bit_unchecked(*i) }) + .collect() + }) + } else { + None + }; + + data_block = add_data_block_meta( + data_block, + part, + offsets, + self.base_block_ids.clone(), + self.block_reader.update_stream_columns(), + self.block_reader.query_internal_columns(), + self.need_reserve_block_info, + )?; + output_blocks.push(data_block); + } + + self.output_data = output_blocks; } } } @@ -370,3 +377,18 @@ impl Processor for DeserializeDataTransform { Ok(()) } } + +impl DeserializeDataTransform { + fn update_scan_metrics(&self, blocks: &[DataBlock]) { + let (num_rows, memory_size) = blocks.iter().fold((0, 0), |(rows, size), block| { + (block.num_rows() + rows, block.memory_size() + size) + }); + + let progress_values = ProgressValues { + rows: num_rows, + bytes: memory_size, + }; + self.scan_progress.incr(&progress_values); + Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, memory_size); + } +} From 210274d364b957e11448a13e2bf866d63b574eb1 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 1 Apr 2025 18:36:33 +0800 Subject: [PATCH 02/11] fix clippy --- src/query/storages/fuse/src/io/read/block/parquet/mod.rs | 2 +- .../src/operations/read/parquet_data_source_deserializer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs index 6588a3a275e0a..42ee5e4bd94a6 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs @@ -118,7 +118,7 @@ impl BlockReader { // corresponding field, and a default value has been declared for // the corresponding field. // - // Yes, it is too obscure, we need to polish it later. + // Yes, it is too obscure, we need to polish it SOON. 
let value = match column_chunks.get(&field.column_id) { Some(DataItem::RawData(data)) => { diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index e0218adb6ce13..c0a5e9a6cd96b 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -117,7 +117,7 @@ impl DeserializeDataTransform { let output_schema: DataSchema = (&output_schema).into(); let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index); - let batch_size = ctx.get_settings().get_fuse_parquet_read_batch_size()? as usize; + let batch_size = ctx.get_settings().get_fuse_parquet_read_batch_size()?; Ok(ProcessorPtr::create(Box::new(DeserializeDataTransform { ctx, table_index: plan.table_index, From 1701de7180bf5cfd98112d23061205f5af4b5646 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 1 Apr 2025 23:42:54 +0800 Subject: [PATCH 03/11] refactor: Enables the new setting for Query only --- .../read/agg_index/agg_index_reader_native.rs | 9 +- .../agg_index/agg_index_reader_parquet.rs | 8 +- .../src/io/read/block/parquet/deserialize.rs | 4 +- .../fuse/src/io/read/block/parquet/mod.rs | 6 +- .../virtual_column_reader_parquet.rs | 215 ++++++++++++++++-- .../merge_into/mutator/matched_mutator.rs | 4 +- .../read/native_data_source_deserializer.rs | 12 +- .../read/parquet_data_source_deserializer.rs | 18 +- 8 files changed, 233 insertions(+), 43 deletions(-) diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs index cfe8d4f615a35..fb44116705a03 100644 --- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs +++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs @@ -139,7 +139,7 @@ impl AggIndexReader { } } - pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result> { + pub fn deserialize_native_data(&self, data: &mut NativeSourceData) -> Result { let mut all_columns_arrays = vec![]; for (index, column_node) in self.reader.project_column_nodes.iter().enumerate() { @@ -149,9 +149,9 @@ impl AggIndexReader { all_columns_arrays.push(arrays); } if all_columns_arrays.is_empty() { - return Ok(vec![DataBlock::empty_with_schema(Arc::new( + return Ok(DataBlock::empty_with_schema(Arc::new( self.reader.data_schema(), - ))]); + ))); } debug_assert!(all_columns_arrays .iter() @@ -167,6 +167,7 @@ impl AggIndexReader { let block = DataBlock::new_from_columns(columns); blocks.push(block); } - self.apply_agg_info(blocks) + let block = DataBlock::concat(&blocks)?; + self.apply_agg_info_to_block(block) } } diff --git a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs index e8a087cdef5e3..926d831ba88e1 100644 --- a/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs +++ b/src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_parquet.rs @@ -113,19 +113,19 @@ impl AggIndexReader { &self, part: PartInfoPtr, data: BlockReadResult, - batch_size: usize, + batch_size_hint: Option, ) -> Result> { let columns_chunks = data.columns_chunks()?; let part = FuseBlockPartInfo::from_part(&part)?; - let block = self.reader.deserialize_parquet_to_blocks( + let blocks = 
self.reader.deserialize_parquet_to_blocks( part.nums_rows, &part.columns_meta, columns_chunks, &part.compression, &part.location, - batch_size, + batch_size_hint, )?; - self.apply_agg_info(block) + self.apply_agg_info(blocks) } } diff --git a/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs b/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs index c78c22c627cb7..c5c2f4744c50c 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs @@ -35,7 +35,7 @@ pub fn column_chunks_to_record_batch( num_rows: usize, column_chunks: &HashMap, compression: &Compression, - batch_size: usize, + batch_size: Option, ) -> databend_common_exception::Result> { let arrow_schema = Schema::from(original_schema); let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?; @@ -68,6 +68,8 @@ pub fn column_chunks_to_record_batch( ProjectionMask::leaves(&parquet_schema, projection_mask), Some(arrow_schema.fields()), )?; + + let batch_size = batch_size.unwrap_or(num_rows); let record_reader = ParquetRecordBatchReader::try_new_with_row_groups( &field_levels, row_group.as_ref(), diff --git a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs index 42ee5e4bd94a6..3cf27bc9f88cf 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs @@ -56,7 +56,7 @@ impl BlockReader { column_chunks, compression, block_path, - num_rows, + None, )?; // Defensive check: using `num_rows` as batch_size, expects only one block assert_eq!(blocks.len(), 1); @@ -70,7 +70,7 @@ impl BlockReader { column_chunks: HashMap, compression: &Compression, block_path: &str, - batch_size: usize, + batch_size_hint: Option, ) -> Result> { if column_chunks.is_empty() { return Ok(vec![self.build_default_values_block(num_rows)?]); @@ -81,7 +81,7 @@ impl BlockReader { num_rows, &column_chunks, compression, - batch_size, + batch_size_hint, )?; let name_paths = column_name_paths(&self.projection, &self.original_schema); diff --git a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs index bf262f4bfc1b9..ba9a5877c9e52 100644 --- a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs +++ b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs @@ -149,37 +149,171 @@ impl VirtualColumnReader { )) } - pub fn deserialize_virtual_columns( + // pub fn deserialize_virtual_columns( + // &self, + // mut data_block: DataBlock, + // virtual_data: Option, + //) -> Result { + // let orig_schema = virtual_data + // .as_ref() + // .map(|virtual_data| virtual_data.schema.clone()) + // .unwrap_or_default(); + // let record_batch = virtual_data + // .map(|virtual_data| { + // let columns_chunks = virtual_data.data.columns_chunks()?; + // column_chunks_to_record_batch( + // &virtual_data.schema, + // virtual_data.num_rows, + // &columns_chunks, + // &virtual_data.compression, + // ) + // }) + // .transpose()?; + + // // If the virtual column has already generated, add it directly, + // // otherwise extract it from the source column + // let func_ctx = self.ctx.get_function_context()?; + // for virtual_column_field in self.virtual_column_info.virtual_column_fields.iter() { + // let name = format!("{}", virtual_column_field.column_id); + // 
if let Some(arrow_array) = record_batch + // .as_ref() + // .and_then(|r| r.column_by_name(&name).cloned()) + // { + // let orig_field = orig_schema.field_with_name(&name).unwrap(); + // let orig_type: DataType = orig_field.data_type().into(); + // let value = Value::Column(Column::from_arrow_rs(arrow_array, &orig_type)?); + // let data_type: DataType = virtual_column_field.data_type.as_ref().into(); + // let column = if orig_type != data_type { + // let cast_func_name = format!( + // "to_{}", + // data_type.remove_nullable().to_string().to_lowercase() + // ); + // let (cast_value, cast_data_type) = eval_function( + // None, + // &cast_func_name, + // [(value, orig_type)], + // &func_ctx, + // data_block.num_rows(), + // &BUILTIN_FUNCTIONS, + // )?; + // BlockEntry::new(cast_data_type, cast_value) + // } else { + // BlockEntry::new(data_type, value) + // }; + // data_block.add_column(column); + // continue; + // } + // let src_index = self + // .source_schema + // .index_of(&virtual_column_field.source_name) + // .unwrap(); + // let source = data_block.get_by_offset(src_index); + // let src_arg = (source.value.clone(), source.data_type.clone()); + // let path_arg = ( + // Value::Scalar(virtual_column_field.key_paths.clone()), + // DataType::String, + // ); + + // let (value, data_type) = eval_function( + // None, + // "get_by_keypath", + // [src_arg, path_arg], + // &func_ctx, + // data_block.num_rows(), + // &BUILTIN_FUNCTIONS, + // )?; + + // let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name { + // let (cast_value, cast_data_type) = eval_function( + // None, + // cast_func_name, + // [(value, data_type)], + // &func_ctx, + // data_block.num_rows(), + // &BUILTIN_FUNCTIONS, + // )?; + // BlockEntry::new(cast_data_type, cast_value) + // } else { + // BlockEntry::new(data_type, value) + // }; + // data_block.add_column(column); + // } + + // Ok(data_block) + //} + /// Deserialize virtual column data into record batches, according to the `batch_size`. + pub fn try_create_paster( &self, - mut data_block: DataBlock, virtual_data: Option, - ) -> Result { + batch_size_hint: Option, + ) -> Result { let orig_schema = virtual_data .as_ref() .map(|virtual_data| virtual_data.schema.clone()) .unwrap_or_default(); - let record_batch = virtual_data - .map(|virtual_data| { - let columns_chunks = virtual_data.data.columns_chunks()?; - column_chunks_to_record_batch( - &virtual_data.schema, - virtual_data.num_rows, - &columns_chunks, - &virtual_data.compression, - ) - }) - .transpose()?; + + let record_batches = if let Some(virtual_data) = virtual_data { + let columns_chunks = virtual_data.data.columns_chunks()?; + let chunks = column_chunks_to_record_batch( + &self.virtual_column_info.schema, + virtual_data.num_rows, + &columns_chunks, + &virtual_data.compression, + batch_size_hint, + )?; + Some(chunks) + } else { + None + }; + + let function_context = self.ctx.get_function_context()?; + + // Unfortunately, Paster cannot hold references to the fields that being cloned, + // since the caller `DeserializeDataTransform` will take mutable reference of + // VirtualColumnReader indirectly. 
+ Ok(VirtualColumnDataPaster { + record_batches, + function_context, + next_record_batch_index: 0, + virtual_column_fields: self.virtual_column_info.virtual_column_fields.clone(), + source_schema: self.source_schema.clone(), + orig_schema, + }) + } +} + +pub struct VirtualColumnDataPaster { + record_batches: Option>, + next_record_batch_index: usize, + function_context: FunctionContext, + virtual_column_fields: Vec, + source_schema: TableSchemaRef, + orig_schema: TableSchemaRef, +} + +impl VirtualColumnDataPaster { + /// Paste virtual column to `data_block` if necessary + pub fn paste_virtual_column(&mut self, mut data_block: DataBlock) -> Result { + let record_batch = if let Some(record_batches) = &self.record_batches { + assert!(record_batches.len() > self.next_record_batch_index); + Some(&record_batches[self.next_record_batch_index]) + } else { + None + }; + + self.next_record_batch_index += 1; + + let func_ctx = &self.function_context; // If the virtual column has already generated, add it directly, // otherwise extract it from the source column - let func_ctx = self.ctx.get_function_context()?; - for virtual_column_field in self.virtual_column_info.virtual_column_fields.iter() { + for virtual_column_field in self.virtual_column_fields.iter() { let name = format!("{}", virtual_column_field.column_id); if let Some(arrow_array) = record_batch .as_ref() .and_then(|r| r.column_by_name(&name).cloned()) { - let orig_field = orig_schema.field_with_name(&name).unwrap(); + let orig_field = self.orig_schema.field_with_name(&name).unwrap(); let orig_type: DataType = orig_field.data_type().into(); let value = Value::Column(Column::from_arrow_rs(arrow_array, &orig_type)?); let data_type: DataType = virtual_column_field.data_type.as_ref().into(); @@ -240,5 +374,52 @@ impl VirtualColumnReader { } Ok(data_block) + + // for virtual_column_field in self.virtual_column_fields.iter() { + // if let Some(arrow_array) = + // record_batch.and_then(|r| r.column_by_name(&virtual_column_field.name).cloned()) + // { + // let data_type: DataType = virtual_column_field.data_type.as_ref().into(); + // let value = Value::Column(Column::from_arrow_rs(arrow_array, &data_type)?); + // data_block.add_column(BlockEntry::new(data_type, value)); + // continue; + // } + // let src_index = self + // .source_schema + // .index_of(&virtual_column_field.source_name) + // .unwrap(); + // let source = data_block.get_by_offset(src_index); + // let src_arg = (source.value.clone(), source.data_type.clone()); + // let path_arg = ( + // Value::Scalar(virtual_column_field.key_paths.clone()), + // DataType::String, + // ); + + // let (value, data_type) = eval_function( + // None, + // "get_by_keypath", + // [src_arg, path_arg], + // func_ctx, + // data_block.num_rows(), + // &BUILTIN_FUNCTIONS, + // )?; + + // let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name { + // let (cast_value, cast_data_type) = eval_function( + // None, + // cast_func_name, + // [(value, data_type)], + // func_ctx, + // data_block.num_rows(), + // &BUILTIN_FUNCTIONS, + // )?; + // BlockEntry::new(cast_data_type, cast_value) + // } else { + // BlockEntry::new(data_type, value) + // }; + // data_block.add_column(column); + //} + + // Ok(data_block) } } diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs index e78cc2b6c1b71..78a9a828fdcc1 100644 --- 
a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs @@ -201,7 +201,7 @@ impl MatchedAggregator { .insert(offset as usize) { return Err(ErrorCode::UnresolvableConflict( - "multi rows from source match one and the same row in the target_table multi times", + "1 multi rows from source match one and the same row in the target_table multi times", )); } } @@ -335,7 +335,7 @@ impl MatchedAggregator { < update_modified_offsets.len() + delete_modified_offsets.len() { return Err(ErrorCode::UnresolvableConflict( - "multi rows from source match one and the same row in the target_table multi times", + "2 multi rows from source match one and the same row in the target_table multi times", )); } diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index c4ece46437cb6..d4f525c5a8981 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -191,7 +191,7 @@ pub struct NativeDeserializeDataTransform { // Structures for driving the pipeline: input: Arc, output: Arc, - output_data: Vec, + output_data: Option, parts: VecDeque, columns: VecDeque, scan_progress: Arc, @@ -309,7 +309,7 @@ impl NativeDeserializeDataTransform { block_reader, input, output, - output_data: vec![], + output_data: None, parts: VecDeque::new(), columns: VecDeque::new(), prewhere_columns, @@ -353,7 +353,7 @@ impl NativeDeserializeDataTransform { }; self.scan_progress.incr(&progress_values); Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, data_block.memory_size()); - self.output_data = vec![data_block]; + self.output_data = Some(data_block); } /// Check if can skip the whole block by default values. 
@@ -846,7 +846,7 @@ impl Processor for NativeDeserializeDataTransform { return Ok(Event::NeedConsume); } - if let Some(data_block) = self.output_data.pop() { + if let Some(data_block) = self.output_data.take() { self.output.push_data(Ok(data_block)); return Ok(Event::NeedConsume); } @@ -891,8 +891,8 @@ impl Processor for NativeDeserializeDataTransform { let columns = match columns { NativeDataSource::AggIndex(data) => { let agg_index_reader = self.index_reader.as_ref().as_ref().unwrap(); - let blocks = agg_index_reader.deserialize_native_data(data)?; - self.output_data = blocks; + let block = agg_index_reader.deserialize_native_data(data)?; + self.output_data = Some(block); self.finish_partition(); return Ok(()); } diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index c0a5e9a6cd96b..723924e9cd6dd 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -23,6 +23,7 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; +use databend_common_catalog::query_kind::QueryKind; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -80,7 +81,7 @@ pub struct DeserializeDataTransform { need_wait_runtime_filter: bool, runtime_filter_ready: Option>, - batch_size: usize, + batch_size_hint: Option, } unsafe impl Send for DeserializeDataTransform {} @@ -99,6 +100,12 @@ impl DeserializeDataTransform { let need_wait_runtime_filter = !ctx.get_cluster().is_empty() && ctx.get_wait_runtime_filter(plan.scan_id); + // Unfortunately, batch size is hint is only safe for Query now. 
+ let batch_size_hint = match ctx.get_query_kind() { + QueryKind::Query => Some(ctx.get_settings().get_fuse_parquet_read_batch_size()?), + _ => None, + }; + let mut src_schema: DataSchema = (block_reader.schema().as_ref()).into(); if let Some(virtual_reader) = virtual_reader.as_ref() { let mut fields = src_schema.fields().clone(); @@ -117,7 +124,6 @@ impl DeserializeDataTransform { let output_schema: DataSchema = (&output_schema).into(); let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index); - let batch_size = ctx.get_settings().get_fuse_parquet_read_batch_size()?; Ok(ProcessorPtr::create(Box::new(DeserializeDataTransform { ctx, table_index: plan.table_index, @@ -138,7 +144,7 @@ impl DeserializeDataTransform { need_reserve_block_info, need_wait_runtime_filter, runtime_filter_ready: None, - batch_size, + batch_size_hint, }))) } @@ -272,7 +278,7 @@ impl Processor for DeserializeDataTransform { let blocks = agg_index_reader.deserialize_parquet_data( actual_part, data, - self.batch_size, + self.batch_size_hint, )?; self.update_scan_metrics(blocks.as_slice()); @@ -291,13 +297,13 @@ impl Processor for DeserializeDataTransform { columns_chunks, &part.compression, &part.location, - self.batch_size, + self.batch_size_hint, )?; let mut virtual_columns_paster = if let Some(virtual_column_reader) = self.virtual_reader.as_ref() { let record_batches = virtual_column_reader - .try_create_paster(virtual_data, self.batch_size)?; + .try_create_paster(virtual_data, self.batch_size_hint)?; Some(record_batches) } else { None From 768a25d276da3a51e1a26c638b1aaafa081b6721 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 1 Apr 2025 23:50:22 +0800 Subject: [PATCH 04/11] fix typo --- .../fuse/src/operations/merge_into/mutator/matched_mutator.rs | 4 ++-- .../src/operations/read/parquet_data_source_deserializer.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs index 78a9a828fdcc1..e78cc2b6c1b71 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs @@ -201,7 +201,7 @@ impl MatchedAggregator { .insert(offset as usize) { return Err(ErrorCode::UnresolvableConflict( - "1 multi rows from source match one and the same row in the target_table multi times", + "multi rows from source match one and the same row in the target_table multi times", )); } } @@ -335,7 +335,7 @@ impl MatchedAggregator { < update_modified_offsets.len() + delete_modified_offsets.len() { return Err(ErrorCode::UnresolvableConflict( - "2 multi rows from source match one and the same row in the target_table multi times", + "multi rows from source match one and the same row in the target_table multi times", )); } diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index 723924e9cd6dd..3edc9e1c585d7 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -100,7 +100,7 @@ impl DeserializeDataTransform { let need_wait_runtime_filter = !ctx.get_cluster().is_empty() && ctx.get_wait_runtime_filter(plan.scan_id); - // Unfortunately, batch size is hint is only safe for Query now. 
+ // Unfortunately, the batch size hint is only safe for Query now. let batch_size_hint = match ctx.get_query_kind() { QueryKind::Query => Some(ctx.get_settings().get_fuse_parquet_read_batch_size()?), _ => None, From 57585f6548afd45347b92f2ca6d013aee0787217 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 2 Apr 2025 00:08:05 +0800 Subject: [PATCH 05/11] fix: slice array cache item according to batch size --- .../storages/fuse/src/io/read/block/parquet/mod.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs index 3cf27bc9f88cf..5f1a3e7b53d30 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use arrow_array::Array; use arrow_array::ArrayRef; use arrow_array::RecordBatch; use arrow_array::StructArray; @@ -94,6 +95,7 @@ impl BlockReader { let mut blocks = Vec::with_capacity(record_batches.len()); + let mut offset = 0; for record_batch in record_batches { let num_rows_record_batch = record_batch.num_rows(); let mut columns = Vec::with_capacity(self.projected_schema.fields.len()); @@ -118,7 +120,7 @@ impl BlockReader { // corresponding field, and a default value has been declared for // the corresponding field. // - // Yes, it is too obscure, we need to polish it SOON. + // It is too confusing, we need to polish it SOON. let value = match column_chunks.get(&field.column_id) { Some(DataItem::RawData(data)) => { @@ -140,19 +142,21 @@ impl BlockReader { Value::from_arrow_rs(arrow_array, &data_type)? } Some(DataItem::ColumnArray(cached)) => { - // TODO this is NOT correct! if column_node.is_nested { // a defensive check, should never happen return Err(ErrorCode::StorageOther( "unexpected nested field: nested leaf field hits cached", )); } - Value::from_arrow_rs(cached.0.clone(), &data_type)? + let array = cached.0.slice(offset, record_batch.num_rows()); + Value::from_arrow_rs(array, &data_type)? 
} None => Value::Scalar(self.default_vals[i].clone()), }; columns.push(BlockEntry::new(data_type, value)); } + + offset += record_batch.num_rows(); blocks.push(DataBlock::new(columns, num_rows_record_batch)); } From 976385c0f56845f5cb3e9248dcbddb05af941a92 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 2 Apr 2025 01:55:25 +0800 Subject: [PATCH 06/11] fix: array cache population --- .../cache/src/providers/disk_cache_builder.rs | 2 +- .../fuse/src/io/read/block/parquet/mod.rs | 39 +++++++++++++------ 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/query/storages/common/cache/src/providers/disk_cache_builder.rs b/src/query/storages/common/cache/src/providers/disk_cache_builder.rs index f6fcbad755fb1..7127c9c97f476 100644 --- a/src/query/storages/common/cache/src/providers/disk_cache_builder.rs +++ b/src/query/storages/common/cache/src/providers/disk_cache_builder.rs @@ -36,7 +36,7 @@ struct CacheItem { value: Bytes, } -#[derive(Clone)] +#[derive(Clone, Eq, PartialEq, Hash)] pub struct TableDataCacheKey { cache_key: String, } diff --git a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs index 5f1a3e7b53d30..71b75bcbbd9ae 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs @@ -94,6 +94,7 @@ impl BlockReader { }; let mut blocks = Vec::with_capacity(record_batches.len()); + let mut array_cache_buffer = HashMap::with_capacity(record_batches.len()); let mut offset = 0; for record_batch in record_batches { @@ -126,18 +127,17 @@ impl BlockReader { Some(DataItem::RawData(data)) => { // get the deserialized arrow array, which may be a nested array let arrow_array = column_by_name(&record_batch, &name_paths[i]); - if !column_node.is_nested { - if let Some(cache) = &array_cache { - let meta = column_metas.get(&field.column_id).unwrap(); - let (offset, len) = meta.offset_length(); - let key = TableDataCacheKey::new( - block_path, - field.column_id, - offset, - len, - ); - cache.insert(key.into(), (arrow_array.clone(), data.len())); - } + if !column_node.is_nested && array_cache.is_some() { + let meta = column_metas.get(&field.column_id).unwrap(); + let (offset, len) = meta.offset_length(); + let key = + TableDataCacheKey::new(block_path, field.column_id, offset, len); + array_cache_buffer + .entry(key) + .and_modify(|v: &mut Vec<_>| { + v.push((arrow_array.clone(), data.len())) + }) + .or_insert(vec![(arrow_array.clone(), data.len())]); } Value::from_arrow_rs(arrow_array, &data_type)? 
} @@ -160,6 +160,21 @@ impl BlockReader { blocks.push(DataBlock::new(columns, num_rows_record_batch)); } + // TODO doc this + if let Some(array_cache) = &array_cache { + for (key, items) in array_cache_buffer { + let mut arrays = Vec::with_capacity(items.len()); + let mut len = 0; + for (array, size) in &items { + arrays.push(array.as_ref()); + len += size; + } + use arrow::compute::concat; + let result = concat(&arrays)?; + array_cache.insert(key.into(), (result, len)); + } + } + Ok(blocks) } } From f60b8fe68bcb28ac979b90f0f218f3be10571241 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 2 Apr 2025 02:00:27 +0800 Subject: [PATCH 07/11] refactor: use VecDeque for outputting data --- .../read/parquet_data_source_deserializer.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index 3edc9e1c585d7..f3b783c478ace 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::collections::VecDeque; use std::ops::BitAnd; use std::sync::Arc; use std::time::Instant; @@ -65,7 +66,7 @@ pub struct DeserializeDataTransform { input: Arc, output: Arc, - output_data: Vec, + output_data: VecDeque, src_schema: DataSchema, output_schema: DataSchema, parts: Vec, @@ -132,7 +133,7 @@ impl DeserializeDataTransform { block_reader, input, output, - output_data: vec![], + output_data: VecDeque::new(), src_schema, output_schema, parts: vec![], @@ -233,7 +234,7 @@ impl Processor for DeserializeDataTransform { return Ok(Event::NeedConsume); } - if let Some(data_block) = self.output_data.pop() { + if let Some(data_block) = self.output_data.pop_front() { self.output.push_data(Ok(data_block)); return Ok(Event::NeedConsume); } @@ -283,7 +284,7 @@ impl Processor for DeserializeDataTransform { self.update_scan_metrics(blocks.as_slice()); - self.output_data = blocks; + self.output_data = blocks.into(); } ParquetDataSource::Normal((data, virtual_data)) => { @@ -318,7 +319,7 @@ impl Processor for DeserializeDataTransform { self.update_scan_metrics(data_blocks.as_slice()); - let mut output_blocks = Vec::with_capacity(data_blocks.len()); + let mut output_blocks = VecDeque::with_capacity(data_blocks.len()); for mut data_block in data_blocks { let origin_num_rows = data_block.num_rows(); @@ -359,7 +360,7 @@ impl Processor for DeserializeDataTransform { self.block_reader.query_internal_columns(), self.need_reserve_block_info, )?; - output_blocks.push(data_block); + output_blocks.push_back(data_block); } self.output_data = output_blocks; From 2cd4789cdc36c9bc2807b8dc3458931625980015 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 2 Apr 2025 13:07:16 +0800 Subject: [PATCH 08/11] fix flaky logic test --- tests/sqllogictests/suites/tpch/spill.test | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/sqllogictests/suites/tpch/spill.test b/tests/sqllogictests/suites/tpch/spill.test index c6aa2cea290b0..651dd1323b833 100644 --- a/tests/sqllogictests/suites/tpch/spill.test +++ b/tests/sqllogictests/suites/tpch/spill.test @@ -34,6 +34,7 @@ FROM ( SELECT * FROM lineitem, part + ORDER BY lineitem.l_orderkey LIMIT 10000000 ); ---- From 0d3a27c297ca717fab724ee9c5c0abd79eb667c1 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 2 Apr 2025 14:06:55 
+0800 Subject: [PATCH 09/11] tweak default setting value --- src/query/settings/src/settings_default.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index f4dc4f24e08ea..20171d624eb6d 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1269,7 +1269,7 @@ impl DefaultSettings { desc: "The batch size while deserializing fuse table with parquet storage format", mode: SettingMode::Both, scope: SettingScope::Both, - range: Some(SettingRange::Numeric(0..=1_000_000)), + range: Some(SettingRange::Numeric(1..=1_000_000)), }), ]); From 2a7b27bd375819007290e4eb61ba2338eceb0244 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Mon, 21 Apr 2025 19:50:12 +0800 Subject: [PATCH 10/11] fix merge conflicts --- .../virtual_column_reader_parquet.rs | 179 ++++-------------- .../read/parquet_data_source_deserializer.rs | 16 +- 2 files changed, 49 insertions(+), 146 deletions(-) diff --git a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs index ba9a5877c9e52..434614e154f05 100644 --- a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs +++ b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs @@ -149,100 +149,20 @@ impl VirtualColumnReader { )) } - // pub fn deserialize_virtual_columns( - // &self, - // mut data_block: DataBlock, - // virtual_data: Option, - //) -> Result { - // let orig_schema = virtual_data - // .as_ref() - // .map(|virtual_data| virtual_data.schema.clone()) - // .unwrap_or_default(); - // let record_batch = virtual_data - // .map(|virtual_data| { - // let columns_chunks = virtual_data.data.columns_chunks()?; - // column_chunks_to_record_batch( - // &virtual_data.schema, - // virtual_data.num_rows, - // &columns_chunks, - // &virtual_data.compression, - // ) - // }) - // .transpose()?; - - // // If the virtual column has already generated, add it directly, - // // otherwise extract it from the source column - // let func_ctx = self.ctx.get_function_context()?; - // for virtual_column_field in self.virtual_column_info.virtual_column_fields.iter() { - // let name = format!("{}", virtual_column_field.column_id); - // if let Some(arrow_array) = record_batch - // .as_ref() - // .and_then(|r| r.column_by_name(&name).cloned()) - // { - // let orig_field = orig_schema.field_with_name(&name).unwrap(); - // let orig_type: DataType = orig_field.data_type().into(); - // let value = Value::Column(Column::from_arrow_rs(arrow_array, &orig_type)?); - // let data_type: DataType = virtual_column_field.data_type.as_ref().into(); - // let column = if orig_type != data_type { - // let cast_func_name = format!( - // "to_{}", - // data_type.remove_nullable().to_string().to_lowercase() - // ); - // let (cast_value, cast_data_type) = eval_function( - // None, - // &cast_func_name, - // [(value, orig_type)], - // &func_ctx, - // data_block.num_rows(), - // &BUILTIN_FUNCTIONS, - // )?; - // BlockEntry::new(cast_data_type, cast_value) - // } else { - // BlockEntry::new(data_type, value) - // }; - // data_block.add_column(column); - // continue; - // } - // let src_index = self - // .source_schema - // .index_of(&virtual_column_field.source_name) - // .unwrap(); - // let source = data_block.get_by_offset(src_index); - // let src_arg = (source.value.clone(), 
source.data_type.clone()); - // let path_arg = ( - // Value::Scalar(virtual_column_field.key_paths.clone()), - // DataType::String, - // ); - - // let (value, data_type) = eval_function( - // None, - // "get_by_keypath", - // [src_arg, path_arg], - // &func_ctx, - // data_block.num_rows(), - // &BUILTIN_FUNCTIONS, - // )?; - - // let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name { - // let (cast_value, cast_data_type) = eval_function( - // None, - // cast_func_name, - // [(value, data_type)], - // &func_ctx, - // data_block.num_rows(), - // &BUILTIN_FUNCTIONS, - // )?; - // BlockEntry::new(cast_data_type, cast_value) - // } else { - // BlockEntry::new(data_type, value) - // }; - // data_block.add_column(column); - // } - - // Ok(data_block) - //} - /// Deserialize virtual column data into record batches, according to the `batch_size`. - pub fn try_create_paster( + /// Creates a VirtualColumnDataPaster that handles the integration of virtual column data into DataBlocks. + /// + /// This method prepares a paster object that can process virtual column data from virtual block + /// read result, and later merge this data into existing DataBlocks. It deserializes virtual + /// column data into record batches according to the optional batch size hint. + /// + /// # Arguments + /// * `virtual_data` - Optional virtual block read result containing the data to be processed + /// * `batch_size_hint` - Optional hint for controlling the size of generated record batches + /// + /// # Returns + /// * `Result` - A paster object that can merge virtual column data + /// into DataBlocks, or an error if creation fails + pub fn try_create_virtual_column_paster( &self, virtual_data: Option, batch_size_hint: Option, @@ -292,7 +212,29 @@ pub struct VirtualColumnDataPaster { } impl VirtualColumnDataPaster { - /// Paste virtual column to `data_block` if necessary + /// Processes a DataBlock by adding virtual columns to it. + /// + /// This method enriches the provided DataBlock with virtual columns by either: + /// 1. Using pre-computed virtual column data from deserialized record batches if available + /// 2. Computing virtual column values on-the-fly from source columns + /// + /// For each virtual column field: + /// - If the corresponding data exists in record batches, it is extracted and added directly + /// - If not available in record batches, it is computed from source columns using path extraction + /// - Type casting is performed if the source data type doesn't match the target virtual column type + /// + /// The method tracks which record batch to use via an internal counter that advances with each call. + /// + /// # Arguments + /// * `data_block` - The input DataBlock to which virtual columns will be added + /// + /// # Returns + /// * `Result` - The modified DataBlock containing the original columns plus virtual columns, + /// or an error if the operation fails + /// + /// # Note + /// This method must be called sequentially for each data block. The internal state keeps track of + /// which pre-computed record batch to use for each call. 
pub fn paste_virtual_column(&mut self, mut data_block: DataBlock) -> Result { let record_batch = if let Some(record_batches) = &self.record_batches { assert!(record_batches.len() > self.next_record_batch_index); @@ -374,52 +316,5 @@ impl VirtualColumnDataPaster { } Ok(data_block) - - // for virtual_column_field in self.virtual_column_fields.iter() { - // if let Some(arrow_array) = - // record_batch.and_then(|r| r.column_by_name(&virtual_column_field.name).cloned()) - // { - // let data_type: DataType = virtual_column_field.data_type.as_ref().into(); - // let value = Value::Column(Column::from_arrow_rs(arrow_array, &data_type)?); - // data_block.add_column(BlockEntry::new(data_type, value)); - // continue; - // } - // let src_index = self - // .source_schema - // .index_of(&virtual_column_field.source_name) - // .unwrap(); - // let source = data_block.get_by_offset(src_index); - // let src_arg = (source.value.clone(), source.data_type.clone()); - // let path_arg = ( - // Value::Scalar(virtual_column_field.key_paths.clone()), - // DataType::String, - // ); - - // let (value, data_type) = eval_function( - // None, - // "get_by_keypath", - // [src_arg, path_arg], - // func_ctx, - // data_block.num_rows(), - // &BUILTIN_FUNCTIONS, - // )?; - - // let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name { - // let (cast_value, cast_data_type) = eval_function( - // None, - // cast_func_name, - // [(value, data_type)], - // func_ctx, - // data_block.num_rows(), - // &BUILTIN_FUNCTIONS, - // )?; - // BlockEntry::new(cast_data_type, cast_value) - // } else { - // BlockEntry::new(data_type, value) - // }; - // data_block.add_column(column); - //} - - // Ok(data_block) } } diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index f3b783c478ace..36f0aab489778 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -301,11 +301,15 @@ impl Processor for DeserializeDataTransform { self.batch_size_hint, )?; + // Initialize virtual column paster if needed: which will add virtual columns to + // each DataBlock. The paster is created from the VirtualColumnReader and maintains + // internal state to track which record batch of virtual column data to use for each DataBlock. let mut virtual_columns_paster = if let Some(virtual_column_reader) = self.virtual_reader.as_ref() { - let record_batches = virtual_column_reader - .try_create_paster(virtual_data, self.batch_size_hint)?; - Some(record_batches) + Some(virtual_column_reader.try_create_virtual_column_paster( + virtual_data, + self.batch_size_hint, + )?) } else { None }; @@ -331,7 +335,11 @@ impl Processor for DeserializeDataTransform { } } - // Add optional virtual columns + // Process virtual columns if available: This step enriches the DataBlock + // with virtual columns that were not originally present. + // The paster was created earlier from the VirtualColumnReader and maintains + // the state necessary to merge virtual columns into each data block in + // sequence, ensuring each block gets the correct corresponding virtual data. 
         if let Some(virtual_columns_paster) = &mut virtual_columns_paster {
             data_block = virtual_columns_paster.paste_virtual_column(data_block)?;
         }

From 512cba024ccbc54b137c72aa96e92ca5fe7cdc02 Mon Sep 17 00:00:00 2001
From: dantengsky
Date: Mon, 21 Apr 2025 20:01:46 +0800
Subject: [PATCH 11/11] fix clippy warning

---
 .../io/read/virtual_column/virtual_column_reader_parquet.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs
index 434614e154f05..0a77bbbc0956b 100644
--- a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs
+++ b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs
@@ -268,7 +268,7 @@ impl VirtualColumnDataPaster {
                     None,
                     &cast_func_name,
                     [(value, orig_type)],
-                    &func_ctx,
+                    func_ctx,
                     data_block.num_rows(),
                     &BUILTIN_FUNCTIONS,
                 )?;
@@ -294,7 +294,7 @@
                 None,
                 "get_by_keypath",
                 [src_arg, path_arg],
-                &func_ctx,
+                func_ctx,
                 data_block.num_rows(),
                 &BUILTIN_FUNCTIONS,
             )?;
@@ -304,7 +304,7 @@
                     None,
                     cast_func_name,
                     [(value, data_type)],
-                    &func_ctx,
+                    func_ctx,
                     data_block.num_rows(),
                     &BUILTIN_FUNCTIONS,
                 )?;
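Note on the paster contract: the doc comments added in PATCH 10 describe a sequential protocol in which `try_create_virtual_column_paster` pre-deserializes virtual column data into per-block record batches, and `paste_virtual_column` is then called once per data block, in order, so that each block consumes its matching batch. The standalone sketch below models only that control flow under those stated assumptions; `Block`, `Batch`, and `Paster` are hypothetical stand-ins (not the actual `DataBlock`, Arrow `RecordBatch`, or `VirtualColumnDataPaster` types), and the fallback branch merely stands in for the on-the-fly `get_by_keypath` evaluation performed by the real code.

// Illustrative sketch only; all types here are hypothetical stand-ins.

/// A block of rows produced by the deserializer.
struct Block {
    rows: usize,
    virtual_cols: Vec<String>,
}

/// Pre-computed virtual-column data, one entry per block to be pasted.
struct Batch {
    rows: usize,
    name: String,
}

/// Mirrors the sequential contract: one call per block, consuming the
/// pre-computed batches in order via an internal cursor.
struct Paster {
    batches: Option<Vec<Batch>>,
    next: usize,
}

impl Paster {
    fn paste(&mut self, mut block: Block) -> Result<Block, String> {
        if let Some(batches) = &self.batches {
            // Each call must consume exactly one pre-computed batch, in order.
            let batch = batches
                .get(self.next)
                .ok_or("more blocks than pre-computed batches")?;
            if batch.rows != block.rows {
                return Err("batch/block row count mismatch".into());
            }
            block.virtual_cols.push(batch.name.clone());
            self.next += 1;
        } else {
            // No pre-computed data: the real code would instead evaluate the
            // virtual column from its source column at this point.
            block.virtual_cols.push("computed-on-the-fly".into());
        }
        Ok(block)
    }
}

fn main() -> Result<(), String> {
    // Two blocks of 8192 and 1000 rows, matching two pre-computed batches
    // (8192 echoes the default fuse_parquet_read_batch_size).
    let mut paster = Paster {
        batches: Some(vec![
            Batch { rows: 8192, name: "v1[0]".into() },
            Batch { rows: 1000, name: "v1[1]".into() },
        ]),
        next: 0,
    };
    for rows in [8192, 1000] {
        let block = paster.paste(Block { rows, virtual_cols: vec![] })?;
        println!("{} rows, virtual columns: {:?}", block.rows, block.virtual_cols);
    }
    Ok(())
}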