feat: support load/select parquet files into a single variant column. #18028
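This change lets stage queries over Parquet files read each record into a single VARIANT column instead of requiring a typed schema, e.g. a query along the lines of `SELECT $1 FROM @stage (FILE_FORMAT => 'parquet')` (illustrative syntax). To keep nested values faithful, `cast_scalar_to_variant` now accepts an optional `TableDataType`, so tuples keep their real field names rather than positional keys and array/map element types are threaded through; `StageTableInfo` gains an `is_variant` flag marking stage tables that expose the single `_$1` VARIANT column.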

Merged · 9 commits · May 30, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered.

(file name not rendered)

@@ -47,6 +47,7 @@ pub struct StageTableInfo {
     pub duplicated_files_detected: Vec<String>,
     pub is_select: bool,
     pub copy_into_table_options: CopyIntoTableOptions,
+    pub is_variant: bool,
 
     // copy into location only
     pub copy_into_location_ordered: bool,
71 changes: 59 additions & 12 deletions src/query/expression/src/types/variant.rs
@@ -45,6 +45,7 @@
 use crate::values::Column;
 use crate::values::Scalar;
 use crate::values::ScalarRef;
 use crate::ColumnBuilder;
+use crate::TableDataType;
 
 /// JSONB bytes representation of `null`.
 pub const JSONB_NULL: &[u8] = &[0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
@@ -215,7 +216,12 @@ impl VariantType {
     }
 }
 
-pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8>) {
+pub fn cast_scalar_to_variant(
+    scalar: ScalarRef,
+    tz: &TimeZone,
+    buf: &mut Vec<u8>,
+    table_data_type: Option<&TableDataType>,
+) {
     let value = match scalar {
         ScalarRef::Null => jsonb::Value::Null,
         ScalarRef::EmptyArray => jsonb::Value::Array(vec![]),
@@ -264,13 +270,24 @@ pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8>) {
             jsonb::Value::Interval(interval)
         }
         ScalarRef::Array(col) => {
-            let items = cast_scalars_to_variants(col.iter(), tz);
+            let typ = if let Some(TableDataType::Array(typ)) = table_data_type {
+                Some(typ.remove_nullable())
+            } else {
+                None
+            };
+            let items = cast_scalars_to_variants(col.iter(), tz, typ.as_ref());
             let owned_jsonb = OwnedJsonb::build_array(items.iter().map(RawJsonb::new))
                 .expect("failed to build jsonb array");
             buf.extend_from_slice(owned_jsonb.as_ref());
             return;
         }
         ScalarRef::Map(col) => {
+            let typ = if let Some(TableDataType::Map(typ)) = table_data_type {
+                Some(typ.remove_nullable())
+            } else {
+                None
+            };
+
             let kv_col = KvPair::<AnyType, AnyType>::try_downcast_column(&col).unwrap();
             let mut kvs = BTreeMap::new();
             for (k, v) in kv_col.iter() {
@@ -284,7 +301,7 @@ pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8>) {
                     _ => unreachable!(),
                 };
                 let mut val = vec![];
-                cast_scalar_to_variant(v, tz, &mut val);
+                cast_scalar_to_variant(v, tz, &mut val, typ.as_ref());
                 kvs.insert(key, val);
             }
             let owned_jsonb =
@@ -305,14 +322,43 @@ pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8>) {
             return;
         }
         ScalarRef::Tuple(fields) => {
-            let values = cast_scalars_to_variants(fields, tz);
-            let owned_jsonb = OwnedJsonb::build_object(
-                values
-                    .iter()
-                    .enumerate()
-                    .map(|(i, bytes)| (format!("{}", i + 1), RawJsonb::new(bytes))),
-            )
-            .expect("failed to build jsonb object from tuple");
+            let owned_jsonb = match table_data_type {
+                Some(TableDataType::Tuple {
+                    fields_name,
+                    fields_type,
+                }) => {
+                    assert_eq!(fields.len(), fields_type.len());
+                    let iter = fields.into_iter();
+                    let mut builder = BinaryColumnBuilder::with_capacity(iter.size_hint().0, 0);
+                    for (scalar, typ) in iter.zip(fields_type) {
+                        cast_scalar_to_variant(
+                            scalar,
+                            tz,
+                            &mut builder.data,
+                            Some(&typ.remove_nullable()),
+                        );
+                        builder.commit_row();
+                    }
+                    let values = builder.build();
+                    OwnedJsonb::build_object(
+                        values
+                            .iter()
+                            .enumerate()
+                            .map(|(i, bytes)| (fields_name[i].clone(), RawJsonb::new(bytes))),
+                    )
+                    .expect("failed to build jsonb object from tuple")
+                }
+                _ => {
+                    let values = cast_scalars_to_variants(fields, tz, None);
+                    OwnedJsonb::build_object(
+                        values
+                            .iter()
+                            .enumerate()
+                            .map(|(i, bytes)| (format!("{}", i + 1), RawJsonb::new(bytes))),
+                    )
+                    .expect("failed to build jsonb object from tuple")
+                }
+            };
             buf.extend_from_slice(owned_jsonb.as_ref());
             return;
         }
@@ -342,11 +388,12 @@ pub fn cast_scalar_to_variant(scalar: ScalarRef, tz: &TimeZone, buf: &mut Vec<u8>) {
 pub fn cast_scalars_to_variants(
     scalars: impl IntoIterator<Item = ScalarRef>,
     tz: &TimeZone,
+    table_data_type: Option<&TableDataType>,
 ) -> BinaryColumn {
     let iter = scalars.into_iter();
     let mut builder = BinaryColumnBuilder::with_capacity(iter.size_hint().0, 0);
     for scalar in iter {
-        cast_scalar_to_variant(scalar, tz, &mut builder.data);
+        cast_scalar_to_variant(scalar, tz, &mut builder.data, table_data_type);
         builder.commit_row();
     }
     builder.build()
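A note on compatibility: the new `table_data_type` argument only changes behavior when a type is actually supplied. Every pre-existing call site in this PR passes `None`, which keeps the old output, including the positional "1", "2", ... object keys for tuples; only the new Parquet-variant path threads real types through. A standalone sketch of the key-naming rule (plain `serde_json`, not the codebase's jsonb types; all names here are illustrative):

```rust
// Standalone illustration of the tuple key-naming rule: with type info the
// real field names are used; without it, keys fall back to "1", "2", ...
use serde_json::{json, Value};

fn tuple_to_json(values: &[Value], fields_name: Option<&[String]>) -> Value {
    let object: serde_json::Map<String, Value> = values
        .iter()
        .enumerate()
        .map(|(i, v)| {
            let key = match fields_name {
                // With a schema (the new path): use the real field name.
                Some(names) => names[i].clone(),
                // Without one (table_data_type == None): positional keys.
                None => format!("{}", i + 1),
            };
            (key, v.clone())
        })
        .collect();
    Value::Object(object)
}

fn main() {
    let row = [json!(1), json!("a")];
    let names = ["id".to_string(), "name".to_string()];
    // Prints {"id":1,"name":"a"} with the schema, {"1":1,"2":"a"} without.
    println!("{}", tuple_to_json(&row, Some(&names)));
    println!("{}", tuple_to_json(&row, None));
}
```

In other words, a Parquet struct value round-trips into the variant column with its field names intact; only when no type information is available do the keys degrade to positions.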
(file name not rendered)

@@ -120,7 +120,7 @@ where
                 continue;
             }
             let mut val = vec![];
-            cast_scalar_to_variant(v.as_ref(), &tz, &mut val);
+            cast_scalar_to_variant(v.as_ref(), &tz, &mut val, None);
             items.push(val);
         }
         let owned_jsonb = OwnedJsonb::build_array(items.iter().map(|v| RawJsonb::new(v)))
(file name not rendered)

@@ -184,7 +184,7 @@ where
                 continue;
             }
             let mut val = vec![];
-            cast_scalar_to_variant(v.as_ref(), &tz, &mut val);
+            cast_scalar_to_variant(v.as_ref(), &tz, &mut val, None);
             values.push((key, val));
         }
         let owned_jsonb =
19 changes: 12 additions & 7 deletions src/query/functions/src/scalars/variant.rs
@@ -1112,7 +1112,12 @@ pub fn register(registry: &mut FunctionRegistry) {
                 Scalar::Null => Value::Scalar(Scalar::Null),
                 _ => {
                     let mut buf = Vec::new();
-                    cast_scalar_to_variant(scalar.as_ref(), &ctx.func_ctx.tz, &mut buf);
+                    cast_scalar_to_variant(
+                        scalar.as_ref(),
+                        &ctx.func_ctx.tz,
+                        &mut buf,
+                        None,
+                    );
                     Value::Scalar(Scalar::Variant(buf))
                 }
             },
@@ -1124,7 +1129,7 @@ pub fn register(registry: &mut FunctionRegistry) {
                 }
                 _ => None,
             };
-            let new_col = cast_scalars_to_variants(col.iter(), &ctx.func_ctx.tz);
+            let new_col = cast_scalars_to_variants(col.iter(), &ctx.func_ctx.tz, None);
             if let Some(validity) = validity {
                 Value::Column(NullableColumn::new_column(
                     Column::Variant(new_col),
@@ -1157,7 +1162,7 @@ pub fn register(registry: &mut FunctionRegistry) {
                 Scalar::Null => Value::Scalar(None),
                 _ => {
                     let mut buf = Vec::new();
-                    cast_scalar_to_variant(scalar.as_ref(), &ctx.func_ctx.tz, &mut buf);
+                    cast_scalar_to_variant(scalar.as_ref(), &ctx.func_ctx.tz, &mut buf, None);
                     Value::Scalar(Some(buf))
                 }
             },
@@ -1167,7 +1172,7 @@ pub fn register(registry: &mut FunctionRegistry) {
             Column::Nullable(box ref nullable_column) => nullable_column.validity.clone(),
             _ => Bitmap::new_constant(true, col.len()),
         };
-        let new_col = cast_scalars_to_variants(col.iter(), &ctx.func_ctx.tz);
+        let new_col = cast_scalars_to_variants(col.iter(), &ctx.func_ctx.tz, None);
         Value::Column(NullableColumn::new(new_col, validity))
     }
 },
@@ -2132,7 +2137,7 @@ fn json_array_fn(args: &[Value<AnyType>], ctx: &mut EvalContext) -> Value<AnyType> {
         for column in &columns {
             let v = unsafe { column.index_unchecked(idx) };
             let mut val = vec![];
-            cast_scalar_to_variant(v, &ctx.func_ctx.tz, &mut val);
+            cast_scalar_to_variant(v, &ctx.func_ctx.tz, &mut val, None);
             items.push(val);
         }
         match OwnedJsonb::build_array(items.iter().map(|v| RawJsonb::new(v))) {
@@ -2203,7 +2208,7 @@ fn json_object_impl_fn(
             }
             set.insert(key);
             let mut val = vec![];
-            cast_scalar_to_variant(v, &ctx.func_ctx.tz, &mut val);
+            cast_scalar_to_variant(v, &ctx.func_ctx.tz, &mut val, None);
             kvs.push((key, val));
         }
         if !has_err {
@@ -2598,7 +2603,7 @@ fn json_object_insert_fn(
         _ => {
             // if the new value is not a json value, cast it to json.
             let mut new_val_buf = vec![];
-            cast_scalar_to_variant(new_val.clone(), &ctx.func_ctx.tz, &mut new_val_buf);
+            cast_scalar_to_variant(new_val.clone(), &ctx.func_ctx.tz, &mut new_val_buf, None);
             let new_val = RawJsonb::new(new_val_buf.as_bytes());
             value.object_insert(new_key, &new_val, update_flag)
         }
(file name not rendered)

@@ -115,6 +115,7 @@ impl CopyIntoLocationInterpreter {
                 copy_into_table_options: Default::default(),
                 stage_root: "".to_string(),
                 copy_into_location_ordered: self.plan.is_ordered,
+                is_variant: false,
             },
         }));
 
71 changes: 50 additions & 21 deletions src/query/service/src/sessions/query_ctx.rs
@@ -1657,30 +1657,56 @@ impl TableContext for QueryContext {
         };
         match stage_info.file_format_params {
             FileFormatParams::Parquet(..) => {
-                let mut read_options = ParquetReadOptions::default();
-
-                if !self.get_settings().get_enable_parquet_page_index()? {
-                    read_options = read_options.with_prune_pages(false);
-                }
+                if max_column_position > 1 {
+                    Err(ErrorCode::SemanticError(
+                        "[QUERY-CTX] Query from parquet file only support $1 as column position",
+                    ))
+                } else if max_column_position == 0 {
+                    let mut read_options = ParquetReadOptions::default();
+                    let settings = self.query_settings.clone();
+
+                    if !settings.get_enable_parquet_page_index()? {
+                        read_options = read_options.with_prune_pages(false);
+                    }
 
-                if !self.get_settings().get_enable_parquet_rowgroup_pruning()? {
-                    read_options = read_options.with_prune_row_groups(false);
-                }
+                    if !settings.get_enable_parquet_rowgroup_pruning()? {
+                        read_options = read_options.with_prune_row_groups(false);
+                    }
 
-                if !self.get_settings().get_enable_parquet_prewhere()? {
-                    read_options = read_options.with_do_prewhere(false);
-                }
+                    if !settings.get_enable_parquet_prewhere()? {
+                        read_options = read_options.with_do_prewhere(false);
+                    }
+                    ParquetTable::create(
+                        stage_info.clone(),
+                        files_info,
+                        read_options,
+                        files_to_copy,
+                        self.get_settings(),
+                        self.get_query_kind(),
+                        case_sensitive,
+                    )
+                    .await
+                } else {
+                    let schema = Arc::new(TableSchema::new(vec![TableField::new(
+                        "_$1",
+                        TableDataType::Variant,
+                    )]));
+                    let info = StageTableInfo {
+                        schema,
+                        stage_info,
+                        files_info,
+                        files_to_copy,
+                        duplicated_files_detected: vec![],
+                        is_select: true,
+                        default_exprs: None,
+                        copy_into_location_options: Default::default(),
+                        copy_into_table_options: Default::default(),
+                        stage_root,
+                        copy_into_location_ordered: false,
+                        is_variant: true,
+                    };
+                    StageTable::try_create(info)
+                }
 
-                ParquetTable::create(
-                    stage_info.clone(),
-                    files_info,
-                    read_options,
-                    files_to_copy,
-                    self.get_settings(),
-                    self.get_query_kind(),
-                    case_sensitive,
-                )
-                .await
             }
             FileFormatParams::Orc(..) => {
                 let schema = Arc::new(TableSchema::empty());
@@ -1696,6 +1722,7 @@
                     copy_into_table_options: Default::default(),
                     stage_root,
                     copy_into_location_ordered: false,
+                    is_variant: false,
                 };
                 OrcTable::try_create(info).await
             }
@@ -1716,6 +1743,7 @@
                     copy_into_table_options: Default::default(),
                     stage_root,
                     copy_into_location_ordered: false,
+                    is_variant: true,
                 };
                 StageTable::try_create(info)
             }
@@ -1754,6 +1782,7 @@
                     copy_into_table_options: Default::default(),
                     stage_root,
                     copy_into_location_ordered: false,
+                    is_variant: false,
                 };
                 StageTable::try_create(info)
             }
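The restructured Parquet branch above keys off `max_column_position`, i.e. the highest `$N` column reference appearing in the query. A minimal standalone sketch of that routing (plain Rust, with the real `TableContext` signatures and table constructors simplified away):

```rust
// Simplified model of the Parquet dispatch above: $N references decide which
// table implementation serves the stage query.
#[derive(Debug)]
enum ParquetStagePlan {
    /// No $N references: infer a typed schema and use ParquetTable.
    TypedTable,
    /// Only $1 is referenced: expose one "_$1" VARIANT column via StageTable.
    VariantTable,
    /// $2 and beyond are not supported for Parquet.
    Unsupported(&'static str),
}

fn plan_parquet_stage(max_column_position: usize) -> ParquetStagePlan {
    if max_column_position > 1 {
        ParquetStagePlan::Unsupported(
            "Query from parquet file only support $1 as column position",
        )
    } else if max_column_position == 0 {
        ParquetStagePlan::TypedTable
    } else {
        ParquetStagePlan::VariantTable
    }
}

fn main() {
    // 0 -> TypedTable   (e.g. SELECT a, b FROM @stage)
    // 1 -> VariantTable (e.g. SELECT $1 FROM @stage)
    // 2 -> Unsupported(...)
    for n in [0, 1, 2] {
        println!("{} -> {:?}", n, plan_parquet_stage(n));
    }
}
```

So existing schema-based reads (no `$N` at all) still go through `ParquetTable`, while `$1` selects the new single-VARIANT stage table, matching the `_$1` field defined in the hunk above.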
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/binder/copy_into_table.rs
@@ -225,6 +225,7 @@ impl Binder {
                 copy_into_table_options: stmt.options.clone(),
                 stage_root: "".to_string(),
                 copy_into_location_ordered: false,
+                is_variant: false,
             },
             values_consts: vec![],
             required_source_schema: required_values_schema.clone(),
@@ -406,6 +407,7 @@ impl Binder {
                 copy_into_table_options: options,
                 stage_root: "".to_string(),
                 copy_into_location_ordered: false,
+                is_variant: false,
             },
             write_mode,
             query: None,
1 change: 1 addition & 0 deletions src/query/storages/parquet/Cargo.toml
@@ -32,6 +32,7 @@ databend-storages-common-pruner = { workspace = true }
 databend-storages-common-stage = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 futures = { workspace = true }
+jiff = { workspace = true }
 log = { workspace = true }
 opendal = { workspace = true }
 parquet = { workspace = true }
1 change: 1 addition & 0 deletions src/query/storages/parquet/src/copy_into_table/source.rs
@@ -185,6 +185,7 @@ impl Processor for ParquetCopySource {
             }
             _ => unreachable!(),
         }
+
         Ok(())
     }
 }
2 changes: 2 additions & 0 deletions src/query/storages/parquet/src/lib.rs
@@ -41,6 +41,7 @@ mod statistics;
 mod transformer;
 
 mod meta;
+mod parquet_variant_table;
 mod schema;
 
 pub use copy_into_table::ParquetTableForCopy;
@@ -53,6 +54,7 @@ pub use parquet_reader::ParquetFileReader;
 pub use parquet_reader::ParquetReaderBuilder;
 pub use parquet_reader::ParquetWholeFileReader;
 pub use parquet_table::ParquetTable;
+pub use parquet_variant_table::ParquetVariantTable;
 // for it test
 pub use pruning::ParquetPruner;
 pub use source::ParquetSource;