Skip to content

refactor(query): Optimize FLATTEN function with filter condition #17892

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions src/query/functions/src/srfs/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,11 @@ impl FlattenGenerator {
) {
if let Ok(Some(vals)) = input.array_values() {
for (i, val) in vals.into_iter().enumerate() {
let inner_path = format!("{}[{}]", path, i);
let inner_path = if path_builder.is_some() {
format!("{}[{}]", path, i)
} else {
"".to_string()
};

if let Some(key_builder) = key_builder {
key_builder.push_null();
Expand Down Expand Up @@ -822,10 +826,14 @@ impl FlattenGenerator {
if let Some(key_builder) = key_builder {
key_builder.push(key.as_ref());
}
let inner_path = if !path.is_empty() {
format!("{}.{}", path, key)
let inner_path = if path_builder.is_some() {
if !path.is_empty() {
format!("{}.{}", path, key)
} else {
key
}
} else {
key
"".to_string()
};
if let Some(path_builder) = path_builder {
path_builder.put_and_commit(&inner_path);
Expand Down
146 changes: 120 additions & 26 deletions src/query/sql/src/executor/physical_plans/physical_eval_scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;
use std::collections::HashSet;
use std::sync::Arc;

Expand All @@ -21,16 +22,22 @@ use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::RemoteExpr;
use databend_common_expression::Scalar;
use databend_common_functions::BUILTIN_FUNCTIONS;

use crate::executor::explain::PlanStatsInfo;
use crate::executor::physical_plan::PhysicalPlan;
use crate::executor::physical_plan_builder::PhysicalPlanBuilder;
use crate::optimizer::ir::Matcher;
use crate::optimizer::ir::SExpr;
use crate::plans::Filter;
use crate::plans::FunctionCall;
use crate::plans::ProjectSet;
use crate::plans::RelOp;
use crate::plans::RelOperator;
use crate::plans::ScalarExpr;
use crate::plans::ScalarItem;
use crate::plans::Visitor;
use crate::ColumnSet;
use crate::IndexType;
use crate::TypeCheck;
Expand Down Expand Up @@ -99,11 +106,7 @@ impl PhysicalPlanBuilder {
self.build(s_expr.child(0)?, required).await
} else {
let child = s_expr.child(0)?;
let input = if let RelOperator::ProjectSet(project_set) = child.plan() {
let new_project_set =
self.prune_flatten_columns(eval_scalar, project_set, &required);
let mut new_child = child.clone();
new_child.plan = Arc::new(new_project_set.into());
let input = if let Some(new_child) = self.try_eliminate_flatten_columns(&used, child)? {
self.build(&new_child, required).await?
} else {
self.build(child, required).await?
Expand Down Expand Up @@ -171,45 +174,136 @@ impl PhysicalPlanBuilder {
}))
}

/// Try to rewrite the `flatten` srf under this expression so it only
/// produces the tuple columns actually consumed by the parent plan.
///
/// Returns `Ok(Some(new_s_expr))` with the rewritten plan when a `flatten`
/// srf was found and pruned, `Ok(None)` when the plan shape does not match
/// or no `flatten` srf is present (caller keeps the original plan).
fn try_eliminate_flatten_columns(
    &mut self,
    scalar_items: &Vec<ScalarItem>,
    s_expr: &SExpr,
) -> Result<Option<SExpr>> {
    // Two supported plan shapes:
    //
    // (1) ProjectSet
    //      \
    //       *
    //
    // (2) Filter
    //      \
    //       ProjectSet
    //        \
    //         *
    let matchers = vec![
        Matcher::MatchOp {
            op_type: RelOp::ProjectSet,
            children: vec![Matcher::Leaf],
        },
        Matcher::MatchOp {
            op_type: RelOp::Filter,
            children: vec![Matcher::MatchOp {
                op_type: RelOp::ProjectSet,
                children: vec![Matcher::Leaf],
            }],
        },
    ];

    // Bail out early when the plan shape is neither of the above.
    if !matchers.iter().any(|matcher| matcher.matches(s_expr)) {
        return Ok(None);
    }

    if let RelOperator::Filter(filter) = s_expr.plan() {
        // Shape (2): filter predicates may also read flatten columns, so
        // they must participate in the used-column collection.
        let child = s_expr.child(0)?;
        let project_set: ProjectSet = child.plan().clone().try_into()?;
        let Some(new_project_set) =
            self.eliminate_flatten_columns(scalar_items, Some(filter), &project_set)
        else {
            return Ok(None);
        };
        let mut new_child = child.clone();
        new_child.plan = Arc::new(new_project_set.into());
        let new_filter =
            SExpr::create_unary(Arc::new(s_expr.plan().clone()), Arc::new(new_child));
        Ok(Some(new_filter))
    } else {
        // Shape (1): only the scalar items above the ProjectSet matter.
        let project_set: ProjectSet = s_expr.plan().clone().try_into()?;
        let Some(new_project_set) =
            self.eliminate_flatten_columns(scalar_items, None, &project_set)
        else {
            return Ok(None);
        };
        let mut new_expr = s_expr.clone();
        new_expr.plan = Arc::new(new_project_set.into());
        Ok(Some(new_expr))
    }
}

// The flatten function returns a tuple, which contains 6 columns.
// Only keep columns required by parent plan, other columns can be pruned
// Only keep columns required by parent plan, other columns can be eliminated
// to reduce the memory usage.
fn prune_flatten_columns(
fn eliminate_flatten_columns(
&mut self,
eval_scalar: &crate::plans::EvalScalar,
scalar_items: &Vec<ScalarItem>,
filter: Option<&Filter>,
project_set: &ProjectSet,
required: &ColumnSet,
) -> ProjectSet {
) -> Option<ProjectSet> {
let mut has_flatten = false;
let mut project_set = project_set.clone();
for srf_item in &mut project_set.srfs {
if let ScalarExpr::FunctionCall(srf_func) = &srf_item.scalar {
if srf_func.func_name == "flatten" {
// Store the columns required by the parent plan in params.
let mut params = Vec::new();
for item in &eval_scalar.items {
if !required.contains(&item.index) {
continue;
}
if let ScalarExpr::FunctionCall(func) = &item.scalar {
if func.func_name == "get" && !func.arguments.is_empty() {
if let ScalarExpr::BoundColumnRef(column_ref) = &func.arguments[0] {
if column_ref.column.index == srf_item.index {
params.push(func.params[0].clone());
}
}
}
has_flatten = true;
let mut visitor = FlattenColumnsVisitor {
params: BTreeSet::new(),
column_index: srf_item.index,
};
// Collect columns required by the parent plan in params.
for item in scalar_items {
visitor.visit(&item.scalar).unwrap();
}
if let Some(filter) = filter {
for pred in &filter.predicates {
visitor.visit(pred).unwrap();
}
}

srf_item.scalar = ScalarExpr::FunctionCall(FunctionCall {
span: srf_func.span,
func_name: srf_func.func_name.clone(),
params,
params: visitor.params.into_iter().collect::<Vec<_>>(),
arguments: srf_func.arguments.clone(),
});
}
}
}
project_set
if has_flatten {
Some(project_set)
} else {
None
}
}
}

/// Walks scalar expressions and records which inner columns of a `flatten`
/// srf output are read through `get(<param>)(<flatten column>)` calls.
struct FlattenColumnsVisitor {
    // Params of matched `get` calls; BTreeSet keeps them unique and ordered.
    params: BTreeSet<Scalar>,
    // Column index of the `flatten` srf whose accesses are being tracked.
    column_index: IndexType,
}

impl<'a> Visitor<'a> for FlattenColumnsVisitor {
    // Collect the params in get function which is used to extract the inner column of flatten function.
    fn visit_function_call(&mut self, func: &'a FunctionCall) -> Result<()> {
        if func.func_name == "get" {
            if let Some(ScalarExpr::BoundColumnRef(column_ref)) = func.arguments.first() {
                if column_ref.column.index == self.column_index {
                    // This `get` reads our flatten column: remember which
                    // tuple field it extracts and stop descending.
                    self.params.insert(func.params[0].clone());
                    return Ok(());
                }
            }
        }
        // Otherwise keep walking the nested argument expressions.
        func.arguments.iter().try_for_each(|arg| self.visit(arg))
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,31 @@ EvalScalar
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 0.00

query T
EXPLAIN SELECT t.a, f.seq, f.value FROM t, LATERAL FLATTEN(input => t.b) f where f.key = 'k'
----
EvalScalar
├── output columns: [t.a (#0), seq (#3), value (#7)]
├── expressions: [get(1)(flatten(t.b (#1)) (#2)), get(5)(flatten(t.b (#1)) (#2))]
├── estimated rows: 0.00
└── Filter
├── output columns: [t.a (#0), flatten(t.b (#1)) (#2)]
├── filters: [is_true(get(2)(flatten(t.b (#1)) (#2)) = 'k')]
├── estimated rows: 0.00
└── ProjectSet
├── output columns: [t.a (#0), flatten(t.b (#1)) (#2)]
├── estimated rows: 0.00
├── set returning functions: flatten(1, 2, 5)(t.b (#1))
└── TableScan
├── table: default.project_set.t
├── output columns: [a (#0), b (#1)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 0.00

query T
EXPLAIN SELECT json_each(t.b), unnest(t.b) FROM t
----
Expand Down
7 changes: 7 additions & 0 deletions tests/sqllogictests/suites/query/lateral.test
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ running 1
swimming 1
writing 1

query ITT
SELECT u.user_id, f.value::STRING, f.path AS activity FROM
user_activities u,
LATERAL FLATTEN(input => u.activities) f WHERE f.value = 'reading'
----
1 reading [0]

statement ok
CREATE TABLE persons(id int, c variant)

Expand Down
Loading