
Commit 282a3b1

feat: support using fragment forest to execute additional broadcast operations. (#17872)
* update
* avoid committing multiple times
* fix merge
* fix fragment type
* rm unused modify
* fix
* improve err msg
* fix build pipeline
* fix exchange
* refactor
* fix root pipeline
* fix pipeline
* improve error msg
* fix serialize block
* fix sink
* add log
* fix send
* merge rf
* fix
* fix
* fix
* fix conflict
* update
* fix
* rm unused modify
* make lint
* fix
* rm log
* rm empty impl
* rm log
* fix
* add comment
1 parent 2ab9ced commit 282a3b1

24 files changed: +519, -89 lines

src/query/catalog/src/table_context.rs

+9
@@ -418,6 +418,15 @@ pub trait TableContext: Send + Sync {
     fn set_pruned_partitions_stats(&self, _partitions: PartStatistics) {
         unimplemented!()
     }
+
+    /// Calling this function will automatically create a pipeline for broadcast data in `build_distributed_pipeline()`
+    ///
+    /// The returned id can be used to get sender and receiver for broadcasting data.
+    fn get_next_broadcast_id(&self) -> u32;
+
+    fn reset_broadcast_id(&self) {
+        unimplemented!()
+    }
 }

 pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
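
To make the new contract concrete, here is a minimal sketch of how a planner-side caller might reserve and use a broadcast id. It assumes the `broadcast_sink_sender`/`broadcast_source_receiver` accessors that the pipeline builders below call on the query context; the function itself is hypothetical:

use std::sync::Arc;
use databend_common_catalog::table_context::TableContext;

// Sketch only: reserving an id is what later makes build_distributed_pipeline()
// schedule an extra broadcast pipeline for this channel.
fn reserve_broadcast_channel(ctx: &Arc<dyn TableContext>) -> u32 {
    let broadcast_id = ctx.get_next_broadcast_id();
    // Executors then move data through the id's channel endpoints, e.g.
    //   ctx.broadcast_sink_sender(broadcast_id)     -> Sender<BlockMetaInfoPtr>
    //   ctx.broadcast_source_receiver(broadcast_id) -> Receiver<BlockMetaInfoPtr>
    // (these accessors live on the concrete query context in this commit).
    broadcast_id
}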

src/query/service/src/pipelines/builders/builder_broadcast.rs

+39

@@ -0,0 +1,39 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_exception::Result;
+use databend_common_sql::executor::physical_plans::BroadcastSink;
+use databend_common_sql::executor::physical_plans::BroadcastSource;
+
+use crate::pipelines::processors::transforms::BroadcastSinkProcessor;
+use crate::pipelines::processors::transforms::BroadcastSourceProcessor;
+use crate::pipelines::PipelineBuilder;
+
+impl PipelineBuilder {
+    pub(crate) fn build_broadcast_source(&mut self, source: &BroadcastSource) -> Result<()> {
+        let receiver = self.ctx.broadcast_source_receiver(source.broadcast_id);
+        self.main_pipeline.add_source(
+            |output| BroadcastSourceProcessor::create(self.ctx.clone(), receiver.clone(), output),
+            1,
+        )
+    }
+
+    pub(crate) fn build_broadcast_sink(&mut self, sink: &BroadcastSink) -> Result<()> {
+        self.build_pipeline(&sink.input)?;
+        self.main_pipeline.resize(1, true)?;
+        self.main_pipeline.add_sink(|input| {
+            BroadcastSinkProcessor::create(input, self.ctx.broadcast_sink_sender(sink.broadcast_id))
+        })
+    }
+}
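
The builder above touches only two fields on the new plan nodes; inferred from that usage (not the actual definitions in `databend_common_sql::executor::physical_plans`), their shapes are roughly:

// Inferred sketch; the real structs likely carry extra fields (plan ids, etc.).
pub struct BroadcastSource {
    pub broadcast_id: u32,
}

pub struct BroadcastSink {
    pub broadcast_id: u32,
    // The local sub-plan whose output is fed into the broadcast channel.
    pub input: Box<PhysicalPlan>,
}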

src/query/service/src/pipelines/builders/mod.rs

+1
@@ -16,6 +16,7 @@ mod builder_add_stream_column;
 mod builder_aggregate;
 mod builder_append_table;
 mod builder_async_function;
+mod builder_broadcast;
 mod builder_column_mutation;
 mod builder_commit;
 mod builder_compact;

src/query/service/src/pipelines/pipeline_builder.rs

+2
@@ -220,6 +220,8 @@ impl PipelineBuilder {
             }
             PhysicalPlan::Shuffle(shuffle) => self.build_shuffle(shuffle),
             PhysicalPlan::Duplicate(duplicate) => self.build_duplicate(duplicate),
+            PhysicalPlan::BroadcastSource(source) => self.build_broadcast_source(source),
+            PhysicalPlan::BroadcastSink(sink) => self.build_broadcast_sink(sink),

             // ==============================
             // 4. Data Modification Operations
src/query/service/src/pipelines/processors/transforms/broadcast.rs

+97

@@ -0,0 +1,97 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use async_channel::Receiver;
+use async_channel::Sender;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::BlockMetaInfoPtr;
+use databend_common_expression::DataBlock;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_sinks::AsyncSink;
+use databend_common_pipeline_sinks::AsyncSinker;
+use databend_common_pipeline_sources::AsyncSource;
+use databend_common_pipeline_sources::AsyncSourcer;
+
+pub struct BroadcastSourceProcessor {
+    pub receiver: Receiver<BlockMetaInfoPtr>,
+}
+
+impl BroadcastSourceProcessor {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        receiver: Receiver<BlockMetaInfoPtr>,
+        output_port: Arc<OutputPort>,
+    ) -> Result<ProcessorPtr> {
+        AsyncSourcer::create(ctx, output_port, Self { receiver })
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSource for BroadcastSourceProcessor {
+    const NAME: &'static str = "BroadcastSource";
+    const SKIP_EMPTY_DATA_BLOCK: bool = false;
+
+    #[async_backtrace::framed]
+    async fn generate(&mut self) -> Result<Option<DataBlock>> {
+        let received = self.receiver.recv().await;
+        match received {
+            Ok(meta) => Ok(Some(DataBlock::empty_with_meta(meta))),
+            Err(_) => {
+                // The channel is closed, we should return None to stop generating
+                Ok(None)
+            }
+        }
+    }
+}
+
+pub struct BroadcastSinkProcessor {
+    received: Vec<BlockMetaInfoPtr>,
+    sender: Sender<BlockMetaInfoPtr>,
+}
+
+impl BroadcastSinkProcessor {
+    pub fn create(input: Arc<InputPort>, sender: Sender<BlockMetaInfoPtr>) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
+            received: vec![],
+            sender,
+        })))
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSink for BroadcastSinkProcessor {
+    const NAME: &'static str = "BroadcastSink";
+
+    async fn on_finish(&mut self) -> Result<()> {
+        self.sender.close();
+        Ok(())
+    }
+
+    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
+        let meta = data_block
+            .take_meta()
+            .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to BroadcastMeta"))?;
+        self.sender
+            .send(meta)
+            .await
+            .map_err(|_| ErrorCode::Internal("BroadcastSinkProcessor send error"))?;
+        Ok(false)
+    }
+}
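
The pair above speaks a simple channel protocol: the sink forwards each incoming block's meta and closes the channel when its input finishes, and the source treats a closed channel as end-of-stream. A self-contained sketch of that protocol, assuming only the `async-channel` and `futures` crates, with plain `u32` payloads standing in for `BlockMetaInfoPtr`:

use async_channel::unbounded;

fn main() {
    futures::executor::block_on(async {
        let (tx, rx) = unbounded::<u32>();

        // Sink side: forward a payload, then close (cf. consume/on_finish above).
        tx.send(42).await.unwrap();
        tx.close();

        // Source side: recv() until the channel reports closed (cf. generate above).
        while let Ok(v) = rx.recv().await {
            println!("broadcast payload: {v}");
        }
        // recv() now returns Err, which the real source maps to Ok(None) to stop.
    });
}

Note that the sink's `received` vector is never read; that is presumably why the module is declared `#[allow(dead_code)]` in the mod.rs diff below.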

src/query/service/src/pipelines/processors/transforms/mod.rs

+4
@@ -13,6 +13,8 @@
 // limitations under the License.

 pub mod aggregator;
+#[allow(dead_code)]
+mod broadcast;
 mod hash_join;
 pub(crate) mod range_join;
 mod runtime_pool;
@@ -40,6 +42,8 @@ mod transform_udf_script;
 mod transform_udf_server;
 mod window;

+pub use broadcast::BroadcastSinkProcessor;
+pub use broadcast::BroadcastSourceProcessor;
 pub use hash_join::*;
 pub use transform_add_computed_columns::TransformAddComputedColumns;
 pub use transform_add_const_columns::TransformAddConstColumns;

src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs

+3 -1

@@ -351,7 +351,9 @@ async fn create_memory_table_for_cte_scan(
         | PhysicalPlan::ChunkFillAndReorder(_)
         | PhysicalPlan::ChunkAppendData(_)
         | PhysicalPlan::ChunkMerge(_)
-        | PhysicalPlan::ChunkCommitInsert(_) => {}
+        | PhysicalPlan::ChunkCommitInsert(_)
+        | PhysicalPlan::BroadcastSource(_)
+        | PhysicalPlan::BroadcastSink(_) => {}
     }
     Ok(())
 }

src/query/service/src/schedulers/fragments/fragmenter.rs

+6 -1

@@ -123,9 +123,13 @@ impl Fragmenter {

     pub fn build_fragment(mut self, plan: &PhysicalPlan) -> Result<PlanFragment> {
         let root = self.replace(plan)?;
+        let fragment_type = match plan {
+            PhysicalPlan::BroadcastSink(_) => FragmentType::Intermediate,
+            _ => FragmentType::Root,
+        };
         let mut root_fragment = PlanFragment {
             plan: root,
-            fragment_type: FragmentType::Root,
+            fragment_type,
             fragment_id: self.ctx.get_fragment_id(),
             exchange: None,
             query_id: self.query_id.clone(),
@@ -218,6 +222,7 @@ impl PhysicalPlanReplacer for Fragmenter {
         fragments.append(&mut self.fragments);
         let probe_input = self.replace(plan.probe.as_ref())?;
         fragments.append(&mut self.fragments);
+
         self.fragments = fragments;

         Ok(PhysicalPlan::HashJoin(HashJoin {
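
This typing rule is what lets a broadcast plan join the forest as a non-root tree: a plan rooted at `BroadcastSink` returns nothing to the client, so its top fragment becomes `Intermediate`, and only the main query plan keeps `FragmentType::Root`.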

src/query/service/src/schedulers/fragments/query_fragment_actions.rs

+10 -6

@@ -126,12 +126,16 @@ impl QueryFragmentsActions {
         self.ctx.get_cluster().local_id()
     }

-    pub fn get_root_actions(&self) -> Result<&QueryFragmentActions> {
-        self.fragments_actions.last().ok_or_else(|| {
-            ErrorCode::Internal(
-                "Logical error, call get_root_actions in empty QueryFragmentsActions",
-            )
-        })
+    pub fn get_root_fragment_ids(&self) -> Result<Vec<usize>> {
+        let mut fragment_ids = Vec::new();
+        for fragment_actions in &self.fragments_actions {
+            let plan = &fragment_actions.fragment_actions[0].physical_plan;
+            if !matches!(plan, PhysicalPlan::ExchangeSink(_)) {
+                fragment_ids.push(fragment_actions.fragment_id);
+            }
+        }
+
+        Ok(fragment_ids)
     }

     pub fn pop_root_actions(&mut self) -> Option<QueryFragmentActions> {
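
A fragment heads a tree of the forest exactly when its plan does not start with an `ExchangeSink`; with broadcast plans in play there can be several such fragments (the main query plus one per broadcast plan), so the single `get_root_actions` accessor gives way to collecting all root fragment ids.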

src/query/service/src/schedulers/scheduler.rs

+9 -4

@@ -17,6 +17,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
+use databend_common_sql::executor::build_broadcast_plans;
 use databend_common_sql::planner::QueryExecutor;
 use databend_common_sql::Planner;
 use futures_util::TryStreamExt;
@@ -99,11 +100,15 @@ pub async fn build_distributed_pipeline(
     ctx: &Arc<QueryContext>,
     plan: &PhysicalPlan,
 ) -> Result<PipelineBuildResult> {
-    let fragmenter = Fragmenter::try_create(ctx.clone())?;
-
-    let root_fragment = fragmenter.build_fragment(plan)?;
     let mut fragments_actions = QueryFragmentsActions::create(ctx.clone());
-    root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
+    for plan in build_broadcast_plans(ctx.as_ref())?
+        .iter()
+        .chain(std::iter::once(plan))
+    {
+        let fragmenter = Fragmenter::try_create(ctx.clone())?;
+        let root_fragment = fragmenter.build_fragment(plan)?;
+        root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
+    }

     let exchange_manager = ctx.get_exchange_manager();