@@ -16,18 +16,23 @@ use std::sync::Arc;
16
16
17
17
use anyhow:: anyhow;
18
18
use itertools:: Itertools ;
19
- use risingwave_common:: bail;
20
19
use risingwave_common:: catalog:: { internal_table_name_to_parts, Field , Schema , StreamJobStatus } ;
21
20
use risingwave_common:: types:: { DataType , ScalarImpl } ;
22
21
use risingwave_expr:: aggregate:: AggType ;
23
22
pub use risingwave_pb:: expr:: agg_call:: PbKind as PbAggKind ;
23
+ use risingwave_pb:: plan_common:: JoinType ;
24
24
25
25
use super :: { ApplyResult , BoxedRule , FallibleRule } ;
26
26
use crate :: catalog:: catalog_service:: CatalogReadGuard ;
27
27
use crate :: catalog:: table_catalog:: TableType ;
28
- use crate :: expr:: { AggCall , ExprImpl , ExprType , FunctionCall , InputRef , Literal , OrderBy , TableFunctionType } ;
28
+ use crate :: expr:: {
29
+ AggCall , ExprImpl , ExprType , FunctionCall , InputRef , Literal , OrderBy , TableFunctionType ,
30
+ } ;
29
31
use crate :: optimizer:: plan_node:: generic:: GenericPlanRef ;
30
- use crate :: optimizer:: plan_node:: { LogicalAgg , LogicalJoin , LogicalProject , LogicalScan , LogicalTableFunction , LogicalUnion , LogicalValues } ;
32
+ use crate :: optimizer:: plan_node:: {
33
+ LogicalAgg , LogicalJoin , LogicalProject , LogicalScan , LogicalTableFunction , LogicalUnion ,
34
+ LogicalValues ,
35
+ } ;
31
36
use crate :: optimizer:: PlanRef ;
32
37
use crate :: utils:: { Condition , GroupBy } ;
33
38
use crate :: TableCatalog ;
@@ -120,7 +125,7 @@ impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
120
125
} ) ) ] ) ;
121
126
let ( agg, _rewritten_select_exprs, _rewritten_having_exprs) =
122
127
LogicalAgg :: create ( select_exprs, group_key, None , union. into ( ) ) ?;
123
- let project = LogicalProject :: new (
128
+ let current_counts = LogicalProject :: new (
124
129
agg,
125
130
vec ! [
126
131
ExprImpl :: InputRef ( Box :: new( InputRef {
@@ -139,7 +144,9 @@ impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
139
144
let catalog = plan. ctx ( ) . session_ctx ( ) . env ( ) . catalog_reader ( ) . read_guard ( ) ;
140
145
let mut total_counts = vec ! [ ] ;
141
146
for job_id in backfilling_job_ids {
142
- let total_key_count = if let Some ( stats) = catalog. table_stats ( ) . table_stats . get ( & ( job_id. table_id ) ) {
147
+ let total_key_count = if let Some ( stats) =
148
+ catalog. table_stats ( ) . table_stats . get ( & ( job_id. table_id ) )
149
+ {
143
150
stats. total_key_count
144
151
} else {
145
152
return ApplyResult :: Err (
@@ -168,21 +175,19 @@ impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
168
175
} ;
169
176
170
177
let join = {
171
- let conjunctions = vec ! [
172
- ExprImpl :: FunctionCall ( Box :: new( FunctionCall :: new(
173
- ExprType :: Equal ,
174
- vec![
175
- ExprImpl :: InputRef ( Box :: new( InputRef {
176
- index: 0 ,
177
- data_type: DataType :: Int32 ,
178
- } ) ) ,
179
- ExprImpl :: InputRef ( Box :: new( InputRef {
180
- index: 2 ,
181
- data_type: DataType :: Int32 ,
182
- } ) ) ,
183
- ] ,
184
- ) ?) ) ,
185
- ] ;
178
+ let conjunctions = vec ! [ ExprImpl :: FunctionCall ( Box :: new( FunctionCall :: new(
179
+ ExprType :: Equal ,
180
+ vec![
181
+ ExprImpl :: InputRef ( Box :: new( InputRef {
182
+ index: 0 ,
183
+ data_type: DataType :: Int32 ,
184
+ } ) ) ,
185
+ ExprImpl :: InputRef ( Box :: new( InputRef {
186
+ index: 2 ,
187
+ data_type: DataType :: Int32 ,
188
+ } ) ) ,
189
+ ] ,
190
+ ) ?) ) ] ;
186
191
let condition = Condition { conjunctions } ;
187
192
LogicalJoin :: new (
188
193
current_counts. into ( ) ,
@@ -196,11 +201,13 @@ impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
196
201
let op1 = ExprImpl :: InputRef ( Box :: new ( InputRef {
197
202
index : 1 ,
198
203
data_type : DataType :: Int64 ,
199
- } ) ) . cast_implicit ( DataType :: Decimal ) ?;
204
+ } ) )
205
+ . cast_implicit ( DataType :: Decimal ) ?;
200
206
let op2 = ExprImpl :: InputRef ( Box :: new ( InputRef {
201
207
index : 3 ,
202
208
data_type : DataType :: Int64 ,
203
- } ) ) . cast_implicit ( DataType :: Decimal ) ?;
209
+ } ) )
210
+ . cast_implicit ( DataType :: Decimal ) ?;
204
211
let div_expr = ExprImpl :: FunctionCall ( Box :: new ( FunctionCall :: new (
205
212
ExprType :: Divide ,
206
213
vec ! [ op1, op2] ,
0 commit comments