Skip to content

Commit ef53984

Browse files
committed
fix send
1 parent cbbfb38 commit ef53984

File tree

6 files changed

+51
-35
lines changed

6 files changed

+51
-35
lines changed

src/query/service/src/pipelines/builders/builder_runtime_filter.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use databend_common_exception::Result;
1616
use databend_common_sql::executor::physical_plans::RuntimeFilterSink;
1717
use databend_common_sql::executor::physical_plans::RuntimeFilterSource;
18+
use databend_common_storages_fuse::TableContext;
1819

1920
use crate::pipelines::processors::transforms::RuntimeFilterSinkProcessor;
2021
use crate::pipelines::processors::transforms::RuntimeFilterSourceProcessor;
@@ -37,7 +38,8 @@ impl PipelineBuilder {
3738
pub(crate) fn build_runtime_filter_sink(&mut self, sink: &RuntimeFilterSink) -> Result<()> {
3839
self.build_pipeline(&sink.input)?;
3940
self.main_pipeline.resize(1, true)?;
41+
let node_num = self.ctx.get_cluster().nodes.len();
4042
self.main_pipeline
41-
.add_sink(RuntimeFilterSinkProcessor::create)
43+
.add_sink(|input| RuntimeFilterSinkProcessor::create(input, node_num))
4244
}
4345
}

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

+10-6
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ use crate::pipelines::processors::transforms::hash_join::FixedKeyHashJoinHashTab
7979
use crate::pipelines::processors::transforms::hash_join::HashJoinHashTable;
8080
use crate::pipelines::processors::transforms::hash_join::SerializerHashJoinHashTable;
8181
use crate::pipelines::processors::transforms::hash_join::SingleBinaryHashJoinHashTable;
82-
use crate::pipelines::processors::transforms::RuntimeFilterMeta;
82+
use crate::pipelines::processors::transforms::RuntimeFiltersMeta;
8383
use crate::pipelines::processors::HashJoinState;
8484
use crate::sessions::QueryContext;
8585

@@ -119,7 +119,7 @@ pub struct HashJoinBuildState {
119119

120120
/// Spill related states.
121121
pub(crate) memory_settings: MemorySettings,
122-
pub(crate) runtime_filter_sender: Option<Sender<RuntimeFilterMeta>>,
122+
pub(crate) runtime_filter_sender: Option<Sender<RuntimeFiltersMeta>>,
123123
}
124124

125125
impl HashJoinBuildState {
@@ -131,7 +131,7 @@ impl HashJoinBuildState {
131131
build_projections: &ColumnSet,
132132
hash_join_state: Arc<HashJoinState>,
133133
num_threads: usize,
134-
rf_src_send: Option<Sender<RuntimeFilterMeta>>,
134+
rf_src_send: Option<Sender<RuntimeFiltersMeta>>,
135135
) -> Result<Arc<HashJoinBuildState>> {
136136
let hash_key_types = build_keys
137137
.iter()
@@ -291,6 +291,7 @@ impl HashJoinBuildState {
291291
.build_watcher
292292
.send(HashTableType::Empty)
293293
.map_err(|_| ErrorCode::TokioError("build_watcher channel is closed"))?;
294+
self.send_runtime_filter_meta(Default::default())?;
294295
self.set_bloom_filter_ready(false)?;
295296
return Ok(());
296297
}
@@ -310,6 +311,7 @@ impl HashJoinBuildState {
310311
if self.hash_join_state.spilled_partitions.read().is_empty() {
311312
self.add_runtime_filter(&build_chunks, build_num_rows)?;
312313
} else {
314+
self.send_runtime_filter_meta(Default::default())?;
313315
self.set_bloom_filter_ready(false)?;
314316
}
315317

@@ -842,6 +844,7 @@ impl HashJoinBuildState {
842844

843845
fn add_runtime_filter(&self, build_chunks: &[DataBlock], build_num_rows: usize) -> Result<()> {
844846
let mut bloom_filter_ready = false;
847+
let mut runtime_filters_meta = RuntimeFiltersMeta::default();
845848
for rf in self.runtime_filter_desc() {
846849
let mut runtime_filter = RuntimeFilterInfo::default();
847850
if rf.enable_inlist_runtime_filter && build_num_rows < INLIST_RUNTIME_FILTER_THRESHOLD {
@@ -870,10 +873,11 @@ impl HashJoinBuildState {
870873
}
871874
if !runtime_filter.is_empty() {
872875
bloom_filter_ready |= !runtime_filter.is_blooms_empty();
873-
self.send_runtime_filter_meta(&runtime_filter)?;
876+
runtime_filters_meta.add(rf.scan_id, &runtime_filter);
874877
self.ctx.set_runtime_filter((rf.scan_id, runtime_filter));
875878
}
876879
}
880+
self.send_runtime_filter_meta(runtime_filters_meta)?;
877881
self.set_bloom_filter_ready(bloom_filter_ready)?;
878882
Ok(())
879883
}
@@ -1075,9 +1079,9 @@ impl HashJoinBuildState {
10751079
.any(|rf| rf.enable_min_max_runtime_filter)
10761080
}
10771081

1078-
fn send_runtime_filter_meta(&self, runtime_filter_info: &RuntimeFilterInfo) -> Result<()> {
1082+
fn send_runtime_filter_meta(&self, runtime_filter: RuntimeFiltersMeta) -> Result<()> {
10791083
if let Some(sender) = self.runtime_filter_sender.as_ref() {
1080-
sender.send_blocking(runtime_filter_info.into()).unwrap();
1084+
sender.send_blocking(runtime_filter).unwrap();
10811085
sender.close();
10821086
}
10831087
Ok(())

src/query/service/src/pipelines/processors/transforms/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ mod transform_udf_server;
4242
mod window;
4343

4444
pub use hash_join::*;
45-
pub use runtime_filter::RuntimeFilterMeta;
4645
pub use runtime_filter::RuntimeFilterSinkProcessor;
4746
pub use runtime_filter::RuntimeFilterSourceProcessor;
47+
pub use runtime_filter::RuntimeFiltersMeta;
4848
pub use transform_add_computed_columns::TransformAddComputedColumns;
4949
pub use transform_add_const_columns::TransformAddConstColumns;
5050
pub use transform_add_internal_columns::TransformAddInternalColumns;

src/query/service/src/pipelines/processors/transforms/runtime_filter.rs

+29-19
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ use databend_common_pipeline_sources::AsyncSource;
3232
use databend_common_pipeline_sources::AsyncSourcer;
3333

3434
pub struct RuntimeFilterSourceProcessor {
35-
pub meta_receiver: Receiver<RuntimeFilterMeta>,
35+
pub meta_receiver: Receiver<RuntimeFiltersMeta>,
3636
}
3737

3838
impl RuntimeFilterSourceProcessor {
3939
pub fn create(
4040
ctx: Arc<dyn TableContext>,
41-
receiver: Receiver<RuntimeFilterMeta>,
41+
receiver: Receiver<RuntimeFiltersMeta>,
4242
output_port: Arc<OutputPort>,
4343
) -> Result<ProcessorPtr> {
4444
AsyncSourcer::create(ctx, output_port, Self {
@@ -63,12 +63,7 @@ impl AsyncSource for RuntimeFilterSourceProcessor {
6363
rf.is_ok()
6464
);
6565
match rf {
66-
Ok(runtime_filter) => Ok(Some(DataBlock::empty_with_meta(Box::new(
67-
RuntimeFilterMeta {
68-
inlist: runtime_filter.inlist,
69-
min_max: runtime_filter.min_max,
70-
},
71-
)))),
66+
Ok(runtime_filter) => Ok(Some(DataBlock::empty_with_meta(Box::new(runtime_filter)))),
7267
Err(_) => {
7368
// The channel is closed, we should return None to stop generating
7469
Ok(None)
@@ -77,22 +72,29 @@ impl AsyncSource for RuntimeFilterSourceProcessor {
7772
}
7873
}
7974

75+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
76+
pub struct RuntimeFiltersMeta {
77+
runtime_filters: Vec<RuntimeFilterMeta>,
78+
}
79+
8080
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
8181
pub struct RuntimeFilterMeta {
82+
scan_id: usize,
8283
inlist: Vec<RemoteExpr<String>>,
8384
min_max: Vec<RemoteExpr<String>>,
8485
}
8586

86-
#[typetag::serde(name = "runtime_filter_meta")]
87-
impl BlockMetaInfo for RuntimeFilterMeta {
87+
#[typetag::serde(name = "runtime_filters_meta")]
88+
impl BlockMetaInfo for RuntimeFiltersMeta {
8889
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
8990
Box::new(self.clone())
9091
}
9192
}
9293

93-
impl From<&RuntimeFilterInfo> for RuntimeFilterMeta {
94-
fn from(runtime_filter_info: &RuntimeFilterInfo) -> Self {
95-
Self {
94+
impl RuntimeFiltersMeta {
95+
pub fn add(&mut self, scan_id: usize, runtime_filter_info: &RuntimeFilterInfo) {
96+
let rf = RuntimeFilterMeta {
97+
scan_id,
9698
inlist: runtime_filter_info
9799
.inlists_ref()
98100
.iter()
@@ -103,14 +105,21 @@ impl From<&RuntimeFilterInfo> for RuntimeFilterMeta {
103105
.iter()
104106
.map(|expr| expr.as_remote_expr())
105107
.collect(),
106-
}
108+
};
109+
self.runtime_filters.push(rf);
107110
}
108111
}
109-
pub struct RuntimeFilterSinkProcessor {}
112+
pub struct RuntimeFilterSinkProcessor {
113+
node_num: usize,
114+
recv_num: usize,
115+
}
110116

111117
impl RuntimeFilterSinkProcessor {
112-
pub fn create(input: Arc<InputPort>) -> Result<ProcessorPtr> {
113-
Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {})))
118+
pub fn create(input: Arc<InputPort>, node_num: usize) -> Result<ProcessorPtr> {
119+
Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
120+
node_num,
121+
recv_num: 0,
122+
})))
114123
}
115124
}
116125

@@ -128,12 +137,13 @@ impl AsyncSink for RuntimeFilterSinkProcessor {
128137
let ptr = data_block
129138
.take_meta()
130139
.ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?;
131-
let runtime_filter = RuntimeFilterMeta::downcast_from(ptr)
140+
let runtime_filter = RuntimeFiltersMeta::downcast_from(ptr)
132141
.ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?;
133142
log::info!(
134143
"RuntimeFilterSinkProcessor recv runtime filter: {:?}",
135144
runtime_filter
136145
);
137-
Ok(true)
146+
self.recv_num += 1;
147+
Ok(self.node_num == self.recv_num)
138148
}
139149
}

src/query/service/src/sessions/query_ctx.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ use crate::clusters::Cluster;
137137
use crate::clusters::ClusterHelper;
138138
use crate::locks::LockManager;
139139
use crate::pipelines::executor::PipelineExecutor;
140-
use crate::pipelines::processors::transforms::RuntimeFilterMeta;
140+
use crate::pipelines::processors::transforms::RuntimeFiltersMeta;
141141
use crate::servers::flight::v1::exchange::DataExchangeManager;
142142
use crate::sessions::query_affect::QueryAffect;
143143
use crate::sessions::query_ctx_shared::MemoryUpdater;
@@ -284,11 +284,11 @@ impl QueryContext {
284284
}
285285
}
286286

287-
pub fn rf_src_recv(&self, join_id: u32) -> Receiver<RuntimeFilterMeta> {
287+
pub fn rf_src_recv(&self, join_id: u32) -> Receiver<RuntimeFiltersMeta> {
288288
self.shared.rf_src_recv(join_id)
289289
}
290290

291-
pub fn rf_src_send(&self, join_id: u32) -> Sender<RuntimeFilterMeta> {
291+
pub fn rf_src_send(&self, join_id: u32) -> Sender<RuntimeFiltersMeta> {
292292
self.shared.rf_src_send(join_id)
293293
}
294294

src/query/service/src/sessions/query_ctx_shared.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ use uuid::Uuid;
7070
use crate::clusters::Cluster;
7171
use crate::clusters::ClusterDiscovery;
7272
use crate::pipelines::executor::PipelineExecutor;
73-
use crate::pipelines::processors::transforms::RuntimeFilterMeta;
73+
use crate::pipelines::processors::transforms::RuntimeFiltersMeta;
7474
use crate::sessions::query_affect::QueryAffect;
7575
use crate::sessions::Session;
7676
use crate::storages::Table;
@@ -179,8 +179,8 @@ pub struct QueryContextShared {
179179
}
180180

181181
type RuntimeFilterChannel = (
182-
Option<Sender<RuntimeFilterMeta>>,
183-
Option<Receiver<RuntimeFilterMeta>>,
182+
Option<Sender<RuntimeFiltersMeta>>,
183+
Option<Receiver<RuntimeFiltersMeta>>,
184184
);
185185

186186
impl QueryContextShared {
@@ -251,7 +251,7 @@ impl QueryContextShared {
251251
}))
252252
}
253253

254-
pub fn rf_src_recv(&self, join_id: u32) -> Receiver<RuntimeFilterMeta> {
254+
pub fn rf_src_recv(&self, join_id: u32) -> Receiver<RuntimeFiltersMeta> {
255255
let mut rf_source = self.rf_source.lock();
256256
match rf_source.get_mut(&join_id).map(|(_, receiver)| receiver) {
257257
Some(receiver) => receiver.take().unwrap(),
@@ -262,7 +262,7 @@ impl QueryContextShared {
262262
}
263263
}
264264
}
265-
pub fn rf_src_send(&self, join_id: u32) -> Sender<RuntimeFilterMeta> {
265+
pub fn rf_src_send(&self, join_id: u32) -> Sender<RuntimeFiltersMeta> {
266266
let mut rf_source = self.rf_source.lock();
267267
match rf_source.get_mut(&join_id).map(|(sender, _)| sender) {
268268
Some(sender) => sender.take().unwrap(),

0 commit comments

Comments
 (0)