Skip to content

Commit 4a03a16

Browse files
committed
fix
1 parent 4bb4b41 commit 4a03a16

File tree

6 files changed

+49
-35
lines changed

6 files changed

+49
-35
lines changed

src/query/service/src/interpreters/interpreter_explain.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ impl Interpreter for ExplainInterpreter {
218218
)
219219
.await?
220220
}
221-
Plan::MergeInto(merge_into) => self.explain_merge_fragments(&merge_into).await?,
221+
Plan::MergeInto(merge_into) => self.explain_merge_fragments(merge_into).await?,
222222
Plan::Update(update) => self.explain_update_fragments(update.as_ref()).await?,
223223
_ => {
224224
return Err(ErrorCode::Unimplemented("Unsupported EXPLAIN statement"));

src/query/sql/src/planner/optimizer/optimizer.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,10 @@ async fn optimize_merge_into(mut opt_ctx: OptimizerContext, plan: Box<MergeInto>
483483
join_s_expr = merge_source_optimizer.optimize(&join_s_expr)?;
484484
enable_right_broadcast = true;
485485
}
486-
486+
let distributed = !join_s_expr.has_merge_exchange();
487487
Ok(Plan::MergeInto(Box::new(MergeInto {
488488
input: Box::new(join_s_expr),
489-
distributed: true,
489+
distributed,
490490
change_join_order,
491491
columns_set: new_columns_set.clone(),
492492
enable_right_broadcast,

src/query/sql/src/planner/optimizer/s_expr.rs

+7
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,13 @@ impl SExpr {
394394
.collect::<Vec<_>>();
395395
self.children = children;
396396
}
397+
398+
pub fn has_merge_exchange(&self) -> bool {
399+
if let RelOperator::Exchange(Exchange::Merge) = self.plan.as_ref() {
400+
return true;
401+
}
402+
self.children.iter().any(|child| child.has_merge_exchange())
403+
}
397404
}
398405

399406
fn find_subquery(rel_op: &RelOperator) -> bool {

tests/sqllogictests/suites/base/09_fuse_engine/09_0035_merge_into_separate_pipeline_without_distributed_enable.test

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ merge into t1_separate as t1 using (select * from t2_separate) as t2 on t1.a = t
149149
2 4
150150

151151
query T
152-
select count(*) from fuse_block('default','t1_separate');
152+
select count(*) from fuse_block('db','t1_separate');
153153
----
154154
1
155155

tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test

+4-4
Original file line numberDiff line numberDiff line change
@@ -874,10 +874,10 @@ insert into t11 values (1, 10),(2, 20),(3, 30),(4, 40);
874874
statement ok
875875
insert into t12 values (1, 10),(2, 20),(3, 30),(4, 40);
876876

877-
#query T
878-
#MERGE INTO t11 USING(SELECT NULL AS c0 FROM t12) AS t12 ON (t11.a OR TRUE) WHEN MATCHED AND TRUE THEN DELETE;
879-
#----
880-
#4
877+
query T
878+
MERGE INTO t11 USING(SELECT NULL AS c0 FROM t12) AS t12 ON (t11.a OR TRUE) WHEN MATCHED AND TRUE THEN DELETE;
879+
----
880+
4
881881

882882
query T
883883
select count(*) from t11;

tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test

+34-27
Original file line numberDiff line numberDiff line change
@@ -58,44 +58,42 @@ explain merge into t1 using t2 on t1.a < t2.a when matched then update * when no
5858
----
5959
MergeInto:
6060
target_table: default.default.t1
61-
├── distributed: false
61+
├── distributed: true
6262
├── target_build_optimization: false
6363
├── can_try_update_column_only: true
6464
├── matched update: [condition: None,update set a = a (#0)]
6565
└── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))]
6666
HashJoin
67-
├── output columns: [t1.a (#1), t1._row_id (#2), t2.a (#0)]
67+
├── output columns: [t1.a (#1), t1._row_id (#2), t2.a (#0), #_row_number]
6868
├── join type: RIGHT OUTER
6969
├── build keys: []
7070
├── probe keys: []
7171
├── filters: [t1.a (#1) < t2.a (#0)]
7272
├── estimated rows: 15.00
7373
├── Exchange(Build)
74-
│ ├── output columns: [t2.a (#0)]
75-
│ ├── exchange type: Merge
76-
│ └── TableScan
77-
│ ├── table: default.default.t2
78-
│ ├── output columns: [a (#0)]
79-
│ ├── read rows: 1
80-
│ ├── read size: < 1 KiB
81-
│ ├── partitions total: 1
82-
│ ├── partitions scanned: 1
83-
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
84-
│ ├── push downs: [filters: [], limit: NONE]
85-
│ └── estimated rows: 1.00
86-
└── Exchange(Probe)
87-
├── output columns: [t1.a (#1), t1._row_id (#2)]
88-
├── exchange type: Merge
89-
└── TableScan
90-
├── table: default.default.t1
91-
├── output columns: [a (#1), _row_id (#2)]
92-
├── read rows: 15
93-
├── read size: < 1 KiB
94-
├── partitions total: 3
95-
├── partitions scanned: 3
96-
├── pruning stats: [segments: <range pruning: 3 to 3>, blocks: <range pruning: 3 to 3>]
97-
├── push downs: [filters: [], limit: NONE]
98-
└── estimated rows: 15.00
74+
│ ├── output columns: [t2.a (#0), #_row_number]
75+
│ ├── exchange type: Broadcast
76+
│ └── MergeIntoAddRowNumber
77+
│ └── TableScan
78+
│ ├── table: default.default.t2
79+
│ ├── output columns: [a (#0)]
80+
│ ├── read rows: 1
81+
│ ├── read size: < 1 KiB
82+
│ ├── partitions total: 1
83+
│ ├── partitions scanned: 1
84+
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
85+
│ ├── push downs: [filters: [], limit: NONE]
86+
│ └── estimated rows: 1.00
87+
└── TableScan(Probe)
88+
├── table: default.default.t1
89+
├── output columns: [a (#1), _row_id (#2)]
90+
├── read rows: 15
91+
├── read size: < 1 KiB
92+
├── partitions total: 3
93+
├── partitions scanned: 3
94+
├── pruning stats: [segments: <range pruning: 3 to 3>, blocks: <range pruning: 3 to 3>]
95+
├── push downs: [filters: [], limit: NONE]
96+
└── estimated rows: 15.00
9997

10098
query TT
10199
merge into t1 using t2 on t1.a < t2.a when matched then update * when not matched then insert *;
@@ -173,6 +171,15 @@ HashJoin
173171
│ ├── output columns: [stage._$1 (#0), #_row_number]
174172
│ ├── exchange type: Broadcast
175173
│ └── MergeIntoAddRowNumber
174+
│ └── TableScan
175+
│ ├── table: default.system.stage
176+
│ ├── output columns: [_$1 (#0)]
177+
│ ├── read rows: 6
178+
│ ├── read size: < 1 KiB
179+
│ ├── partitions total: 1
180+
│ ├── partitions scanned: 1
181+
│ ├── push downs: [filters: [], limit: NONE]
182+
│ └── estimated rows: 0.00
176183
└── TableScan(Probe)
177184
├── table: default.default.t1
178185
├── output columns: [a (#1), _row_id (#2)]

0 commit comments

Comments
 (0)