Skip to content

Commit 4382b95

Browse files
authored
feat: support load/select parquet files into a single variant column. (#18028)
* refactor: simplify ParquetCopySource. * ci: update test to not select $1 on parquet. * cast_scalar_to_variant support tuple field names. * refactor: abstract collect_parts for parquet table. * feat: read parquet into variant. * add tests * fix fmt * fix clippy * rm unused dep
1 parent 38c3722 commit 4382b95

File tree

32 files changed

+971
-145
lines changed

32 files changed

+971
-145
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/catalog/src/plan/datasource/datasource_info/stage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub struct StageTableInfo {
4747
pub duplicated_files_detected: Vec<String>,
4848
pub is_select: bool,
4949
pub copy_into_table_options: CopyIntoTableOptions,
50+
pub is_variant: bool,
5051

5152
// copy into location only
5253
pub copy_into_location_ordered: bool,

src/query/expression/src/types/variant.rs

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::values::Column;
4545
use crate::values::Scalar;
4646
use crate::values::ScalarRef;
4747
use crate::ColumnBuilder;
48+
use crate::TableDataType;
4849

4950
/// JSONB bytes representation of `null`.
5051
pub const JSONB_NULL: &[u8] = &[0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
@@ -215,7 +216,12 @@ impl VariantType {
215216
}
216217
}
217218

218-
pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8>) {
219+
pub fn cast_scalar_to_variant(
220+
scalar: ScalarRef,
221+
tz: &TimeZone,
222+
buf: &mut Vec<u8>,
223+
table_data_type: Option<&TableDataType>,
224+
) {
219225
let value = match scalar {
220226
ScalarRef::Null => jsonb::Value::Null,
221227
ScalarRef::EmptyArray => jsonb::Value::Array(vec![]),
@@ -264,13 +270,24 @@ pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8
264270
jsonb::Value::Interval(interval)
265271
}
266272
ScalarRef::Array(col) => {
267-
let items = cast_scalars_to_variants(col.iter(), tz);
273+
let typ = if let Some(TableDataType::Array(typ)) = table_data_type {
274+
Some(typ.remove_nullable())
275+
} else {
276+
None
277+
};
278+
let items = cast_scalars_to_variants(col.iter(), tz, typ.as_ref());
268279
let owned_jsonb = OwnedJsonb::build_array(items.iter().map(RawJsonb::new))
269280
.expect("failed to build jsonb array");
270281
buf.extend_from_slice(owned_jsonb.as_ref());
271282
return;
272283
}
273284
ScalarRef::Map(col) => {
285+
let typ = if let Some(TableDataType::Map(typ)) = table_data_type {
286+
Some(typ.remove_nullable())
287+
} else {
288+
None
289+
};
290+
274291
let kv_col = KvPair::<AnyType, AnyType>::try_downcast_column(&col).unwrap();
275292
let mut kvs = BTreeMap::new();
276293
for (k, v) in kv_col.iter() {
@@ -284,7 +301,7 @@ pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8
284301
_ => unreachable!(),
285302
};
286303
let mut val = vec![];
287-
cast_scalar_to_variant(v, tz, &mut val);
304+
cast_scalar_to_variant(v, tz, &mut val, typ.as_ref());
288305
kvs.insert(key, val);
289306
}
290307
let owned_jsonb =
@@ -305,14 +322,43 @@ pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8
305322
return;
306323
}
307324
ScalarRef::Tuple(fields) => {
308-
let values = cast_scalars_to_variants(fields, tz);
309-
let owned_jsonb = OwnedJsonb::build_object(
310-
values
311-
.iter()
312-
.enumerate()
313-
.map(|(i, bytes)| (format!("{}", i + 1), RawJsonb::new(bytes))),
314-
)
315-
.expect("failed to build jsonb object from tuple");
325+
let owned_jsonb = match table_data_type {
326+
Some(TableDataType::Tuple {
327+
fields_name,
328+
fields_type,
329+
}) => {
330+
assert_eq!(fields.len(), fields_type.len());
331+
let iter = fields.into_iter();
332+
let mut builder = BinaryColumnBuilder::with_capacity(iter.size_hint().0, 0);
333+
for (scalar, typ) in iter.zip(fields_type) {
334+
cast_scalar_to_variant(
335+
scalar,
336+
tz,
337+
&mut builder.data,
338+
Some(&typ.remove_nullable()),
339+
);
340+
builder.commit_row();
341+
}
342+
let values = builder.build();
343+
OwnedJsonb::build_object(
344+
values
345+
.iter()
346+
.enumerate()
347+
.map(|(i, bytes)| (fields_name[i].clone(), RawJsonb::new(bytes))),
348+
)
349+
.expect("failed to build jsonb object from tuple")
350+
}
351+
_ => {
352+
let values = cast_scalars_to_variants(fields, tz, None);
353+
OwnedJsonb::build_object(
354+
values
355+
.iter()
356+
.enumerate()
357+
.map(|(i, bytes)| (format!("{}", i + 1), RawJsonb::new(bytes))),
358+
)
359+
.expect("failed to build jsonb object from tuple")
360+
}
361+
};
316362
buf.extend_from_slice(owned_jsonb.as_ref());
317363
return;
318364
}
@@ -342,11 +388,12 @@ pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8
342388
pub fn cast_scalars_to_variants(
343389
scalars: impl IntoIterator<Item = ScalarRef>,
344390
tz: &TimeZone,
391+
table_data_type: Option<&TableDataType>,
345392
) -> BinaryColumn {
346393
let iter = scalars.into_iter();
347394
let mut builder = BinaryColumnBuilder::with_capacity(iter.size_hint().0, 0);
348395
for scalar in iter {
349-
cast_scalar_to_variant(scalar, tz, &mut builder.data);
396+
cast_scalar_to_variant(scalar, tz, &mut builder.data, table_data_type);
350397
builder.commit_row();
351398
}
352399
builder.build()

src/query/functions/src/aggregates/aggregate_json_array_agg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ where
120120
continue;
121121
}
122122
let mut val = vec![];
123-
cast_scalar_to_variant(v.as_ref(), &tz, &mut val);
123+
cast_scalar_to_variant(v.as_ref(), &tz, &mut val, None);
124124
items.push(val);
125125
}
126126
let owned_jsonb = OwnedJsonb::build_array(items.iter().map(|v| RawJsonb::new(v)))

src/query/functions/src/aggregates/aggregate_json_object_agg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ where
184184
continue;
185185
}
186186
let mut val = vec![];
187-
cast_scalar_to_variant(v.as_ref(), &tz, &mut val);
187+
cast_scalar_to_variant(v.as_ref(), &tz, &mut val, None);
188188
values.push((key, val));
189189
}
190190
let owned_jsonb =

src/query/functions/src/scalars/variant.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,12 @@ pub fn register(registry: &mut FunctionRegistry) {
11121112
Scalar::Null => Value::Scalar(Scalar::Null),
11131113
_ => {
11141114
let mut buf = Vec::new();
1115-
cast_scalar_to_variant(scalar.as_ref(), &ctx.func_ctx.tz, &mut buf);
1115+
cast_scalar_to_variant(
1116+
scalar.as_ref(),
1117+
&ctx.func_ctx.tz,
1118+
&mut buf,
1119+
None,
1120+
);
11161121
Value::Scalar(Scalar::Variant(buf))
11171122
}
11181123
},
@@ -1124,7 +1129,7 @@ pub fn register(registry: &mut FunctionRegistry) {
11241129
}
11251130
_ => None,
11261131
};
1127-
let new_col = cast_scalars_to_variants(col.iter(), &ctx.func_ctx.tz);
1132+
let new_col = cast_scalars_to_variants(col.iter(), &ctx.func_ctx.tz, None);
11281133
if let Some(validity) = validity {
11291134
Value::Column(NullableColumn::new_column(
11301135
Column::Variant(new_col),
@@ -1157,7 +1162,7 @@ pub fn register(registry: &mut FunctionRegistry) {
11571162
Scalar::Null => Value::Scalar(None),
11581163
_ => {
11591164
let mut buf = Vec::new();
1160-
cast_scalar_to_variant(scalar.as_ref(), &ctx.func_ctx.tz, &mut buf);
1165+
cast_scalar_to_variant(scalar.as_ref(), &ctx.func_ctx.tz, &mut buf, None);
11611166
Value::Scalar(Some(buf))
11621167
}
11631168
},
@@ -1167,7 +1172,7 @@ pub fn register(registry: &mut FunctionRegistry) {
11671172
Column::Nullable(box ref nullable_column) => nullable_column.validity.clone(),
11681173
_ => Bitmap::new_constant(true, col.len()),
11691174
};
1170-
let new_col = cast_scalars_to_variants(col.iter(), &ctx.func_ctx.tz);
1175+
let new_col = cast_scalars_to_variants(col.iter(), &ctx.func_ctx.tz, None);
11711176
Value::Column(NullableColumn::new(new_col, validity))
11721177
}
11731178
},
@@ -2132,7 +2137,7 @@ fn json_array_fn(args: &[Value<AnyType>], ctx: &mut EvalContext) -> Value<AnyTyp
21322137
for column in &columns {
21332138
let v = unsafe { column.index_unchecked(idx) };
21342139
let mut val = vec![];
2135-
cast_scalar_to_variant(v, &ctx.func_ctx.tz, &mut val);
2140+
cast_scalar_to_variant(v, &ctx.func_ctx.tz, &mut val, None);
21362141
items.push(val);
21372142
}
21382143
match OwnedJsonb::build_array(items.iter().map(|v| RawJsonb::new(v))) {
@@ -2203,7 +2208,7 @@ fn json_object_impl_fn(
22032208
}
22042209
set.insert(key);
22052210
let mut val = vec![];
2206-
cast_scalar_to_variant(v, &ctx.func_ctx.tz, &mut val);
2211+
cast_scalar_to_variant(v, &ctx.func_ctx.tz, &mut val, None);
22072212
kvs.push((key, val));
22082213
}
22092214
if !has_err {
@@ -2598,7 +2603,7 @@ fn json_object_insert_fn(
25982603
_ => {
25992604
// if the new value is not a json value, cast it to json.
26002605
let mut new_val_buf = vec![];
2601-
cast_scalar_to_variant(new_val.clone(), &ctx.func_ctx.tz, &mut new_val_buf);
2606+
cast_scalar_to_variant(new_val.clone(), &ctx.func_ctx.tz, &mut new_val_buf, None);
26022607
let new_val = RawJsonb::new(new_val_buf.as_bytes());
26032608
value.object_insert(new_key, &new_val, update_flag)
26042609
}

src/query/service/src/interpreters/interpreter_copy_into_location.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ impl CopyIntoLocationInterpreter {
115115
copy_into_table_options: Default::default(),
116116
stage_root: "".to_string(),
117117
copy_into_location_ordered: self.plan.is_ordered,
118+
is_variant: false,
118119
},
119120
}));
120121

src/query/service/src/sessions/query_ctx.rs

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,30 +1657,56 @@ impl TableContext for QueryContext {
16571657
};
16581658
match stage_info.file_format_params {
16591659
FileFormatParams::Parquet(..) => {
1660-
let mut read_options = ParquetReadOptions::default();
1661-
1662-
if !self.get_settings().get_enable_parquet_page_index()? {
1663-
read_options = read_options.with_prune_pages(false);
1664-
}
1660+
if max_column_position > 1 {
1661+
Err(ErrorCode::SemanticError(
1662+
"[QUERY-CTX] Query from parquet file only support $1 as column position",
1663+
))
1664+
} else if max_column_position == 0 {
1665+
let mut read_options = ParquetReadOptions::default();
1666+
let settings = self.query_settings.clone();
1667+
1668+
if !settings.get_enable_parquet_page_index()? {
1669+
read_options = read_options.with_prune_pages(false);
1670+
}
16651671

1666-
if !self.get_settings().get_enable_parquet_rowgroup_pruning()? {
1667-
read_options = read_options.with_prune_row_groups(false);
1668-
}
1672+
if !settings.get_enable_parquet_rowgroup_pruning()? {
1673+
read_options = read_options.with_prune_row_groups(false);
1674+
}
16691675

1670-
if !self.get_settings().get_enable_parquet_prewhere()? {
1671-
read_options = read_options.with_do_prewhere(false);
1676+
if !settings.get_enable_parquet_prewhere()? {
1677+
read_options = read_options.with_do_prewhere(false);
1678+
}
1679+
ParquetTable::create(
1680+
stage_info.clone(),
1681+
files_info,
1682+
read_options,
1683+
files_to_copy,
1684+
self.get_settings(),
1685+
self.get_query_kind(),
1686+
case_sensitive,
1687+
)
1688+
.await
1689+
} else {
1690+
let schema = Arc::new(TableSchema::new(vec![TableField::new(
1691+
"_$1",
1692+
TableDataType::Variant,
1693+
)]));
1694+
let info = StageTableInfo {
1695+
schema,
1696+
stage_info,
1697+
files_info,
1698+
files_to_copy,
1699+
duplicated_files_detected: vec![],
1700+
is_select: true,
1701+
default_exprs: None,
1702+
copy_into_location_options: Default::default(),
1703+
copy_into_table_options: Default::default(),
1704+
stage_root,
1705+
copy_into_location_ordered: false,
1706+
is_variant: true,
1707+
};
1708+
StageTable::try_create(info)
16721709
}
1673-
1674-
ParquetTable::create(
1675-
stage_info.clone(),
1676-
files_info,
1677-
read_options,
1678-
files_to_copy,
1679-
self.get_settings(),
1680-
self.get_query_kind(),
1681-
case_sensitive,
1682-
)
1683-
.await
16841710
}
16851711
FileFormatParams::Orc(..) => {
16861712
let schema = Arc::new(TableSchema::empty());
@@ -1696,6 +1722,7 @@ impl TableContext for QueryContext {
16961722
copy_into_table_options: Default::default(),
16971723
stage_root,
16981724
copy_into_location_ordered: false,
1725+
is_variant: false,
16991726
};
17001727
OrcTable::try_create(info).await
17011728
}
@@ -1716,6 +1743,7 @@ impl TableContext for QueryContext {
17161743
copy_into_table_options: Default::default(),
17171744
stage_root,
17181745
copy_into_location_ordered: false,
1746+
is_variant: true,
17191747
};
17201748
StageTable::try_create(info)
17211749
}
@@ -1754,6 +1782,7 @@ impl TableContext for QueryContext {
17541782
copy_into_table_options: Default::default(),
17551783
stage_root,
17561784
copy_into_location_ordered: false,
1785+
is_variant: false,
17571786
};
17581787
StageTable::try_create(info)
17591788
}

src/query/sql/src/planner/binder/copy_into_table.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ impl Binder {
225225
copy_into_table_options: stmt.options.clone(),
226226
stage_root: "".to_string(),
227227
copy_into_location_ordered: false,
228+
is_variant: false,
228229
},
229230
values_consts: vec![],
230231
required_source_schema: required_values_schema.clone(),
@@ -406,6 +407,7 @@ impl Binder {
406407
copy_into_table_options: options,
407408
stage_root: "".to_string(),
408409
copy_into_location_ordered: false,
410+
is_variant: false,
409411
},
410412
write_mode,
411413
query: None,

src/query/storages/parquet/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ databend-storages-common-pruner = { workspace = true }
3232
databend-storages-common-stage = { workspace = true }
3333
databend-storages-common-table-meta = { workspace = true }
3434
futures = { workspace = true }
35+
jiff = { workspace = true }
3536
log = { workspace = true }
3637
opendal = { workspace = true }
3738
parquet = { workspace = true }

src/query/storages/parquet/src/copy_into_table/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ impl Processor for ParquetCopySource {
185185
}
186186
_ => unreachable!(),
187187
}
188+
188189
Ok(())
189190
}
190191
}

src/query/storages/parquet/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ mod statistics;
4141
mod transformer;
4242

4343
mod meta;
44+
mod parquet_variant_table;
4445
mod schema;
4546

4647
pub use copy_into_table::ParquetTableForCopy;
@@ -53,6 +54,7 @@ pub use parquet_reader::ParquetFileReader;
5354
pub use parquet_reader::ParquetReaderBuilder;
5455
pub use parquet_reader::ParquetWholeFileReader;
5556
pub use parquet_table::ParquetTable;
57+
pub use parquet_variant_table::ParquetVariantTable;
5658
// for it test
5759
pub use pruning::ParquetPruner;
5860
pub use source::ParquetSource;

0 commit comments

Comments (0)