Skip to content

Commit cdb2e25

Browse files
authored
refactor: refactor merge into optimizer and fix unexpected distribution plan (#15507)
* refactor: refactor merge into optimizer and fix unexpected distribution plan * fix get row id * fix * fix * save * fix * fix pipeline
1 parent 8b615a1 commit cdb2e25

19 files changed

+292
-318
lines changed

src/query/service/src/interpreters/interpreter_explain.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use databend_common_pipeline_core::processors::PlanProfile;
3030
use databend_common_sql::binder::ExplainConfig;
3131
use databend_common_sql::optimizer::ColumnSet;
3232
use databend_common_sql::plans::FunctionCall;
33+
use databend_common_sql::plans::MergeInto;
3334
use databend_common_sql::plans::UpdatePlan;
3435
use databend_common_sql::BindContext;
3536
use databend_common_sql::MetadataRef;
@@ -40,6 +41,7 @@ use databend_common_users::UserApiProvider;
4041
use super::InsertMultiTableInterpreter;
4142
use super::InterpreterFactory;
4243
use super::UpdateInterpreter;
44+
use crate::interpreters::interpreter_merge_into::MergeIntoInterpreter;
4345
use crate::interpreters::Interpreter;
4446
use crate::pipelines::executor::ExecutorSettings;
4547
use crate::pipelines::executor::PipelineCompleteExecutor;
@@ -191,7 +193,7 @@ impl Interpreter for ExplainInterpreter {
191193
// todo:(JackTan25), we need to make all execute2() just do `build pipeline` work,
192194
// don't take real actions. for now we fix #13657 like below.
193195
let pipeline = match &self.plan {
194-
Plan::Query { .. } => {
196+
Plan::Query { .. } | Plan::MergeInto(_) => {
195197
let interpter =
196198
InterpreterFactory::get(self.ctx.clone(), &self.plan).await?;
197199
interpter.execute2().await?
@@ -216,6 +218,7 @@ impl Interpreter for ExplainInterpreter {
216218
)
217219
.await?
218220
}
221+
Plan::MergeInto(merge_into) => self.explain_merge_fragments(merge_into).await?,
219222
Plan::Update(update) => self.explain_update_fragments(update.as_ref()).await?,
220223
_ => {
221224
return Err(ErrorCode::Unimplemented("Unsupported EXPLAIN statement"));
@@ -563,4 +566,21 @@ impl ExplainInterpreter {
563566
}
564567
Ok(vec![DataBlock::concat(&result)?])
565568
}
569+
570+
async fn explain_merge_fragments(&self, merge_into: &MergeInto) -> Result<Vec<DataBlock>> {
571+
let interpreter = MergeIntoInterpreter::try_create(self.ctx.clone(), merge_into.clone())?;
572+
let (plan, _) = interpreter.build_physical_plan().await?;
573+
let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;
574+
575+
let mut fragments_actions = QueryFragmentsActions::create(self.ctx.clone());
576+
root_fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;
577+
578+
let display_string = fragments_actions
579+
.display_indent(&merge_into.meta_data)
580+
.to_string();
581+
582+
let line_split_result = display_string.lines().collect::<Vec<_>>();
583+
let formatted_plan = StringType::from_data(line_split_result);
584+
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
585+
}
566586
}

src/query/service/src/interpreters/interpreter_factory.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -339,9 +339,11 @@ impl InterpreterFactory {
339339
Plan::Insert(insert) => InsertInterpreter::try_create(ctx, *insert.clone()),
340340

341341
Plan::Replace(replace) => ReplaceInterpreter::try_create(ctx, *replace.clone()),
342-
Plan::MergeInto(merge_into) => {
343-
MergeIntoInterpreter::try_create(ctx, *merge_into.clone())
344-
}
342+
Plan::MergeInto(merge_into) => Ok(Arc::new(MergeIntoInterpreter::try_create(
343+
ctx,
344+
*merge_into.clone(),
345+
)?)),
346+
345347
Plan::Delete(delete) => Ok(Arc::new(DeleteInterpreter::try_create(
346348
ctx,
347349
*delete.clone(),

src/query/service/src/interpreters/interpreter_merge_into.rs

+17-33
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use databend_common_sql::executor::PhysicalPlan;
4545
use databend_common_sql::executor::PhysicalPlanBuilder;
4646
use databend_common_sql::plans;
4747
use databend_common_sql::plans::MergeInto as MergePlan;
48-
use databend_common_sql::plans::RelOperator;
4948
use databend_common_sql::IndexType;
5049
use databend_common_sql::ScalarExpr;
5150
use databend_common_sql::TypeCheck;
@@ -60,7 +59,6 @@ use itertools::Itertools;
6059
use crate::interpreters::common::dml_build_update_stream_req;
6160
use crate::interpreters::HookOperator;
6261
use crate::interpreters::Interpreter;
63-
use crate::interpreters::InterpreterPtr;
6462
use crate::pipelines::PipelineBuildResult;
6563
use crate::schedulers::build_query_pipeline_without_render_result_set;
6664
use crate::sessions::QueryContext;
@@ -74,8 +72,8 @@ pub struct MergeIntoInterpreter {
7472
}
7573

7674
impl MergeIntoInterpreter {
77-
pub fn try_create(ctx: Arc<QueryContext>, plan: MergePlan) -> Result<InterpreterPtr> {
78-
Ok(Arc::new(MergeIntoInterpreter { ctx, plan }))
75+
pub fn try_create(ctx: Arc<QueryContext>, plan: MergePlan) -> Result<MergeIntoInterpreter> {
76+
Ok(MergeIntoInterpreter { ctx, plan })
7977
}
8078
}
8179

@@ -125,7 +123,7 @@ impl Interpreter for MergeIntoInterpreter {
125123
}
126124

127125
impl MergeIntoInterpreter {
128-
async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> {
126+
pub async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> {
129127
let MergePlan {
130128
bind_context,
131129
input,
@@ -145,8 +143,10 @@ impl MergeIntoInterpreter {
145143
split_idx,
146144
row_id_index,
147145
can_try_update_column_only,
146+
enable_right_broadcast,
148147
..
149148
} = &self.plan;
149+
let enable_right_broadcast = *enable_right_broadcast;
150150
let mut columns_set = columns_set.clone();
151151
let table = self.ctx.get_table(catalog, database, table_name).await?;
152152
let fuse_table = table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
@@ -211,16 +211,8 @@ impl MergeIntoInterpreter {
211211
let table_name = table_name.clone();
212212
let input = input.clone();
213213

214-
// we need to extract join plan, but we need to give this exchange
215-
// back at last.
216-
let (input, extract_exchange) = if let RelOperator::Exchange(_) = input.plan() {
217-
(Box::new(input.child(0)?.clone()), true)
218-
} else {
219-
(input, false)
220-
};
221-
222214
let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);
223-
let mut join_input = builder.build(&input, *columns_set.clone()).await?;
215+
let join_input = builder.build(&input, *columns_set.clone()).await?;
224216

225217
// find row_id column index
226218
let join_output_schema = join_input.output_schema()?;
@@ -265,7 +257,7 @@ impl MergeIntoInterpreter {
265257
}
266258
}
267259

268-
if *distributed && !*change_join_order {
260+
if enable_right_broadcast {
269261
row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?);
270262
}
271263

@@ -276,7 +268,7 @@ impl MergeIntoInterpreter {
276268
));
277269
}
278270

279-
if *distributed && row_number_idx.is_none() && !*change_join_order {
271+
if enable_right_broadcast && row_number_idx.is_none() {
280272
return Err(ErrorCode::InvalidRowIdIndex(
281273
"can't get internal row_number_idx when running merge into",
282274
));
@@ -285,17 +277,6 @@ impl MergeIntoInterpreter {
285277
let table_info = fuse_table.get_table_info().clone();
286278
let catalog_ = self.ctx.get_catalog(catalog).await?;
287279

288-
if !*distributed && extract_exchange {
289-
join_input = PhysicalPlan::Exchange(Exchange {
290-
plan_id: 0,
291-
input: Box::new(join_input),
292-
kind: FragmentKind::Merge,
293-
keys: vec![],
294-
allow_adjust_parallelism: true,
295-
ignore_exchange: false,
296-
});
297-
};
298-
299280
// transform unmatched for insert
300281
// reference to func `build_eval_scalar`
301282
// (DataSchemaRef, Option<RemoteExpr>, Vec<RemoteExpr>,Vec<usize>) => (source_schema, condition, value_exprs)
@@ -432,6 +413,7 @@ impl MergeIntoInterpreter {
432413
can_try_update_column_only: *can_try_update_column_only,
433414
plan_id: u32::MAX,
434415
merge_into_split_idx,
416+
enable_right_broadcast,
435417
}))
436418
} else {
437419
let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
@@ -444,23 +426,25 @@ impl MergeIntoInterpreter {
444426
row_id_idx,
445427
segments: segments.clone(),
446428
distributed: true,
447-
output_schema: match *change_join_order {
448-
false => DataSchemaRef::new(DataSchema::new(vec![
449-
join_output_schema.fields[row_number_idx.unwrap()].clone(),
450-
])),
451-
true => DataSchemaRef::new(DataSchema::new(vec![DataField::new(
429+
output_schema: if let Some(row_number_idx) = row_number_idx {
430+
DataSchemaRef::new(DataSchema::new(vec![
431+
join_output_schema.fields[row_number_idx].clone(),
432+
]))
433+
} else {
434+
DataSchemaRef::new(DataSchema::new(vec![DataField::new(
452435
ROW_ID_COL_NAME,
453436
databend_common_expression::types::DataType::Number(
454437
databend_common_expression::types::NumberDataType::UInt64,
455438
),
456-
)])),
439+
)]))
457440
},
458441
merge_type: merge_type.clone(),
459442
change_join_order: *change_join_order,
460443
target_build_optimization: false, // we don't support for distributed mode for now..
461444
can_try_update_column_only: *can_try_update_column_only,
462445
plan_id: u32::MAX,
463446
merge_into_split_idx,
447+
enable_right_broadcast,
464448
}));
465449
// if change_join_order = true, it means the target is build side,
466450
// in this way, we will do matched operation and not matched operation

src/query/service/src/pipelines/builders/builder_merge_into.rs

+15-15
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl PipelineBuilder {
143143
}
144144
assert!(self.join_state.is_some());
145145
assert!(self.merge_into_probe_data_fields.is_some());
146-
146+
self.main_pipeline.resize(1, false)?;
147147
let join_state = self.join_state.clone().unwrap();
148148
// split row_number and log
149149
// output_port_row_number
@@ -291,6 +291,7 @@ impl PipelineBuilder {
291291
// we will receive MutationLogs only without rowids.
292292
return Ok(());
293293
}
294+
self.main_pipeline.resize(1, false)?;
294295
// we will receive MutationLogs and rowids. So we should apply
295296
// rowids firstly and then send all mutation logs to commit sink.
296297
// we need to spilt rowid and mutationlogs, and we can get pipeitems:
@@ -366,9 +367,10 @@ impl PipelineBuilder {
366367
change_join_order,
367368
can_try_update_column_only,
368369
merge_into_split_idx,
370+
enable_right_broadcast,
369371
..
370372
} = merge_into;
371-
373+
let enable_right_broadcast = *enable_right_broadcast;
372374
self.build_pipeline(input)?;
373375

374376
self.main_pipeline
@@ -457,10 +459,8 @@ impl PipelineBuilder {
457459
}
458460

459461
if need_unmatch {
460-
// distributed: false, standalone mode, we need to add insert processor
461-
// (distributed,change join order):(true,true) target is build side, we
462-
// need to support insert in local node.
463-
if !*distributed || *change_join_order {
462+
// If merge into doesn't contain right broadcast join, execute insert in local.
463+
if !enable_right_broadcast {
464464
let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create(
465465
unmatched.clone(),
466466
input.output_schema()?,
@@ -613,7 +613,7 @@ impl PipelineBuilder {
613613
}
614614
} else if need_match && need_unmatch {
615615
// remove first row_id port and last row_number_port
616-
if !*change_join_order {
616+
if enable_right_broadcast {
617617
self.main_pipeline.output_len() - 2
618618
} else {
619619
// remove first row_id port
@@ -624,7 +624,7 @@ impl PipelineBuilder {
624624
self.main_pipeline.output_len() - 1
625625
} else {
626626
// there are only row_number
627-
if !*change_join_order {
627+
if enable_right_broadcast {
628628
0
629629
} else {
630630
// unmatched prot
@@ -659,7 +659,7 @@ impl PipelineBuilder {
659659
// receive row_id
660660
builder.add_items_prepend(vec![create_dummy_item()]);
661661
}
662-
if need_unmatch && !*change_join_order {
662+
if enable_right_broadcast {
663663
// receive row_number
664664
builder.add_items(vec![create_dummy_item()]);
665665
}
@@ -721,7 +721,7 @@ impl PipelineBuilder {
721721
}
722722

723723
// need to receive row_number, we should give a dummy item here.
724-
if *distributed && need_unmatch && !*change_join_order {
724+
if enable_right_broadcast {
725725
builder.add_items(vec![create_dummy_item()]);
726726
}
727727
self.main_pipeline.add_pipe(builder.finalize());
@@ -773,7 +773,7 @@ impl PipelineBuilder {
773773
}
774774

775775
// need to receive row_number, we should give a dummy item here.
776-
if *distributed && need_unmatch && !*change_join_order {
776+
if enable_right_broadcast {
777777
builder.add_items(vec![create_dummy_item()]);
778778
}
779779
self.main_pipeline.add_pipe(builder.finalize());
@@ -817,7 +817,7 @@ impl PipelineBuilder {
817817
}
818818

819819
// receive row_number
820-
if *distributed && need_unmatch && !*change_join_order {
820+
if enable_right_broadcast {
821821
pipe_items.push(create_dummy_item());
822822
}
823823

@@ -853,7 +853,7 @@ impl PipelineBuilder {
853853
}
854854

855855
// with row_number
856-
if *distributed && need_unmatch && !change_join_order {
856+
if enable_right_broadcast {
857857
ranges.push(vec![self.main_pipeline.output_len() - 1]);
858858
}
859859

@@ -877,7 +877,7 @@ impl PipelineBuilder {
877877
vec.push(serialize_segment_transform.into_pipe_item());
878878
}
879879

880-
if need_unmatch && !*change_join_order {
880+
if enable_right_broadcast {
881881
vec.push(create_dummy_item())
882882
}
883883
vec
@@ -911,7 +911,7 @@ impl PipelineBuilder {
911911
));
912912

913913
// accumulate row_number
914-
if *distributed && need_unmatch && !*change_join_order {
914+
if enable_right_broadcast {
915915
let pipe_items = if need_match {
916916
vec![
917917
create_dummy_item(),

src/query/service/src/schedulers/fragments/fragmenter.rs

-4
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,6 @@ impl PhysicalPlanReplacer for Fragmenter {
162162

163163
fn replace_merge_into(&mut self, plan: &MergeInto) -> Result<PhysicalPlan> {
164164
let input = self.replace(&plan.input)?;
165-
if !plan.change_join_order {
166-
self.state = State::SelectLeaf;
167-
}
168165
Ok(PhysicalPlan::MergeInto(Box::new(MergeInto {
169166
input: Box::new(input),
170167
..plan.clone()
@@ -232,7 +229,6 @@ impl PhysicalPlanReplacer for Fragmenter {
232229
// Consume current fragments to prevent them being consumed by `probe_input`.
233230
fragments.append(&mut self.fragments);
234231
let probe_input = self.replace(plan.probe.as_ref())?;
235-
236232
fragments.append(&mut self.fragments);
237233
self.fragments = fragments;
238234

0 commit comments

Comments
 (0)