Skip to content

Commit 1dd5def

Browse files
committed
fix agg, ensure agg across multiple scans in same stream job
1 parent e640ff6 commit 1dd5def

File tree

2 files changed

+61
-20
lines changed

2 files changed

+61
-20
lines changed

src/frontend/src/optimizer/heuristic_optimizer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl<'a> HeuristicOptimizer<'a> {
102102
#[cfg(debug_assertions)]
103103
pub fn check_equivalent_plan(rule_desc: &str, input_plan: &PlanRef, output_plan: &PlanRef) {
104104
if !input_plan.schema().type_eq(output_plan.schema()) {
105-
panic!("{} fails to generate equivalent plan.\nInput schema: {:?}\nInput plan: \n{}\nOutput schema: {:?}\nOutput plan: \n{}\nSQL: {}",
105+
panic!("{} fails to generate equivalent schema.\nInput schema: {:?}\nInput plan: \n{}\nOutput schema: {:?}\nOutput plan: \n{}\nSQL: {}",
106106
rule_desc,
107107
input_plan.schema(),
108108
input_plan.explain_to_string(),

src/frontend/src/optimizer/rule/table_function_to_internal_backfill_progress.rs

+60-19
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,20 @@ use anyhow::anyhow;
1818
use itertools::Itertools;
1919
use risingwave_common::catalog::{internal_table_name_to_parts, Field, Schema, StreamJobStatus};
2020
use risingwave_common::types::{DataType, ScalarImpl};
21+
use risingwave_expr::aggregate::AggType;
2122

2223
use super::{ApplyResult, BoxedRule, FallibleRule};
2324
use crate::catalog::catalog_service::CatalogReadGuard;
2425
use crate::catalog::table_catalog::TableType;
25-
use crate::expr::{ExprImpl, InputRef, Literal, TableFunctionType};
26+
use crate::expr::{AggCall, ExprImpl, InputRef, Literal, OrderBy, TableFunctionType};
2627
use crate::optimizer::plan_node::generic::GenericPlanRef;
2728
use crate::optimizer::plan_node::{
2829
LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
2930
};
3031
use crate::optimizer::PlanRef;
31-
use crate::utils::GroupBy;
32+
use crate::utils::{Condition, GroupBy};
3233
use crate::TableCatalog;
34+
pub use risingwave_pb::expr::agg_call::PbKind as PbAggKind;
3335

3436
/// Transform a special `TableFunction` (with `FILE_SCAN` table function type) into a `LogicalFileScan`
3537
pub struct TableFunctionToInternalBackfillProgressRule {}
@@ -79,27 +81,66 @@ impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
7981
None,
8082
Default::default(),
8183
);
82-
let project = LogicalProject::new(
83-
scan.into(),
84-
vec![ExprImpl::InputRef(Box::new(InputRef {
84+
let project = {
85+
let job_id_expr = ExprImpl::Literal(Box::new(Literal::new(
86+
Some(ScalarImpl::Int32(job_id.table_id as i32)),
87+
DataType::Int32,
88+
)));
89+
let row_count_expr = ExprImpl::InputRef(Box::new(InputRef {
8590
index: row_count_column_index,
8691
data_type: DataType::Int64,
87-
}))],
88-
);
89-
let select_exprs = vec![ExprImpl::Literal(Box::new(Literal::new(
90-
Some(ScalarImpl::Int32(job_id.table_id as i32)),
91-
DataType::Int32,
92-
)))];
93-
let group_key = GroupBy::GroupKey(vec![ExprImpl::InputRef(Box::new(InputRef {
92+
}));
93+
LogicalProject::new(
94+
scan.into(),
95+
vec![job_id_expr, row_count_expr],
96+
)
97+
};
98+
counts.push(project.into());
99+
}
100+
let union = LogicalUnion::new(true, counts);
101+
let select_exprs = {
102+
let job_id = ExprImpl::InputRef(Box::new(InputRef {
94103
index: 0,
95104
data_type: DataType::Int32,
96-
}))]);
97-
let (count, _rewritten_select_exprs, _) =
98-
LogicalAgg::create(select_exprs, group_key, None, project.into())?;
99-
counts.push(count);
100-
}
101-
println!("counts: {:?}", counts);
102-
ApplyResult::Ok(LogicalUnion::new(true, counts).into())
105+
}));
106+
let sum_agg = ExprImpl::AggCall(Box::new(AggCall::new(
107+
AggType::Builtin(PbAggKind::Sum),
108+
vec![
109+
ExprImpl::InputRef(Box::new(InputRef {
110+
index: 1,
111+
data_type: DataType::Int64,
112+
})),
113+
],
114+
false,
115+
OrderBy::any(),
116+
Condition::true_cond(),
117+
vec![],
118+
)?));
119+
vec![
120+
job_id,
121+
sum_agg,
122+
]
123+
};
124+
let group_key = GroupBy::GroupKey(vec![ExprImpl::InputRef(Box::new(InputRef {
125+
index: 0,
126+
data_type: DataType::Int32,
127+
}))]);
128+
let (agg, _rewritten_select_exprs, _rewritten_having_exprs) =
129+
LogicalAgg::create(select_exprs, group_key, None, union.into())?;
130+
let project = LogicalProject::new(
131+
agg.into(),
132+
vec![
133+
ExprImpl::InputRef(Box::new(InputRef {
134+
index: 0,
135+
data_type: DataType::Int32,
136+
})),
137+
ExprImpl::InputRef(Box::new(InputRef {
138+
index: 1,
139+
data_type: DataType::Decimal,
140+
})).cast_explicit(DataType::Int64)?,
141+
],
142+
);
143+
ApplyResult::Ok(project.into())
103144
}
104145
}
105146

0 commit comments

Comments
 (0)