diff --git a/src/query/functions/src/srfs/variant.rs b/src/query/functions/src/srfs/variant.rs index cc11b3249f95b..0aed415c837e8 100644 --- a/src/query/functions/src/srfs/variant.rs +++ b/src/query/functions/src/srfs/variant.rs @@ -768,7 +768,11 @@ impl FlattenGenerator { ) { if let Ok(Some(vals)) = input.array_values() { for (i, val) in vals.into_iter().enumerate() { - let inner_path = format!("{}[{}]", path, i); + let inner_path = if path_builder.is_some() { + format!("{}[{}]", path, i) + } else { + "".to_string() + }; if let Some(key_builder) = key_builder { key_builder.push_null(); @@ -822,10 +826,14 @@ impl FlattenGenerator { if let Some(key_builder) = key_builder { key_builder.push(key.as_ref()); } - let inner_path = if !path.is_empty() { - format!("{}.{}", path, key) + let inner_path = if path_builder.is_some() { + if !path.is_empty() { + format!("{}.{}", path, key) + } else { + key + } } else { - key + "".to_string() }; if let Some(path_builder) = path_builder { path_builder.put_and_commit(&inner_path); diff --git a/src/query/sql/src/executor/physical_plans/physical_eval_scalar.rs b/src/query/sql/src/executor/physical_plans/physical_eval_scalar.rs index 19851fb6a37ac..89c30b3bc8e92 100644 --- a/src/query/sql/src/executor/physical_plans/physical_eval_scalar.rs +++ b/src/query/sql/src/executor/physical_plans/physical_eval_scalar.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeSet; use std::collections::HashSet; use std::sync::Arc; @@ -21,16 +22,22 @@ use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; use databend_common_expression::RemoteExpr; +use databend_common_expression::Scalar; use databend_common_functions::BUILTIN_FUNCTIONS; use crate::executor::explain::PlanStatsInfo; use crate::executor::physical_plan::PhysicalPlan; use crate::executor::physical_plan_builder::PhysicalPlanBuilder; +use crate::optimizer::ir::Matcher; use crate::optimizer::ir::SExpr; +use crate::plans::Filter; use crate::plans::FunctionCall; use crate::plans::ProjectSet; +use crate::plans::RelOp; use crate::plans::RelOperator; use crate::plans::ScalarExpr; +use crate::plans::ScalarItem; +use crate::plans::Visitor; use crate::ColumnSet; use crate::IndexType; use crate::TypeCheck; @@ -99,11 +106,7 @@ impl PhysicalPlanBuilder { self.build(s_expr.child(0)?, required).await } else { let child = s_expr.child(0)?; - let input = if let RelOperator::ProjectSet(project_set) = child.plan() { - let new_project_set = - self.prune_flatten_columns(eval_scalar, project_set, &required); - let mut new_child = child.clone(); - new_child.plan = Arc::new(new_project_set.into()); + let input = if let Some(new_child) = self.try_eliminate_flatten_columns(&used, child)? { self.build(&new_child, required).await? } else { self.build(child, required).await? @@ -171,45 +174,136 @@ impl PhysicalPlanBuilder { })) } + fn try_eliminate_flatten_columns( + &mut self, + scalar_items: &Vec, + s_expr: &SExpr, + ) -> Result> { + // (1) ProjectSet + // \ + // * + // + // (2) Filter + // \ + // ProjectSet + // \ + // * + let matchers = vec![ + Matcher::MatchOp { + op_type: RelOp::ProjectSet, + children: vec![Matcher::Leaf], + }, + Matcher::MatchOp { + op_type: RelOp::Filter, + children: vec![Matcher::MatchOp { + op_type: RelOp::ProjectSet, + children: vec![Matcher::Leaf], + }], + }, + ]; + + let mut matched = false; + for matcher in matchers { + if matcher.matches(s_expr) { + matched = true; + break; + } + } + if !matched { + return Ok(None); + } + + if let RelOperator::Filter(filter) = s_expr.plan() { + let child = s_expr.child(0)?; + let project_set: ProjectSet = child.plan().clone().try_into()?; + let Some(new_project_set) = + self.eliminate_flatten_columns(scalar_items, Some(filter), &project_set) + else { + return Ok(None); + }; + let mut new_child = child.clone(); + new_child.plan = Arc::new(new_project_set.into()); + let new_filter = + SExpr::create_unary(Arc::new(s_expr.plan().clone()), Arc::new(new_child)); + Ok(Some(new_filter)) + } else { + let project_set: ProjectSet = s_expr.plan().clone().try_into()?; + let Some(new_project_set) = + self.eliminate_flatten_columns(scalar_items, None, &project_set) + else { + return Ok(None); + }; + let mut new_expr = s_expr.clone(); + new_expr.plan = Arc::new(new_project_set.into()); + Ok(Some(new_expr)) + } + } + // The flatten function returns a tuple, which contains 6 columns. - // Only keep columns required by parent plan, other columns can be pruned + // Only keep columns required by parent plan, other columns can be eliminated // to reduce the memory usage. - fn prune_flatten_columns( + fn eliminate_flatten_columns( &mut self, - eval_scalar: &crate::plans::EvalScalar, + scalar_items: &Vec, + filter: Option<&Filter>, project_set: &ProjectSet, - required: &ColumnSet, - ) -> ProjectSet { + ) -> Option { + let mut has_flatten = false; let mut project_set = project_set.clone(); for srf_item in &mut project_set.srfs { if let ScalarExpr::FunctionCall(srf_func) = &srf_item.scalar { if srf_func.func_name == "flatten" { - // Store the columns required by the parent plan in params. - let mut params = Vec::new(); - for item in &eval_scalar.items { - if !required.contains(&item.index) { - continue; - } - if let ScalarExpr::FunctionCall(func) = &item.scalar { - if func.func_name == "get" && !func.arguments.is_empty() { - if let ScalarExpr::BoundColumnRef(column_ref) = &func.arguments[0] { - if column_ref.column.index == srf_item.index { - params.push(func.params[0].clone()); - } - } - } + has_flatten = true; + let mut visitor = FlattenColumnsVisitor { + params: BTreeSet::new(), + column_index: srf_item.index, + }; + // Collect columns required by the parent plan in params. + for item in scalar_items { + visitor.visit(&item.scalar).unwrap(); + } + if let Some(filter) = filter { + for pred in &filter.predicates { + visitor.visit(pred).unwrap(); } } srf_item.scalar = ScalarExpr::FunctionCall(FunctionCall { span: srf_func.span, func_name: srf_func.func_name.clone(), - params, + params: visitor.params.into_iter().collect::>(), arguments: srf_func.arguments.clone(), }); } } } - project_set + if has_flatten { + Some(project_set) + } else { + None + } + } +} + +struct FlattenColumnsVisitor { + params: BTreeSet, + column_index: IndexType, +} + +impl<'a> Visitor<'a> for FlattenColumnsVisitor { + // Collect the params in get function which is used to extract the inner column of flatten function. + fn visit_function_call(&mut self, func: &'a FunctionCall) -> Result<()> { + if func.func_name == "get" && !func.arguments.is_empty() { + if let ScalarExpr::BoundColumnRef(column_ref) = &func.arguments[0] { + if column_ref.column.index == self.column_index { + self.params.insert(func.params[0].clone()); + return Ok(()); + } + } + } + for expr in &func.arguments { + self.visit(expr)?; + } + Ok(()) } } diff --git a/tests/sqllogictests/suites/mode/standalone/explain/project_set.test b/tests/sqllogictests/suites/mode/standalone/explain/project_set.test index 6c237748d2df9..8e93e2e2136f1 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/project_set.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/project_set.test @@ -110,6 +110,31 @@ EvalScalar ├── push downs: [filters: [], limit: NONE] └── estimated rows: 0.00 +query T +EXPLAIN SELECT t.a, f.seq, f.value FROM t, LATERAL FLATTEN(input => t.b) f where f.key = 'k' +---- +EvalScalar +├── output columns: [t.a (#0), seq (#3), value (#7)] +├── expressions: [get(1)(flatten(t.b (#1)) (#2)), get(5)(flatten(t.b (#1)) (#2))] +├── estimated rows: 0.00 +└── Filter + ├── output columns: [t.a (#0), flatten(t.b (#1)) (#2)] + ├── filters: [is_true(get(2)(flatten(t.b (#1)) (#2)) = 'k')] + ├── estimated rows: 0.00 + └── ProjectSet + ├── output columns: [t.a (#0), flatten(t.b (#1)) (#2)] + ├── estimated rows: 0.00 + ├── set returning functions: flatten(1, 2, 5)(t.b (#1)) + └── TableScan + ├── table: default.project_set.t + ├── output columns: [a (#0), b (#1)] + ├── read rows: 0 + ├── read size: 0 + ├── partitions total: 0 + ├── partitions scanned: 0 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 0.00 + query T EXPLAIN SELECT json_each(t.b), unnest(t.b) FROM t ---- diff --git a/tests/sqllogictests/suites/query/lateral.test b/tests/sqllogictests/suites/query/lateral.test index 50e9a50dec415..bb851f612fcfe 100644 --- a/tests/sqllogictests/suites/query/lateral.test +++ b/tests/sqllogictests/suites/query/lateral.test @@ -127,6 +127,13 @@ running 1 swimming 1 writing 1 +query ITT +SELECT u.user_id, f.value::STRING, f.path AS activity FROM + user_activities u, + LATERAL FLATTEN(input => u.activities) f WHERE f.value = 'reading' +---- +1 reading [0] + statement ok CREATE TABLE persons(id int, c variant)