Skip to content

Commit b06ebd2

Browse files
committed
clean up
1 parent 20d9d60 commit b06ebd2

File tree

2 files changed

+25
-16
lines changed

2 files changed

+25
-16
lines changed

src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ pub fn column_chunks_to_record_batch(
7474
batch_size,
7575
None,
7676
)?;
77-
let records = record_reader.try_collect()?;
78-
// TODO assert the row numbers?
77+
78+
let records: Vec<_> = record_reader.try_collect()?;
79+
assert_eq!(
80+
num_rows,
81+
records.iter().map(|r| r.num_rows()).sum::<usize>()
82+
);
7983
Ok(records)
8084
}

src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs

+19-14
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::collections::HashSet;
1616

1717
use arrow_array::RecordBatch;
18-
use databend_common_catalog::plan::VirtualColumnInfo;
18+
use databend_common_catalog::plan::VirtualColumnField;
1919
use databend_common_exception::Result;
2020
use databend_common_expression::eval_function;
2121
use databend_common_expression::types::DataType;
@@ -132,12 +132,13 @@ impl VirtualColumnReader {
132132
))
133133
}
134134

135+
/// Deserialize virtual column data into record batches, according to the `batch_size`.
135136
pub fn try_create_paster(
136137
&self,
137138
virtual_data: Option<VirtualBlockReadResult>,
138139
batch_size: usize,
139-
) -> Result<VirtualColumnDataModifier> {
140-
let chunks = if let Some(virtual_data) = virtual_data {
140+
) -> Result<VirtualColumnDataPaster> {
141+
let record_batches = if let Some(virtual_data) = virtual_data {
141142
let columns_chunks = virtual_data.data.columns_chunks()?;
142143
let chunks = column_chunks_to_record_batch(
143144
&self.virtual_column_info.schema,
@@ -151,27 +152,31 @@ impl VirtualColumnReader {
151152
None
152153
};
153154

154-
let func_ctx = self.ctx.get_function_context()?;
155+
let function_context = self.ctx.get_function_context()?;
155156

156-
Ok(VirtualColumnDataModifier {
157-
record_batches: chunks,
158-
function_context: func_ctx,
157+
// Unfortunately, Paster cannot hold references to the fields that are being cloned,
158+
// since the caller `DeserializeDataTransform` will take a mutable reference to
159+
// VirtualColumnReader indirectly.
160+
Ok(VirtualColumnDataPaster {
161+
record_batches,
162+
function_context,
159163
next_record_batch_index: 0,
160-
virtual_column_info: self.virtual_column_info.clone(),
164+
virtual_column_fields: self.virtual_column_info.virtual_column_fields.clone(),
161165
source_schema: self.source_schema.clone(),
162166
})
163167
}
164168
}
165169

166-
pub struct VirtualColumnDataModifier {
170+
pub struct VirtualColumnDataPaster {
167171
record_batches: Option<Vec<RecordBatch>>,
168172
next_record_batch_index: usize,
169173
function_context: FunctionContext,
170-
virtual_column_info: VirtualColumnInfo,
174+
virtual_column_fields: Vec<VirtualColumnField>,
171175
source_schema: TableSchemaRef,
172176
}
173177

174-
impl VirtualColumnDataModifier {
178+
impl VirtualColumnDataPaster {
179+
/// Paste virtual column to `data_block` if necessary
175180
pub fn paste_virtual_column(&mut self, mut data_block: DataBlock) -> Result<DataBlock> {
176181
let record_batch = if let Some(record_batches) = &self.record_batches {
177182
assert!(record_batches.len() > self.next_record_batch_index);
@@ -183,7 +188,7 @@ impl VirtualColumnDataModifier {
183188
self.next_record_batch_index += 1;
184189

185190
let func_ctx = &self.function_context;
186-
for virtual_column_field in self.virtual_column_info.virtual_column_fields.iter() {
191+
for virtual_column_field in self.virtual_column_fields.iter() {
187192
if let Some(arrow_array) =
188193
record_batch.and_then(|r| r.column_by_name(&virtual_column_field.name).cloned())
189194
{
@@ -207,7 +212,7 @@ impl VirtualColumnDataModifier {
207212
None,
208213
"get_by_keypath",
209214
[src_arg, path_arg],
210-
&func_ctx,
215+
func_ctx,
211216
data_block.num_rows(),
212217
&BUILTIN_FUNCTIONS,
213218
)?;
@@ -217,7 +222,7 @@ impl VirtualColumnDataModifier {
217222
None,
218223
cast_func_name,
219224
[(value, data_type)],
220-
&func_ctx,
225+
func_ctx,
221226
data_block.num_rows(),
222227
&BUILTIN_FUNCTIONS,
223228
)?;

0 commit comments

Comments
 (0)