Skip to content

Commit 7308158

Browse files
authored
refactor(query): Optimize FLATTEN function with filter condition (#17892)
* refactor(query): Optimize `FLATTEN` function with filter condition * fix
1 parent a66af84 commit 7308158

File tree

4 files changed

+164
-30
lines changed

4 files changed

+164
-30
lines changed

src/query/functions/src/srfs/variant.rs

+12-4
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,11 @@ impl FlattenGenerator {
768768
) {
769769
if let Ok(Some(vals)) = input.array_values() {
770770
for (i, val) in vals.into_iter().enumerate() {
771-
let inner_path = format!("{}[{}]", path, i);
771+
let inner_path = if path_builder.is_some() {
772+
format!("{}[{}]", path, i)
773+
} else {
774+
"".to_string()
775+
};
772776

773777
if let Some(key_builder) = key_builder {
774778
key_builder.push_null();
@@ -822,10 +826,14 @@ impl FlattenGenerator {
822826
if let Some(key_builder) = key_builder {
823827
key_builder.push(key.as_ref());
824828
}
825-
let inner_path = if !path.is_empty() {
826-
format!("{}.{}", path, key)
829+
let inner_path = if path_builder.is_some() {
830+
if !path.is_empty() {
831+
format!("{}.{}", path, key)
832+
} else {
833+
key
834+
}
827835
} else {
828-
key
836+
"".to_string()
829837
};
830838
if let Some(path_builder) = path_builder {
831839
path_builder.put_and_commit(&inner_path);

src/query/sql/src/executor/physical_plans/physical_eval_scalar.rs

+120-26
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeSet;
1516
use std::collections::HashSet;
1617
use std::sync::Arc;
1718

@@ -21,16 +22,22 @@ use databend_common_expression::DataField;
2122
use databend_common_expression::DataSchemaRef;
2223
use databend_common_expression::DataSchemaRefExt;
2324
use databend_common_expression::RemoteExpr;
25+
use databend_common_expression::Scalar;
2426
use databend_common_functions::BUILTIN_FUNCTIONS;
2527

2628
use crate::executor::explain::PlanStatsInfo;
2729
use crate::executor::physical_plan::PhysicalPlan;
2830
use crate::executor::physical_plan_builder::PhysicalPlanBuilder;
31+
use crate::optimizer::ir::Matcher;
2932
use crate::optimizer::ir::SExpr;
33+
use crate::plans::Filter;
3034
use crate::plans::FunctionCall;
3135
use crate::plans::ProjectSet;
36+
use crate::plans::RelOp;
3237
use crate::plans::RelOperator;
3338
use crate::plans::ScalarExpr;
39+
use crate::plans::ScalarItem;
40+
use crate::plans::Visitor;
3441
use crate::ColumnSet;
3542
use crate::IndexType;
3643
use crate::TypeCheck;
@@ -99,11 +106,7 @@ impl PhysicalPlanBuilder {
99106
self.build(s_expr.child(0)?, required).await
100107
} else {
101108
let child = s_expr.child(0)?;
102-
let input = if let RelOperator::ProjectSet(project_set) = child.plan() {
103-
let new_project_set =
104-
self.prune_flatten_columns(eval_scalar, project_set, &required);
105-
let mut new_child = child.clone();
106-
new_child.plan = Arc::new(new_project_set.into());
109+
let input = if let Some(new_child) = self.try_eliminate_flatten_columns(&used, child)? {
107110
self.build(&new_child, required).await?
108111
} else {
109112
self.build(child, required).await?
@@ -171,45 +174,136 @@ impl PhysicalPlanBuilder {
171174
}))
172175
}
173176

177+
fn try_eliminate_flatten_columns(
178+
&mut self,
179+
scalar_items: &Vec<ScalarItem>,
180+
s_expr: &SExpr,
181+
) -> Result<Option<SExpr>> {
182+
// (1) ProjectSet
183+
// \
184+
// *
185+
//
186+
// (2) Filter
187+
// \
188+
// ProjectSet
189+
// \
190+
// *
191+
let matchers = vec![
192+
Matcher::MatchOp {
193+
op_type: RelOp::ProjectSet,
194+
children: vec![Matcher::Leaf],
195+
},
196+
Matcher::MatchOp {
197+
op_type: RelOp::Filter,
198+
children: vec![Matcher::MatchOp {
199+
op_type: RelOp::ProjectSet,
200+
children: vec![Matcher::Leaf],
201+
}],
202+
},
203+
];
204+
205+
let mut matched = false;
206+
for matcher in matchers {
207+
if matcher.matches(s_expr) {
208+
matched = true;
209+
break;
210+
}
211+
}
212+
if !matched {
213+
return Ok(None);
214+
}
215+
216+
if let RelOperator::Filter(filter) = s_expr.plan() {
217+
let child = s_expr.child(0)?;
218+
let project_set: ProjectSet = child.plan().clone().try_into()?;
219+
let Some(new_project_set) =
220+
self.eliminate_flatten_columns(scalar_items, Some(filter), &project_set)
221+
else {
222+
return Ok(None);
223+
};
224+
let mut new_child = child.clone();
225+
new_child.plan = Arc::new(new_project_set.into());
226+
let new_filter =
227+
SExpr::create_unary(Arc::new(s_expr.plan().clone()), Arc::new(new_child));
228+
Ok(Some(new_filter))
229+
} else {
230+
let project_set: ProjectSet = s_expr.plan().clone().try_into()?;
231+
let Some(new_project_set) =
232+
self.eliminate_flatten_columns(scalar_items, None, &project_set)
233+
else {
234+
return Ok(None);
235+
};
236+
let mut new_expr = s_expr.clone();
237+
new_expr.plan = Arc::new(new_project_set.into());
238+
Ok(Some(new_expr))
239+
}
240+
}
241+
174242
// The flatten function returns a tuple, which contains 6 columns.
175-
// Only keep columns required by parent plan, other columns can be pruned
243+
// Only keep columns required by parent plan, other columns can be eliminated
176244
// to reduce the memory usage.
177-
fn prune_flatten_columns(
245+
fn eliminate_flatten_columns(
178246
&mut self,
179-
eval_scalar: &crate::plans::EvalScalar,
247+
scalar_items: &Vec<ScalarItem>,
248+
filter: Option<&Filter>,
180249
project_set: &ProjectSet,
181-
required: &ColumnSet,
182-
) -> ProjectSet {
250+
) -> Option<ProjectSet> {
251+
let mut has_flatten = false;
183252
let mut project_set = project_set.clone();
184253
for srf_item in &mut project_set.srfs {
185254
if let ScalarExpr::FunctionCall(srf_func) = &srf_item.scalar {
186255
if srf_func.func_name == "flatten" {
187-
// Store the columns required by the parent plan in params.
188-
let mut params = Vec::new();
189-
for item in &eval_scalar.items {
190-
if !required.contains(&item.index) {
191-
continue;
192-
}
193-
if let ScalarExpr::FunctionCall(func) = &item.scalar {
194-
if func.func_name == "get" && !func.arguments.is_empty() {
195-
if let ScalarExpr::BoundColumnRef(column_ref) = &func.arguments[0] {
196-
if column_ref.column.index == srf_item.index {
197-
params.push(func.params[0].clone());
198-
}
199-
}
200-
}
256+
has_flatten = true;
257+
let mut visitor = FlattenColumnsVisitor {
258+
params: BTreeSet::new(),
259+
column_index: srf_item.index,
260+
};
261+
// Collect columns required by the parent plan in params.
262+
for item in scalar_items {
263+
visitor.visit(&item.scalar).unwrap();
264+
}
265+
if let Some(filter) = filter {
266+
for pred in &filter.predicates {
267+
visitor.visit(pred).unwrap();
201268
}
202269
}
203270

204271
srf_item.scalar = ScalarExpr::FunctionCall(FunctionCall {
205272
span: srf_func.span,
206273
func_name: srf_func.func_name.clone(),
207-
params,
274+
params: visitor.params.into_iter().collect::<Vec<_>>(),
208275
arguments: srf_func.arguments.clone(),
209276
});
210277
}
211278
}
212279
}
213-
project_set
280+
if has_flatten {
281+
Some(project_set)
282+
} else {
283+
None
284+
}
285+
}
286+
}
287+
288+
struct FlattenColumnsVisitor {
289+
params: BTreeSet<Scalar>,
290+
column_index: IndexType,
291+
}
292+
293+
impl<'a> Visitor<'a> for FlattenColumnsVisitor {
294+
// Collect the params in get function which is used to extract the inner column of flatten function.
295+
fn visit_function_call(&mut self, func: &'a FunctionCall) -> Result<()> {
296+
if func.func_name == "get" && !func.arguments.is_empty() {
297+
if let ScalarExpr::BoundColumnRef(column_ref) = &func.arguments[0] {
298+
if column_ref.column.index == self.column_index {
299+
self.params.insert(func.params[0].clone());
300+
return Ok(());
301+
}
302+
}
303+
}
304+
for expr in &func.arguments {
305+
self.visit(expr)?;
306+
}
307+
Ok(())
214308
}
215309
}

tests/sqllogictests/suites/mode/standalone/explain/project_set.test

+25
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,31 @@ EvalScalar
110110
├── push downs: [filters: [], limit: NONE]
111111
└── estimated rows: 0.00
112112

113+
query T
114+
EXPLAIN SELECT t.a, f.seq, f.value FROM t, LATERAL FLATTEN(input => t.b) f where f.key = 'k'
115+
----
116+
EvalScalar
117+
├── output columns: [t.a (#0), seq (#3), value (#7)]
118+
├── expressions: [get(1)(flatten(t.b (#1)) (#2)), get(5)(flatten(t.b (#1)) (#2))]
119+
├── estimated rows: 0.00
120+
└── Filter
121+
├── output columns: [t.a (#0), flatten(t.b (#1)) (#2)]
122+
├── filters: [is_true(get(2)(flatten(t.b (#1)) (#2)) = 'k')]
123+
├── estimated rows: 0.00
124+
└── ProjectSet
125+
├── output columns: [t.a (#0), flatten(t.b (#1)) (#2)]
126+
├── estimated rows: 0.00
127+
├── set returning functions: flatten(1, 2, 5)(t.b (#1))
128+
└── TableScan
129+
├── table: default.project_set.t
130+
├── output columns: [a (#0), b (#1)]
131+
├── read rows: 0
132+
├── read size: 0
133+
├── partitions total: 0
134+
├── partitions scanned: 0
135+
├── push downs: [filters: [], limit: NONE]
136+
└── estimated rows: 0.00
137+
113138
query T
114139
EXPLAIN SELECT json_each(t.b), unnest(t.b) FROM t
115140
----

tests/sqllogictests/suites/query/lateral.test

+7
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ running 1
127127
swimming 1
128128
writing 1
129129

130+
query ITT
131+
SELECT u.user_id, f.value::STRING, f.path AS activity FROM
132+
user_activities u,
133+
LATERAL FLATTEN(input => u.activities) f WHERE f.value = 'reading'
134+
----
135+
1 reading [0]
136+
130137
statement ok
131138
CREATE TABLE persons(id int, c variant)
132139

0 commit comments

Comments
(0)