diff --git a/src/query/catalog/src/runtime_filter_info.rs b/src/query/catalog/src/runtime_filter_info.rs
index 5caeae4090789..8bffe222cb0a2 100644
--- a/src/query/catalog/src/runtime_filter_info.rs
+++ b/src/query/catalog/src/runtime_filter_info.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use databend_common_base::base::tokio::sync::watch;
 use databend_common_base::base::tokio::sync::watch::Receiver;
 use databend_common_base::base::tokio::sync::watch::Sender;
@@ -19,47 +21,23 @@ use databend_common_expression::Expr;
 use xorf::BinaryFuse16;
 
 #[derive(Clone, Debug, Default)]
-pub struct RuntimeFilterInfo {
-    inlist: Vec<Expr<String>>,
-    min_max: Vec<Expr<String>>,
-    bloom: Vec<(String, BinaryFuse16)>,
+pub struct RuntimeFiltersForScan {
+    pub inlist: HashMap<usize, Expr<String>>,
+    pub min_max: HashMap<usize, Expr<String>>,
+    pub bloom: HashMap<usize, (String, BinaryFuse16)>,
 }
 
-impl RuntimeFilterInfo {
-    pub fn add_inlist(&mut self, expr: Expr<String>) {
-        self.inlist.push(expr);
-    }
-
-    pub fn add_bloom(&mut self, bloom: (String, BinaryFuse16)) {
-        self.bloom.push(bloom);
-    }
-
-    pub fn add_min_max(&mut self, expr: Expr<String>) {
-        self.min_max.push(expr);
-    }
-
-    pub fn get_inlist(&self) -> &Vec<Expr<String>> {
-        &self.inlist
-    }
-
-    pub fn get_bloom(&self) -> &Vec<(String, BinaryFuse16)> {
-        &self.bloom
-    }
-
-    pub fn get_min_max(&self) -> &Vec<Expr<String>> {
-        &self.min_max
-    }
-
-    pub fn blooms(self) -> Vec<(String, BinaryFuse16)> {
-        self.bloom
+impl RuntimeFiltersForScan {
+    pub fn add_inlist(&mut self, rf_id: usize, expr: Expr<String>) {
+        self.inlist.insert(rf_id, expr);
     }
 
-    pub fn inlists(self) -> Vec<Expr<String>> {
-        self.inlist
+    pub fn add_bloom(&mut self, rf_id: usize, bloom: (String, BinaryFuse16)) {
+        self.bloom.insert(rf_id, bloom);
     }
 
-    pub fn min_maxs(self) -> Vec<Expr<String>> {
-        self.min_max
+    pub fn add_min_max(&mut self, rf_id: usize, expr: Expr<String>) {
+        self.min_max.insert(rf_id, expr);
     }
 
     pub fn is_empty(&self) -> bool {
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index 5eb826974ef68..346d1a959364e 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -74,8 +74,8 @@ use crate::plan::PartInfoPtr;
 use crate::plan::PartStatistics;
 use crate::plan::Partitions;
 use crate::query_kind::QueryKind;
-use crate::runtime_filter_info::RuntimeFilterInfo;
 use crate::runtime_filter_info::RuntimeFilterReady;
+use crate::runtime_filter_info::RuntimeFiltersForScan;
 use crate::statistics::data_cache_statistics::DataCacheMetrics;
 use crate::table::Table;
 
@@ -324,7 +324,7 @@ pub trait TableContext: Send + Sync {
 
     fn get_query_profiles(&self) -> Vec<PlanProfile>;
 
-    fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));
+    fn set_runtime_filter(&self, filters: (usize, RuntimeFiltersForScan));
 
     fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc<RuntimeFilterReady>);
 
diff --git a/src/query/pipeline/sources/src/async_source.rs b/src/query/pipeline/sources/src/async_source.rs
index 77360d4045e2d..72904a145b124 100644
--- a/src/query/pipeline/sources/src/async_source.rs
+++ b/src/query/pipeline/sources/src/async_source.rs
@@ -131,7 +131,9 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {
             return Ok(());
         }
         match self.inner.generate().await? {
-            None => self.is_finish = true,
+            None => {
+                self.is_finish = true;
+            }
             Some(data_block) => {
                 if !data_block.is_empty() {
                     let progress_values = ProgressValues {
diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs
index 5dc183497928d..d4d7d5cfa3639 100644
--- a/src/query/service/src/pipelines/builders/builder_join.rs
+++ b/src/query/service/src/pipelines/builders/builder_join.rs
@@ -27,6 +27,7 @@ use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft
 use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight;
 use crate::pipelines::processors::transforms::HashJoinBuildState;
 use crate::pipelines::processors::transforms::HashJoinProbeState;
+use crate::pipelines::processors::transforms::RuntimeFilterChannels;
 use crate::pipelines::processors::transforms::TransformHashJoinBuild;
 use crate::pipelines::processors::transforms::TransformHashJoinProbe;
 use crate::pipelines::processors::HashJoinDesc;
@@ -102,6 +103,7 @@ impl PipelineBuilder {
             self.hash_join_states
                 .insert(build_cache_index, state.clone());
         }
+
         self.expand_build_side_pipeline(&join.build, join, state.clone())?;
         self.build_join_probe(join, state)?;
@@ -153,6 +155,13 @@
             &hash_join_plan.build_projections,
             join_state.clone(),
             output_len,
+            hash_join_plan
+                .runtime_filter_plan
+                .as_ref()
+                .map(|_| RuntimeFilterChannels {
+                    rf_src_send: self.ctx.rf_src_send(hash_join_plan.join_id),
+                    rf_sink_recv: self.ctx.rf_sink_recv(hash_join_plan.join_id),
+                }),
         )?;
 
         build_state.add_runtime_filter_ready();
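Note: the `rf_src_send`/`rf_sink_recv` accessors used above hand out the two ends of a per-join channel pair, each end claimed by exactly one consumer; the backing registry lives in `query_ctx_shared.rs` further down in this diff. A minimal sketch of that take-one-end pattern, with `T` standing in for `RemoteRuntimeFilters` (names here are illustrative, not the actual API):

    use std::collections::HashMap;
    use std::sync::Mutex;

    use async_channel::{Receiver, Sender};

    struct ChannelRegistry<T> {
        // join_id -> (parked sender, parked receiver)
        slots: Mutex<HashMap<u32, (Option<Sender<T>>, Option<Receiver<T>>)>>,
    }

    impl<T> ChannelRegistry<T> {
        fn new() -> Self {
            Self {
                slots: Mutex::new(HashMap::new()),
            }
        }

        fn sender(&self, id: u32) -> Sender<T> {
            let mut slots = self.slots.lock().unwrap();
            match slots.get_mut(&id) {
                // The receiver side registered first: take the parked sender.
                Some((sender, _)) => sender.take().expect("sender already claimed"),
                None => {
                    let (sender, receiver) = async_channel::unbounded();
                    slots.insert(id, (None, Some(receiver)));
                    sender
                }
            }
        }

        fn receiver(&self, id: u32) -> Receiver<T> {
            let mut slots = self.slots.lock().unwrap();
            match slots.get_mut(&id) {
                // The sender side registered first: take the parked receiver.
                Some((_, receiver)) => receiver.take().expect("receiver already claimed"),
                None => {
                    let (sender, receiver) = async_channel::unbounded();
                    slots.insert(id, (Some(sender), None));
                    receiver
                }
            }
        }
    }

Each endpoint is claimed at most once per join, which is why the real code can `take().unwrap()` without further synchronization.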
diff --git a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs
new file mode 100644
index 0000000000000..4be2d23e42699
--- /dev/null
+++ b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs
@@ -0,0 +1,46 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_exception::Result;
+use databend_common_sql::executor::physical_plans::RuntimeFilterSink;
+use databend_common_sql::executor::physical_plans::RuntimeFilterSource;
+use databend_common_storages_fuse::TableContext;
+
+use crate::pipelines::processors::transforms::RuntimeFilterSinkProcessor;
+use crate::pipelines::processors::transforms::RuntimeFilterSourceProcessor;
+use crate::pipelines::PipelineBuilder;
+
+impl PipelineBuilder {
+    pub(crate) fn build_runtime_filter_source(
+        &mut self,
+        source: &RuntimeFilterSource,
+    ) -> Result<()> {
+        let receiver = self.ctx.rf_src_recv(source.join_id);
+        self.main_pipeline.add_source(
+            |output| {
+                RuntimeFilterSourceProcessor::create(self.ctx.clone(), receiver.clone(), output)
+            },
+            1,
+        )
+    }
+
+    pub(crate) fn build_runtime_filter_sink(&mut self, sink: &RuntimeFilterSink) -> Result<()> {
+        self.build_pipeline(&sink.input)?;
+        self.main_pipeline.resize(1, true)?;
+        let node_num = self.ctx.get_cluster().nodes.len();
+        self.main_pipeline.add_sink(|input| {
+            RuntimeFilterSinkProcessor::create(input, node_num, self.ctx.rf_sink_send(sink.join_id))
+        })
+    }
+}
diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs
index 8a2e5a481a349..3118b795f8b47 100644
--- a/src/query/service/src/pipelines/builders/mod.rs
+++ b/src/query/service/src/pipelines/builders/mod.rs
@@ -40,6 +40,7 @@ mod builder_recluster;
 mod builder_recursive_cte;
 mod builder_replace_into;
 mod builder_row_fetch;
+mod builder_runtime_filter;
 mod builder_scalar;
 mod builder_scan;
 mod builder_sort;
diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs
index 49bb9eccdb24e..0799456da42e5 100644
--- a/src/query/service/src/pipelines/pipeline_builder.rs
+++ b/src/query/service/src/pipelines/pipeline_builder.rs
@@ -258,6 +258,8 @@ impl PipelineBuilder {
             PhysicalPlan::ColumnMutation(column_mutation) => {
                 self.build_column_mutation(column_mutation)
             }
+            PhysicalPlan::RuntimeFilterSource(source) => self.build_runtime_filter_source(source),
+            PhysicalPlan::RuntimeFilterSink(sink) => self.build_runtime_filter_sink(sink),
         }?;
 
         self.is_exchange_neighbor = is_exchange_neighbor;
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
index 428ad2f03ae38..f464b3c358846 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
@@ -20,8 +20,8 @@ use databend_common_expression::RemoteExpr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_sql::executor::cast_expr_to_non_null_boolean;
 use databend_common_sql::executor::physical_plans::HashJoin;
-use databend_common_sql::executor::PhysicalRuntimeFilter;
-use databend_common_sql::executor::PhysicalRuntimeFilters;
+use databend_common_sql::executor::RemoteRuntimeFilterDesc;
+use databend_common_sql::executor::RemoteRuntimeFiltersDesc;
 use parking_lot::RwLock;
 
 use crate::sql::plans::JoinType;
@@ -54,7 +54,7 @@ pub struct HashJoinDesc {
 }
 
 pub struct RuntimeFilterDesc {
-    pub _id: usize,
+    pub id: usize,
     pub build_key: Expr,
     pub probe_key: Expr<String>,
     pub scan_id: usize,
@@ -67,18 +67,18 @@ pub struct RuntimeFiltersDesc {
     pub filters: Vec<RuntimeFilterDesc>,
 }
 
-impl From<&PhysicalRuntimeFilters> for RuntimeFiltersDesc {
-    fn from(runtime_filter: &PhysicalRuntimeFilters) -> Self {
+impl From<&RemoteRuntimeFiltersDesc> for RuntimeFiltersDesc {
+    fn from(runtime_filter: &RemoteRuntimeFiltersDesc) -> Self {
         Self {
             filters: runtime_filter.filters.iter().map(|rf| rf.into()).collect(),
         }
     }
 }
 
-impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc {
-    fn from(runtime_filter: &PhysicalRuntimeFilter) -> Self {
+impl From<&RemoteRuntimeFilterDesc> for RuntimeFilterDesc {
+    fn from(runtime_filter: &RemoteRuntimeFilterDesc) -> Self {
         Self {
-            _id: runtime_filter.id,
+            id: runtime_filter.id,
             build_key: runtime_filter.build_key.as_expr(&BUILTIN_FUNCTIONS),
             probe_key: runtime_filter.probe_key.as_expr(&BUILTIN_FUNCTIONS),
             scan_id: runtime_filter.scan_id,
@@ -117,7 +117,7 @@ impl HashJoinDesc {
             from_correlated_subquery: join.from_correlated_subquery,
             broadcast: join.broadcast,
             single_to_inner: join.single_to_inner.clone(),
-            runtime_filter: (&join.runtime_filter).into(),
+            runtime_filter: (&join.runtime_filter_desc).into(),
         })
     }
 
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
index 0af13ee98f003..f4a1459499f67 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::collections::HashSet;
 use std::collections::VecDeque;
 use std::ops::ControlFlow;
@@ -20,9 +21,11 @@ use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
+use async_channel::Receiver;
+use async_channel::Sender;
 use databend_common_base::base::tokio::sync::Barrier;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
+use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_column::bitmap::Bitmap;
 use databend_common_exception::ErrorCode;
@@ -78,6 +81,7 @@ use crate::pipelines::processors::transforms::hash_join::FixedKeyHashJoinHashTable;
 use crate::pipelines::processors::transforms::hash_join::HashJoinHashTable;
 use crate::pipelines::processors::transforms::hash_join::SerializerHashJoinHashTable;
 use crate::pipelines::processors::transforms::hash_join::SingleBinaryHashJoinHashTable;
+use crate::pipelines::processors::transforms::RemoteRuntimeFilters;
 use crate::pipelines::processors::HashJoinState;
 use crate::sessions::QueryContext;
 
@@ -117,6 +121,14 @@ pub struct HashJoinBuildState {
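Note: the hunk resuming below adds an `rf_channels` field to `HashJoinBuildState`. It carries both halves of the global-merge round trip: the build side pushes its local filters to `RuntimeFilterSourceProcessor`, then blocks until `RuntimeFilterSinkProcessor` hands back the merged result. A minimal sketch of that rendezvous, mirroring the `send_blocking`/`close`/`recv_blocking` sequence in `send_runtime_filter_meta` further down (payload type and error handling simplified):

    use async_channel::{Receiver, Sender};

    struct Channels<T> {
        src_send: Sender<T>,
        sink_recv: Receiver<T>,
    }

    fn exchange<T>(channels: &Channels<T>, local: T) -> Result<T, &'static str> {
        // Hand the local filters to the broadcast fragment.
        channels
            .src_send
            .send_blocking(local)
            .map_err(|_| "send runtime filter failed")?;
        // Closing makes RuntimeFilterSourceProcessor::generate observe
        // Err(..) on its next recv() and stop producing.
        channels.src_send.close();
        // Block until the sink fragment delivers the merged filters.
        channels
            .sink_recv
            .recv_blocking()
            .map_err(|_| "receive merged runtime filter failed")
    }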
     /// Spill related states.
     pub(crate) memory_settings: MemorySettings,
+    pub(crate) rf_channels: Option<RuntimeFilterChannels>,
+}
+
+pub struct RuntimeFilterChannels {
+    /// send runtime filter to `RuntimeFilterSourceProcessor`
+    pub(crate) rf_src_send: Sender<RemoteRuntimeFilters>,
+    /// receive runtime filter from `RuntimeFilterSinkProcessor`
+    pub(crate) rf_sink_recv: Receiver<RemoteRuntimeFilters>,
 }
 
 impl HashJoinBuildState {
@@ -128,6 +140,7 @@ impl HashJoinBuildState {
         build_projections: &ColumnSet,
         hash_join_state: Arc<HashJoinState>,
         num_threads: usize,
+        rf_channels: Option<RuntimeFilterChannels>,
     ) -> Result<Arc<HashJoinBuildState>> {
         let hash_key_types = build_keys
             .iter()
@@ -164,6 +177,7 @@ impl HashJoinBuildState {
             build_hash_table_tasks: Default::default(),
             mutex: Default::default(),
             memory_settings,
+            rf_channels,
         }))
     }
 
@@ -286,6 +300,7 @@
                 .build_watcher
                 .send(HashTableType::Empty)
                 .map_err(|_| ErrorCode::TokioError("build_watcher channel is closed"))?;
+            self.send_runtime_filter_meta(None)?;
             self.set_bloom_filter_ready(false)?;
             return Ok(());
         }
@@ -837,14 +852,16 @@
     fn add_runtime_filter(&self, build_chunks: &[DataBlock], build_num_rows: usize) -> Result<()> {
         let mut bloom_filter_ready = false;
+        let mut runtime_filters = HashMap::new();
         for rf in self.runtime_filter_desc() {
-            let mut runtime_filter = RuntimeFilterInfo::default();
+            let mut runtime_filter = RuntimeFiltersForScan::default();
             if rf.enable_inlist_runtime_filter && build_num_rows < INLIST_RUNTIME_FILTER_THRESHOLD {
                 self.inlist_runtime_filter(
                     &mut runtime_filter,
                     build_chunks,
                     &rf.build_key,
                     &rf.probe_key,
+                    rf.id,
                 )?;
             }
             if rf.enable_bloom_runtime_filter {
@@ -853,6 +870,7 @@
                     &mut runtime_filter,
                     &rf.build_key,
                     &rf.probe_key,
+                    rf.id,
                 )?;
             }
             if rf.enable_min_max_runtime_filter {
@@ -861,13 +879,15 @@
                     &mut runtime_filter,
                     &rf.build_key,
                     &rf.probe_key,
+                    rf.id,
                 )?;
             }
             if !runtime_filter.is_empty() {
                 bloom_filter_ready |= !runtime_filter.is_blooms_empty();
-                self.ctx.set_runtime_filter((rf.scan_id, runtime_filter));
+                runtime_filters.insert(rf.scan_id, runtime_filter);
             }
         }
+        self.send_runtime_filter_meta(Some(runtime_filters))?;
         self.set_bloom_filter_ready(bloom_filter_ready)?;
         Ok(())
     }
@@ -875,9 +895,10 @@
     fn bloom_runtime_filter(
         &self,
         data_blocks: &[DataBlock],
-        runtime_filter: &mut RuntimeFilterInfo,
+        runtime_filter: &mut RuntimeFiltersForScan,
         build_key: &Expr,
         probe_key: &Expr<String>,
+        rf_id: usize,
     ) -> Result<()> {
         if !build_key.data_type().remove_nullable().is_number()
             && !build_key.data_type().remove_nullable().is_string()
@@ -915,23 +936,24 @@
             hashes_vec.push(hash);
         });
         let filter = BinaryFuse16::try_from(&hashes_vec)?;
-        runtime_filter.add_bloom((id.to_string(), filter));
+        runtime_filter.add_bloom(rf_id, (id.to_string(), filter));
         Ok(())
     }
 
     fn inlist_runtime_filter(
         &self,
-        runtime_filter: &mut RuntimeFilterInfo,
+        runtime_filter: &mut RuntimeFiltersForScan,
         data_blocks: &[DataBlock],
         build_key: &Expr,
         probe_key: &Expr<String>,
+        rf_id: usize,
     ) -> Result<()> {
         if let Some(distinct_build_column) =
             dedup_build_key_column(&self.func_ctx, data_blocks, build_key)?
         {
             if let Some(filter) = inlist_filter(probe_key, distinct_build_column.clone())? {
                 info!("inlist_filter: {:?}", filter.sql_display());
-                runtime_filter.add_inlist(filter);
+                runtime_filter.add_inlist(rf_id, filter);
             }
         }
         Ok(())
@@ -940,9 +962,10 @@
     fn min_max_runtime_filter(
         &self,
         data_blocks: &[DataBlock],
-        runtime_filter: &mut RuntimeFilterInfo,
+        runtime_filter: &mut RuntimeFiltersForScan,
         build_key: &Expr,
         probe_key: &Expr<String>,
+        rf_id: usize,
     ) -> Result<()> {
         if !build_key.runtime_filter_supported_types() {
             return Ok(());
         }
@@ -1035,7 +1058,7 @@
             };
             if let Some(min_max_filter) = min_max_filter {
                 info!("min_max_filter: {:?}", min_max_filter.sql_display());
-                runtime_filter.add_min_max(min_max_filter);
+                runtime_filter.add_min_max(rf_id, min_max_filter);
             }
         }
         Ok(())
@@ -1068,4 +1091,26 @@
             .iter()
             .any(|rf| rf.enable_min_max_runtime_filter)
     }
+
+    fn send_runtime_filter_meta(
+        &self,
+        mut rf: Option<HashMap<usize, RuntimeFiltersForScan>>,
+    ) -> Result<()> {
+        if let Some(channels) = self.rf_channels.as_ref() {
+            channels
+                .rf_src_send
+                .send_blocking(rf.into())
+                .map_err(|_| ErrorCode::TokioError("send runtime filter meta failed"))?;
+            channels.rf_src_send.close();
+            let merged_rf = channels
+                .rf_sink_recv
+                .recv_blocking()
+                .map_err(|_| ErrorCode::TokioError("receive runtime filter meta failed"))?;
+            rf = merged_rf.into();
+        }
+        for (scan_id, runtime_filter) in rf.unwrap_or_default().into_iter() {
+            self.ctx.set_runtime_filter((scan_id, runtime_filter));
+        }
+        Ok(())
+    }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs
index d50ee45b166eb..1337e15f35cf3 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs
@@ -31,6 +31,7 @@ mod util;
 
 pub use desc::HashJoinDesc;
 pub use hash_join_build_state::HashJoinBuildState;
+pub use hash_join_build_state::RuntimeFilterChannels;
 pub use hash_join_probe_state::HashJoinProbeState;
 pub use hash_join_spiller::HashJoinSpiller;
 pub use hash_join_state::*;
diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs
index da8b50455878f..904af8803524b 100644
--- a/src/query/service/src/pipelines/processors/transforms/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -15,6 +15,7 @@ pub mod aggregator;
 mod hash_join;
 pub(crate) mod range_join;
+mod runtime_filter;
 mod runtime_pool;
 mod transform_add_computed_columns;
 mod transform_add_const_columns;
@@ -41,6 +42,9 @@ mod transform_udf_server;
 mod window;
 
 pub use hash_join::*;
+pub use runtime_filter::RemoteRuntimeFilters;
+pub use runtime_filter::RuntimeFilterSinkProcessor;
+pub use runtime_filter::RuntimeFilterSourceProcessor;
 pub use transform_add_computed_columns::TransformAddComputedColumns;
 pub use transform_add_const_columns::TransformAddConstColumns;
 pub use transform_add_internal_columns::TransformAddInternalColumns;
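Note: the new `runtime_filter.rs` below moves filters across the exchange by attaching them to an otherwise empty `DataBlock` as `BlockMetaInfo`: the source emits one such block per build side, the sink collects one per node, merges them, and forwards the result. A minimal sketch of the pack/unpack halves, using a placeholder meta type rather than the real `RemoteRuntimeFilters`:

    use databend_common_exception::{ErrorCode, Result};
    use databend_common_expression::{BlockMetaInfo, BlockMetaInfoDowncast, DataBlock};

    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct FiltersMeta {
        payload: Vec<u8>, // stand-in for the real filter maps
    }

    #[typetag::serde(name = "filters_meta_sketch")]
    impl BlockMetaInfo for FiltersMeta {
        fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
            Box::new(self.clone())
        }
    }

    // Source side: wrap the filters in an empty block.
    fn pack(filters: FiltersMeta) -> DataBlock {
        DataBlock::empty_with_meta(Box::new(filters))
    }

    // Sink side: recover the filters from the block meta.
    fn unpack(mut block: DataBlock) -> Result<FiltersMeta> {
        let meta = block
            .take_meta()
            .ok_or_else(|| ErrorCode::Internal("expected filter meta"))?;
        FiltersMeta::downcast_from(meta).ok_or_else(|| ErrorCode::Internal("expected filter meta"))
    }

The `typetag::serde` registration is what lets the meta survive serialization across nodes, and it is presumably also why the `TransformScatterExchangeSerializer` change later in this diff stops skipping empty blocks: an empty block may now carry filter meta.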
diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs
new file mode 100644
index 0000000000000..4f29c19377521
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs
@@ -0,0 +1,294 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_channel::Receiver;
+use async_channel::Sender;
+use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::types::DataType;
+use databend_common_expression::BlockMetaInfo;
+use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::DataBlock;
+use databend_common_expression::FunctionID;
+use databend_common_expression::RemoteExpr;
+use databend_common_functions::BUILTIN_FUNCTIONS;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_sinks::AsyncSink;
+use databend_common_pipeline_sinks::AsyncSinker;
+use databend_common_pipeline_sources::AsyncSource;
+use databend_common_pipeline_sources::AsyncSourcer;
+
+pub struct RuntimeFilterSourceProcessor {
+    pub receiver: Receiver<RemoteRuntimeFilters>,
+}
+
+impl RuntimeFilterSourceProcessor {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        receiver: Receiver<RemoteRuntimeFilters>,
+        output_port: Arc<OutputPort>,
+    ) -> Result<ProcessorPtr> {
+        AsyncSourcer::create(ctx, output_port, Self { receiver })
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSource for RuntimeFilterSourceProcessor {
+    const NAME: &'static str = "RuntimeFilterSource";
+    const SKIP_EMPTY_DATA_BLOCK: bool = false;
+
+    #[async_backtrace::framed]
+    async fn generate(&mut self) -> Result<Option<DataBlock>> {
+        let start = std::time::Instant::now();
+        log::info!("RuntimeFilterSource recv() start");
+        let rf = self.receiver.recv().await;
+        log::info!(
+            "RuntimeFilterSource recv() took {:?}, got {}",
+            start.elapsed(),
+            rf.is_ok()
+        );
+        match rf {
+            Ok(runtime_filter) => Ok(Some(DataBlock::empty_with_meta(Box::new(runtime_filter)))),
+            Err(_) => {
+                // The channel is closed, so return None to stop generating.
+                Ok(None)
+            }
+        }
+    }
+}
+
+pub struct RuntimeFilterSinkProcessor {
+    node_num: usize,
+    recv_num: usize,
+    rf: Vec<RemoteRuntimeFilters>,
+    sender: Sender<RemoteRuntimeFilters>,
+}
+
+impl RuntimeFilterSinkProcessor {
+    pub fn create(
+        input: Arc<InputPort>,
+        node_num: usize,
+        sender: Sender<RemoteRuntimeFilters>,
+    ) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {
+            node_num,
+            recv_num: 0,
+            rf: vec![],
+            sender,
+        })))
+    }
+}
+
+impl RuntimeFilterSinkProcessor {}
+
+#[async_trait::async_trait]
+impl AsyncSink for RuntimeFilterSinkProcessor {
+    const NAME: &'static str = "RuntimeFilterSink";
+
+    async fn on_finish(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
+        let ptr = data_block
+            .take_meta()
+            .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?;
+        let runtime_filter = RemoteRuntimeFilters::downcast_from(ptr)
+            .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?;
+        log::info!(
+            "RuntimeFilterSinkProcessor recv runtime filter: {:?}",
+            runtime_filter
+        );
+        self.recv_num += 1;
+        self.rf.push(runtime_filter);
+        let all_recv = self.node_num == self.recv_num;
+        if all_recv {
+            let merged_rf = RemoteRuntimeFilters::merge(&self.rf);
+            self.sender.send(merged_rf).await.map_err(|_| {
+                ErrorCode::Internal("RuntimeFilterSinkProcessor failed to send runtime filter")
+            })?;
+        }
+        Ok(all_recv)
+    }
+}
+
+/// One-to-one correspondence with HashJoin operator.
+///
+/// When the build side is empty, `scan_id_to_runtime_filter` is `None`.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
+pub struct RemoteRuntimeFilters {
+    scan_id_to_runtime_filter: Option<HashMap<usize, RemoteRuntimeFiltersForScan>>,
+}
+
+impl From<Option<HashMap<usize, RuntimeFiltersForScan>>> for RemoteRuntimeFilters {
+    fn from(rfs: Option<HashMap<usize, RuntimeFiltersForScan>>) -> Self {
+        RemoteRuntimeFilters {
+            scan_id_to_runtime_filter: rfs.map(|rfs| {
+                rfs.into_iter()
+                    .map(|(scan_id, runtime_filter)| (scan_id, runtime_filter.into()))
+                    .collect()
+            }),
+        }
+    }
+}
+
+impl From<RemoteRuntimeFilters> for Option<HashMap<usize, RuntimeFiltersForScan>> {
+    fn from(rfs: RemoteRuntimeFilters) -> Self {
+        rfs.scan_id_to_runtime_filter.map(|rfs| {
+            rfs.into_iter()
+                .map(|(scan_id, runtime_filter)| (scan_id, runtime_filter.into()))
+                .collect()
+        })
+    }
+}
+
+impl From<RemoteRuntimeFiltersForScan> for RuntimeFiltersForScan {
+    fn from(rfs: RemoteRuntimeFiltersForScan) -> Self {
+        Self {
+            inlist: rfs
+                .rf_id_to_inlist
+                .into_iter()
+                .map(|(id, expr)| (id, expr.as_expr(&BUILTIN_FUNCTIONS)))
+                .collect(),
+            min_max: rfs
+                .rf_id_to_min_max
+                .into_iter()
+                .map(|(id, expr)| (id, expr.as_expr(&BUILTIN_FUNCTIONS)))
+                .collect(),
+            bloom: Default::default(),
+        }
+    }
+}
+
+impl From<RuntimeFiltersForScan> for RemoteRuntimeFiltersForScan {
+    fn from(rfs: RuntimeFiltersForScan) -> Self {
+        Self {
+            rf_id_to_inlist: rfs
+                .inlist
+                .iter()
+                .map(|(id, expr)| (*id, expr.as_remote_expr()))
+                .collect(),
+            rf_id_to_min_max: rfs
+                .min_max
+                .iter()
+                .map(|(id, expr)| (*id, expr.as_remote_expr()))
+                .collect(),
+        }
+    }
+}
+impl RemoteRuntimeFilters {
+    pub fn merge(rfs: &[RemoteRuntimeFilters]) -> Self {
+        log::info!("start merge runtime filters: {:?}", rfs);
+        let rfs = rfs
+            .iter()
+            .filter_map(|rfs| rfs.scan_id_to_runtime_filter.as_ref())
+            .collect::<Vec<_>>();
+
+        if rfs.is_empty() {
+            return RemoteRuntimeFilters::default();
+        }
+
+        let mut common_scans: Vec<usize> = rfs[0].keys().cloned().collect();
+        for rf in &rfs[1..] {
+            common_scans.retain(|scan_id| rf.contains_key(scan_id));
+        }
+
+        let mut merged = HashMap::new();
+
+        for scan_id in common_scans {
+            let mut merged_for_scan = RemoteRuntimeFiltersForScan::default();
+            let first_scan = &rfs[0][&scan_id];
+
+            let mut common_inlist_ids: Vec<usize> =
+                first_scan.rf_id_to_inlist.keys().cloned().collect();
+            let mut common_min_max_ids: Vec<usize> =
+                first_scan.rf_id_to_min_max.keys().cloned().collect();
+            for rf in &rfs[1..] {
+                let scan_filter = &rf[&scan_id];
+                common_inlist_ids.retain(|id| scan_filter.rf_id_to_inlist.contains_key(id));
+                common_min_max_ids.retain(|id| scan_filter.rf_id_to_min_max.contains_key(id));
+            }
+
+            for rf_id in &common_inlist_ids {
+                let mut exprs = Vec::new();
+                for rf in rfs.iter() {
+                    exprs.push(rf[&scan_id].rf_id_to_inlist[rf_id].clone());
+                }
+                log::info!("merge inlist: {:?}, rf_id: {:?}", exprs, rf_id);
+                let merged_expr = exprs
+                    .into_iter()
+                    .reduce(|acc, expr| RemoteExpr::FunctionCall {
+                        span: None,
+                        id: Box::new(FunctionID::Builtin {
+                            name: "or".to_string(),
+                            id: 1,
+                        }),
+                        generics: vec![],
+                        args: vec![acc, expr],
+                        return_type: DataType::Nullable(Box::new(DataType::Boolean)),
+                    })
+                    .unwrap();
+                merged_for_scan.rf_id_to_inlist.insert(*rf_id, merged_expr);
+            }
+
+            for rf_id in &common_min_max_ids {
+                let mut exprs = Vec::new();
+                for rf in rfs.iter() {
+                    exprs.push(rf[&scan_id].rf_id_to_min_max[rf_id].clone());
+                }
+                log::info!("merge min_max: {:?}, rf_id: {:?}", exprs, rf_id);
+                let merged_expr = exprs
+                    .into_iter()
+                    .reduce(|acc, expr| RemoteExpr::FunctionCall {
+                        span: None,
+                        id: Box::new(FunctionID::Builtin {
+                            name: "or".to_string(),
+                            id: 1,
+                        }),
+                        generics: vec![],
+                        args: vec![acc, expr],
+                        return_type: DataType::Nullable(Box::new(DataType::Boolean)),
+                    })
+                    .unwrap();
+                merged_for_scan.rf_id_to_min_max.insert(*rf_id, merged_expr);
+            }
+
+            merged.insert(scan_id, merged_for_scan);
+        }
+
+        RemoteRuntimeFilters {
+            scan_id_to_runtime_filter: Some(merged),
+        }
+    }
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
+pub struct RemoteRuntimeFiltersForScan {
+    rf_id_to_inlist: HashMap<usize, RemoteExpr<String>>,
+    rf_id_to_min_max: HashMap<usize, RemoteExpr<String>>,
+}
+
+#[typetag::serde(name = "runtime_filters_for_join")]
+impl BlockMetaInfo for RemoteRuntimeFilters {
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        Box::new(self.clone())
+    }
+}
diff --git a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs
index 126ae6c959b5c..4516180f93220 100644
--- a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs
+++ b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs
@@ -351,7 +351,9 @@ async fn create_memory_table_for_cte_scan(
         | PhysicalPlan::ChunkFillAndReorder(_)
         | PhysicalPlan::ChunkAppendData(_)
         | PhysicalPlan::ChunkMerge(_)
-        | PhysicalPlan::ChunkCommitInsert(_) => {}
+        | PhysicalPlan::ChunkCommitInsert(_)
+        | PhysicalPlan::RuntimeFilterSource(_)
+        | PhysicalPlan::RuntimeFilterSink(_) => {}
     }
     Ok(())
 }
diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs
index a205a4ec6e963..161117159256d 100644
--- a/src/query/service/src/schedulers/fragments/fragmenter.rs
+++ b/src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -123,9 +123,13 @@ impl Fragmenter {
 
     pub fn build_fragment(mut self, plan: &PhysicalPlan) -> Result<PlanFragment> {
         let root = self.replace(plan)?;
+        let fragment_type = match plan {
+            PhysicalPlan::RuntimeFilterSink(_) => FragmentType::Intermediate,
+            _ => FragmentType::Root,
+        };
         let mut root_fragment = PlanFragment {
             plan: root,
-            fragment_type: FragmentType::Root,
+            fragment_type,
             fragment_id: self.ctx.get_fragment_id(),
             exchange: None,
             query_id: self.query_id.clone(),
@@ -218,6 +222,11 @@ impl PhysicalPlanReplacer for Fragmenter {
         fragments.append(&mut self.fragments);
         let probe_input =
self.replace(plan.probe.as_ref())?; fragments.append(&mut self.fragments); + + let runtime_filter_plan = match &plan.runtime_filter_plan { + Some(runtime_filter_plan) => Some(Box::new(self.replace(runtime_filter_plan)?)), + None => None, + }; self.fragments = fragments; Ok(PhysicalPlan::HashJoin(HashJoin { @@ -241,7 +250,9 @@ impl PhysicalPlanReplacer for Fragmenter { broadcast: plan.broadcast, single_to_inner: plan.single_to_inner.clone(), build_side_cache_info: plan.build_side_cache_info.clone(), - runtime_filter: plan.runtime_filter.clone(), + runtime_filter_desc: plan.runtime_filter_desc.clone(), + runtime_filter_plan, + join_id: plan.join_id, })) } diff --git a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs index dece1fdfb21c5..9fa10c1585bfc 100644 --- a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs +++ b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs @@ -126,12 +126,16 @@ impl QueryFragmentsActions { self.ctx.get_cluster().local_id() } - pub fn get_root_actions(&self) -> Result<&QueryFragmentActions> { - self.fragments_actions.last().ok_or_else(|| { - ErrorCode::Internal( - "Logical error, call get_root_actions in empty QueryFragmentsActions", - ) - }) + pub fn get_root_fragment_ids(&self) -> Result> { + let mut fragment_ids = Vec::new(); + for fragment_actions in &self.fragments_actions { + let plan = &fragment_actions.fragment_actions[0].physical_plan; + if !matches!(plan, PhysicalPlan::ExchangeSink(_)) { + fragment_ids.push(fragment_actions.fragment_id); + } + } + + Ok(fragment_ids) } pub fn pop_root_actions(&mut self) -> Option { diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index f3259edae81c6..6a6224d788341 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -99,11 +99,15 @@ pub async fn build_distributed_pipeline( ctx: &Arc, plan: &PhysicalPlan, ) -> Result { - let fragmenter = Fragmenter::try_create(ctx.clone())?; - - let root_fragment = fragmenter.build_fragment(plan)?; let mut fragments_actions = QueryFragmentsActions::create(ctx.clone()); - root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?; + for plan in collect_runtime_filter_broadcast_plans(plan)? 
+ .iter() + .chain(std::iter::once(plan)) + { + let fragmenter = Fragmenter::try_create(ctx.clone())?; + let root_fragment = fragmenter.build_fragment(plan)?; + root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?; + } let exchange_manager = ctx.get_exchange_manager(); @@ -123,6 +127,27 @@ pub async fn build_distributed_pipeline( } } +fn collect_runtime_filter_broadcast_plans(plan: &PhysicalPlan) -> Result> { + let mut runtime_filter_broadcast_plans = Vec::new(); + + let mut collect_runtime_filter_broadcast_plans = |plan: &PhysicalPlan| { + if let PhysicalPlan::HashJoin(hash_join) = plan { + if let Some(runtime_filter_plan) = &hash_join.runtime_filter_plan { + runtime_filter_broadcast_plans.push(runtime_filter_plan.as_ref().clone()); + } + } + }; + + PhysicalPlan::traverse( + plan, + &mut |_| true, + &mut collect_runtime_filter_broadcast_plans, + &mut |_| {}, + ); + + Ok(runtime_filter_broadcast_plans) +} + pub struct ServiceQueryExecutor { ctx: Arc, } diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 1eb5adb336efd..64b0e23c23617 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -57,7 +57,6 @@ use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; use crate::pipelines::PipelineBuilder; -use crate::schedulers::QueryFragmentActions; use crate::schedulers::QueryFragmentsActions; use crate::servers::flight::v1::actions::init_query_fragments; use crate::servers::flight::v1::actions::INIT_QUERY_FRAGMENTS; @@ -443,7 +442,7 @@ impl DataExchangeManager { retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; - let root_actions = actions.get_root_actions()?; + let mut root_fragment_ids = actions.get_root_fragment_ids()?; let conf = GlobalConfig::instance(); // Initialize query env between cluster nodes @@ -466,7 +465,8 @@ impl DataExchangeManager { } // Get local pipeline of local task - let build_res = self.get_root_pipeline(ctx, root_actions)?; + let main_fragment_id = root_fragment_ids.pop().unwrap(); + let build_res = self.get_root_pipeline(ctx, main_fragment_id, root_fragment_ids)?; let prepared_query = actions.prepared_query()?; let _: HashMap = cluster @@ -479,10 +479,10 @@ impl DataExchangeManager { fn get_root_pipeline( &self, ctx: Arc, - root_actions: &QueryFragmentActions, + main_fragment_id: usize, + fragment_ids: Vec, ) -> Result { let query_id = ctx.get_id(); - let fragment_id = root_actions.fragment_id; let queries_coordinator_guard = self.queries_coordinator.lock(); let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; @@ -490,10 +490,34 @@ impl DataExchangeManager { match queries_coordinator.get_mut(&query_id) { None => Err(ErrorCode::Internal("Query not exists.")), Some(query_coordinator) => { - assert!(query_coordinator.fragment_exchanges.is_empty()); + assert!( + query_coordinator.fragment_exchanges.is_empty(), + "query_coordinator.fragment_exchanges is not empty: {:?}", + query_coordinator + .fragment_exchanges + .keys() + .collect::>() + ); let injector = DefaultExchangeInjector::create(); - let mut build_res = - query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?; + let mut build_res = query_coordinator.subscribe_fragment( + &ctx, + main_fragment_id, + 
injector.clone(), + )?; + + for fragment_id in fragment_ids { + let sub_build_res = query_coordinator.subscribe_fragment( + &ctx, + fragment_id, + injector.clone(), + )?; + build_res + .sources_pipelines + .push(sub_build_res.main_pipeline); + build_res + .sources_pipelines + .extend(sub_build_res.sources_pipelines); + } let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges); let statistics_receiver = StatisticsReceiver::spawn_receiver(&ctx, exchanges)?; @@ -775,16 +799,18 @@ impl QueryCoordinator { return Ok(fragment_coordinator.pipeline_build_res.unwrap()); } - let exchange_params = fragment_coordinator.create_exchange_params( - info, - fragment_coordinator - .pipeline_build_res - .as_ref() - .map(|x| x.exchange_injector.clone()) - .ok_or_else(|| { - ErrorCode::Internal("Pipeline build result is none, It's a bug") - })?, - )?; + let exchange_params = fragment_coordinator + .create_exchange_params( + info, + fragment_coordinator + .pipeline_build_res + .as_ref() + .map(|x| x.exchange_injector.clone()) + .ok_or_else(|| { + ErrorCode::Internal("Pipeline build result is none, It's a bug") + })?, + )? + .unwrap(); let mut build_res = fragment_coordinator.pipeline_build_res.unwrap(); // Add exchange data transform. @@ -854,12 +880,19 @@ impl QueryCoordinator { if let Some(mut build_res) = coordinator.pipeline_build_res.take() { build_res.set_max_threads(max_threads as usize); - if !build_res.main_pipeline.is_pulling_pipeline()? { - return Err(ErrorCode::Internal("Logical error, It's a bug")); - } - - // Add exchange data publisher. - ExchangeSink::via(&info.query_ctx, ¶ms, &mut build_res.main_pipeline)?; + if build_res.main_pipeline.is_pulling_pipeline()? { + let Some(params) = params else { + return Err(ErrorCode::Internal( + "pipeline is pulling pipeline, but exchange params is none", + )); + }; + // Add exchange data publisher. + ExchangeSink::via(&info.query_ctx, ¶ms, &mut build_res.main_pipeline)?; + } else if build_res.main_pipeline.is_complete_pipeline()? && params.is_some() { + return Err(ErrorCode::Internal( + "pipeline is complete pipeline, but exchange params is some", + )); + }; if !build_res.main_pipeline.is_complete_pipeline()? 
{ return Err(ErrorCode::Internal("Logical error, It's a bug")); @@ -938,48 +971,47 @@ impl FragmentCoordinator { &self, info: &QueryInfo, exchange_injector: Arc, - ) -> Result { - if let Some(data_exchange) = &self.data_exchange { - return match data_exchange { - DataExchange::Merge(exchange) => { - Ok(ExchangeParams::MergeExchange(MergeExchangeParams { - exchange_injector: exchange_injector.clone(), - schema: self.physical_plan.output_schema()?, - fragment_id: self.fragment_id, - query_id: info.query_id.to_string(), - destination_id: exchange.destination_id.clone(), - allow_adjust_parallelism: exchange.allow_adjust_parallelism, - ignore_exchange: exchange.ignore_exchange, - })) - } - DataExchange::Broadcast(exchange) => { - Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams { - exchange_injector: exchange_injector.clone(), - schema: self.physical_plan.output_schema()?, - fragment_id: self.fragment_id, - query_id: info.query_id.to_string(), - executor_id: info.current_executor.to_string(), - destination_ids: exchange.destination_ids.to_owned(), - shuffle_scatter: exchange_injector - .flight_scatter(&info.query_ctx, data_exchange)?, - })) - } - DataExchange::ShuffleDataExchange(exchange) => { - Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams { - exchange_injector: exchange_injector.clone(), - schema: self.physical_plan.output_schema()?, - fragment_id: self.fragment_id, - query_id: info.query_id.to_string(), - executor_id: info.current_executor.to_string(), - destination_ids: exchange.destination_ids.to_owned(), - shuffle_scatter: exchange_injector - .flight_scatter(&info.query_ctx, data_exchange)?, - })) - } - }; + ) -> Result> { + let Some(data_exchange) = &self.data_exchange else { + return Ok(None); + }; + match data_exchange { + DataExchange::Merge(exchange) => { + Ok(Some(ExchangeParams::MergeExchange(MergeExchangeParams { + exchange_injector: exchange_injector.clone(), + schema: self.physical_plan.output_schema()?, + fragment_id: self.fragment_id, + query_id: info.query_id.to_string(), + destination_id: exchange.destination_id.clone(), + allow_adjust_parallelism: exchange.allow_adjust_parallelism, + ignore_exchange: exchange.ignore_exchange, + }))) + } + DataExchange::Broadcast(exchange) => Ok(Some(ExchangeParams::ShuffleExchange( + ShuffleExchangeParams { + exchange_injector: exchange_injector.clone(), + schema: self.physical_plan.output_schema()?, + fragment_id: self.fragment_id, + query_id: info.query_id.to_string(), + executor_id: info.current_executor.to_string(), + destination_ids: exchange.destination_ids.to_owned(), + shuffle_scatter: exchange_injector + .flight_scatter(&info.query_ctx, data_exchange)?, + }, + ))), + DataExchange::ShuffleDataExchange(exchange) => Ok(Some( + ExchangeParams::ShuffleExchange(ShuffleExchangeParams { + exchange_injector: exchange_injector.clone(), + schema: self.physical_plan.output_schema()?, + fragment_id: self.fragment_id, + query_id: info.query_id.to_string(), + executor_id: info.current_executor.to_string(), + destination_ids: exchange.destination_ids.to_owned(), + shuffle_scatter: exchange_injector + .flight_scatter(&info.query_ctx, data_exchange)?, + }), + )), } - - Err(ErrorCode::Internal("Cannot find data exchange.")) } pub fn prepare_pipeline(&mut self, ctx: Arc) -> Result<()> { diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs index abebc2ba6a254..91bdf774f9bf8 100644 --- 
a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs @@ -70,10 +70,10 @@ impl AsyncSink for ExchangeWriterSink { async fn consume(&mut self, mut data_block: DataBlock) -> Result { let serialize_meta = match data_block.take_meta() { None => Err(ErrorCode::Internal( - "ExchangeWriterSink only recv ExchangeSerializeMeta.", + "ExchangeWriterSink only recv ExchangeSerializeMeta, but got none.", )), Some(block_meta) => ExchangeSerializeMeta::downcast_from(block_meta).ok_or_else(|| { - ErrorCode::Internal("ExchangeWriterSink only recv ExchangeSerializeMeta.") + ErrorCode::Internal("ExchangeWriterSink only recv ExchangeSerializeMeta") }), }?; diff --git a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs index 5a757f37ba299..11d17ff6641fe 100644 --- a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs @@ -159,11 +159,6 @@ impl BlockMetaTransform for TransformScatterExchangeSeriali fn transform(&mut self, meta: ExchangeShuffleMeta) -> Result> { let mut new_blocks = Vec::with_capacity(meta.blocks.len()); for (index, block) in meta.blocks.into_iter().enumerate() { - if block.is_empty() { - new_blocks.push(block); - continue; - } - new_blocks.push(match self.local_pos == index { true => block, false => serialize_block(0, block, &self.options)?, diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 458882d1561df..e44a7ad6c3914 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -28,6 +28,8 @@ use std::time::Instant; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use async_channel::Receiver; +use async_channel::Sender; use chrono_tz::Tz; use dashmap::mapref::multiple::RefMulti; use dashmap::DashMap; @@ -52,8 +54,8 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::StageTableInfo; use databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table_args::TableArgs; use databend_common_catalog::table_context::ContextError; @@ -135,6 +137,7 @@ use crate::clusters::Cluster; use crate::clusters::ClusterHelper; use crate::locks::LockManager; use crate::pipelines::executor::PipelineExecutor; +use crate::pipelines::processors::transforms::RemoteRuntimeFilters; use crate::servers::flight::v1::exchange::DataExchangeManager; use crate::sessions::query_affect::QueryAffect; use crate::sessions::query_ctx_shared::MemoryUpdater; @@ -281,6 +284,22 @@ impl QueryContext { } } + pub fn rf_src_recv(&self, join_id: u32) -> Receiver { + self.shared.rf_src_recv(join_id) + } + + pub fn rf_src_send(&self, join_id: u32) -> Sender { + self.shared.rf_src_send(join_id) + } + + pub fn rf_sink_recv(&self, join_id: u32) -> Receiver { + self.shared.rf_sink_recv(join_id) + } + + pub fn rf_sink_send(&self, join_id: u32) -> Sender { + self.shared.rf_sink_send(join_id) + } + pub fn attach_table(&self, catalog: &str, 
database: &str, name: &str, table: Arc) { self.shared.attach_table(catalog, database, name, table) } @@ -1381,21 +1400,21 @@ impl TableContext for QueryContext { runtime_filters.clear(); } - fn set_runtime_filter(&self, filters: (IndexType, RuntimeFilterInfo)) { + fn set_runtime_filter(&self, filters: (IndexType, RuntimeFiltersForScan)) { let mut runtime_filters = self.shared.runtime_filters.write(); match runtime_filters.entry(filters.0) { Entry::Vacant(v) => { v.insert(filters.1); } Entry::Occupied(mut v) => { - for filter in filters.1.get_inlist() { - v.get_mut().add_inlist(filter.clone()); + for (rf_id, filter) in filters.1.inlist.into_iter() { + v.get_mut().add_inlist(rf_id, filter); } - for filter in filters.1.get_min_max() { - v.get_mut().add_min_max(filter.clone()); + for (rf_id, filter) in filters.1.min_max.into_iter() { + v.get_mut().add_min_max(rf_id, filter); } - for filter in filters.1.blooms() { - v.get_mut().add_bloom(filter); + for (rf_id, filter) in filters.1.bloom.into_iter() { + v.get_mut().add_bloom(rf_id, filter); } } } @@ -1446,7 +1465,7 @@ impl TableContext for QueryContext { fn get_bloom_runtime_filter_with_id(&self, id: IndexType) -> Vec<(String, BinaryFuse16)> { let runtime_filters = self.shared.runtime_filters.read(); match runtime_filters.get(&id) { - Some(v) => (v.get_bloom()).clone(), + Some(v) => v.bloom.values().cloned().collect(), None => vec![], } } @@ -1454,7 +1473,7 @@ impl TableContext for QueryContext { fn get_inlist_runtime_filter_with_id(&self, id: IndexType) -> Vec> { let runtime_filters = self.shared.runtime_filters.read(); match runtime_filters.get(&id) { - Some(v) => (v.get_inlist()).clone(), + Some(v) => v.inlist.values().cloned().collect(), None => vec![], } } @@ -1462,14 +1481,14 @@ impl TableContext for QueryContext { fn get_min_max_runtime_filter_with_id(&self, id: IndexType) -> Vec> { let runtime_filters = self.shared.runtime_filters.read(); match runtime_filters.get(&id) { - Some(v) => (v.get_min_max()).clone(), + Some(v) => v.min_max.values().cloned().collect(), None => vec![], } } fn has_bloom_runtime_filters(&self, id: usize) -> bool { if let Some(runtime_filter) = self.shared.runtime_filters.read().get(&id) { - return !runtime_filter.get_bloom().is_empty(); + return !runtime_filter.bloom.is_empty(); } false } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 2ab2f3c864d8f..6d74defe45344 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -23,6 +23,8 @@ use std::sync::Weak; use std::time::Duration; use std::time::SystemTime; +use async_channel::Receiver; +use async_channel::Sender; use dashmap::DashMap; use databend_common_base::base::short_sql; use databend_common_base::base::Progress; @@ -35,8 +37,8 @@ use databend_common_catalog::catalog::CatalogManager; use databend_common_catalog::merge_into_join::MergeIntoJoin; use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table_context::ContextError; use databend_common_catalog::table_context::StageAttachment; @@ -68,6 +70,7 @@ use uuid::Uuid; use crate::clusters::Cluster; use 
crate::clusters::ClusterDiscovery; use crate::pipelines::executor::PipelineExecutor; +use crate::pipelines::processors::transforms::RemoteRuntimeFilters; use crate::sessions::query_affect::QueryAffect; use crate::sessions::Session; use crate::storages::Table; @@ -144,7 +147,7 @@ pub struct QueryContextShared { pub(in crate::sessions) query_profiles: Arc, PlanProfile>>>, - pub(in crate::sessions) runtime_filters: Arc>>, + pub(in crate::sessions) runtime_filters: Arc>>, pub(in crate::sessions) runtime_filter_ready: Arc>>>>, @@ -169,9 +172,19 @@ pub struct QueryContextShared { // Used by hilbert clustering when do recluster. pub(in crate::sessions) selected_segment_locs: Arc>>, + // join_id -> (sender, receiver) + pub(in crate::sessions) rf_source: Arc>>, + + pub(in crate::sessions) rf_sink: Arc>>, + pub(in crate::sessions) pruned_partitions_stats: Arc>>, } +type RuntimeFilterSourceChannel = ( + Option>, + Option>, +); + impl QueryContextShared { pub fn try_create( session: Arc, @@ -235,10 +248,58 @@ impl QueryContextShared { mem_stat: Arc::new(RwLock::new(None)), node_memory_usage: Arc::new(RwLock::new(HashMap::new())), selected_segment_locs: Default::default(), + rf_source: Arc::new(Mutex::new(HashMap::new())), + rf_sink: Arc::new(Mutex::new(HashMap::new())), pruned_partitions_stats: Arc::new(RwLock::new(None)), })) } + pub fn rf_src_recv(&self, join_id: u32) -> Receiver { + let mut rf_source = self.rf_source.lock(); + match rf_source.get_mut(&join_id).map(|(_, receiver)| receiver) { + Some(receiver) => receiver.take().unwrap(), + None => { + let (sender, receiver) = async_channel::unbounded(); + rf_source.insert(join_id, (Some(sender), None)); + receiver + } + } + } + pub fn rf_src_send(&self, join_id: u32) -> Sender { + let mut rf_source = self.rf_source.lock(); + match rf_source.get_mut(&join_id).map(|(sender, _)| sender) { + Some(sender) => sender.take().unwrap(), + None => { + let (sender, receiver) = async_channel::unbounded(); + rf_source.insert(join_id, (None, Some(receiver))); + sender + } + } + } + + pub fn rf_sink_recv(&self, join_id: u32) -> Receiver { + let mut rf_sink = self.rf_sink.lock(); + match rf_sink.get_mut(&join_id).map(|(_, receiver)| receiver) { + Some(receiver) => receiver.take().unwrap(), + None => { + let (sender, receiver) = async_channel::unbounded(); + rf_sink.insert(join_id, (Some(sender), None)); + receiver + } + } + } + pub fn rf_sink_send(&self, join_id: u32) -> Sender { + let mut rf_sink = self.rf_sink.lock(); + match rf_sink.get_mut(&join_id).map(|(sender, _)| sender) { + Some(sender) => sender.take().unwrap(), + None => { + let (sender, receiver) = async_channel::unbounded(); + rf_sink.insert(join_id, (None, Some(receiver))); + sender + } + } + } + pub fn set_error(&self, err: ErrorCode) { let err = err.with_context("query context error"); diff --git a/src/query/service/tests/it/pipelines/builders/runtime_filter.rs b/src/query/service/tests/it/pipelines/builders/runtime_filter.rs index ce1e81871f3f1..b739612ea5a44 100644 --- a/src/query/service/tests/it/pipelines/builders/runtime_filter.rs +++ b/src/query/service/tests/it/pipelines/builders/runtime_filter.rs @@ -100,6 +100,7 @@ async fn join_build_state( &join.build_projections, join_state.clone(), 1, + None, )?; Ok(build_state) } diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index 0c2bebe146f42..272c96e97b748 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ 
b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -31,8 +31,8 @@ use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Partitions; use databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::ContextError; @@ -924,7 +924,7 @@ impl TableContext for CtxDelegation { todo!() } - fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFilterInfo)) { + fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFiltersForScan)) { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index e41d6c7c2fb73..eeed126767ae1 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -30,8 +30,8 @@ use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Partitions; use databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::ContextError; @@ -789,7 +789,7 @@ impl TableContext for CtxDelegation { todo!() } - fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFilterInfo)) { + fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFiltersForScan)) { todo!() } diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index b7166308a7855..c0d37da3cf022 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -26,7 +26,7 @@ use databend_common_pipeline_core::processors::PlanProfile; use itertools::Itertools; use super::physical_plans::AddStreamColumn; -use super::PhysicalRuntimeFilter; +use super::RemoteRuntimeFilterDesc; use crate::binder::MutationType; use crate::executor::explain::PlanStatsInfo; use crate::executor::physical_plans::AggregateExpand; @@ -340,7 +340,7 @@ pub fn format_partial_tree( } struct FormatContext { - scan_id_to_runtime_filters: HashMap>, + scan_id_to_runtime_filters: HashMap>, } #[recursive::recursive] @@ -521,6 +521,12 @@ fn to_format_tree( PhysicalPlan::AsyncFunction(plan) => { async_function_to_format_tree(plan, metadata, profs, context) } + PhysicalPlan::RuntimeFilterSource(_plan) => { + Ok(FormatTreeNode::new("RuntimeFilterSource".to_string())) + } + PhysicalPlan::RuntimeFilterSink(_plan) => { + Ok(FormatTreeNode::new("RuntimeFilterSink".to_string())) + } } } @@ -1517,7 +1523,7 @@ fn hash_join_to_format_tree( profs: &HashMap, context: &mut FormatContext, ) -> Result> { - for rf in plan.runtime_filter.filters.iter() { + for rf in plan.runtime_filter_desc.filters.iter() { context .scan_id_to_runtime_filters .entry(rf.scan_id) @@ -1551,7 +1557,7 @@ fn hash_join_to_format_tree( probe_child.payload = format!("{}(Probe)", 
probe_child.payload); let mut build_runtime_filters = vec![]; - for rf in plan.runtime_filter.filters.iter() { + for rf in plan.runtime_filter_desc.filters.iter() { let mut s = format!( "filter id:{}, build key:{}, probe key:{}, filter type:", rf.id, diff --git a/src/query/sql/src/executor/mod.rs b/src/query/sql/src/executor/mod.rs index 73f2b65710b82..72925a854d6a8 100644 --- a/src/query/sql/src/executor/mod.rs +++ b/src/query/sql/src/executor/mod.rs @@ -27,6 +27,6 @@ pub use physical_plan::PhysicalPlan; pub use physical_plan_builder::MutationBuildInfo; pub use physical_plan_builder::PhysicalPlanBuilder; pub use physical_plan_visitor::PhysicalPlanReplacer; -pub use physical_plans::PhysicalRuntimeFilter; -pub use physical_plans::PhysicalRuntimeFilters; +pub use physical_plans::RemoteRuntimeFilterDesc; +pub use physical_plans::RemoteRuntimeFiltersDesc; pub use util::*; diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 0c81455aa489f..bc7191c62cc56 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -31,6 +31,8 @@ use super::physical_plans::MutationManipulate; use super::physical_plans::MutationOrganize; use super::physical_plans::MutationSource; use super::physical_plans::MutationSplit; +use super::physical_plans::RuntimeFilterSink; +use super::physical_plans::RuntimeFilterSource; use crate::executor::physical_plans::AggregateExpand; use crate::executor::physical_plans::AggregateFinal; use crate::executor::physical_plans::AggregatePartial; @@ -155,6 +157,10 @@ pub enum PhysicalPlan { // async function call AsyncFunction(AsyncFunction), + + // runtime filter + RuntimeFilterSource(RuntimeFilterSource), + RuntimeFilterSink(RuntimeFilterSink), } impl PhysicalPlan { @@ -407,6 +413,15 @@ impl PhysicalPlan { *next_id += 1; plan.input.adjust_plan_id(next_id); } + PhysicalPlan::RuntimeFilterSource(plan) => { + plan.plan_id = *next_id; + *next_id += 1; + } + PhysicalPlan::RuntimeFilterSink(plan) => { + plan.plan_id = *next_id; + *next_id += 1; + plan.input.adjust_plan_id(next_id); + } } } @@ -463,6 +478,8 @@ impl PhysicalPlan { PhysicalPlan::ChunkMerge(v) => v.plan_id, PhysicalPlan::ChunkCommitInsert(v) => v.plan_id, PhysicalPlan::RecursiveCteScan(v) => v.plan_id, + PhysicalPlan::RuntimeFilterSource(v) => v.plan_id, + PhysicalPlan::RuntimeFilterSink(v) => v.plan_id, } } @@ -508,6 +525,8 @@ impl PhysicalPlan { | PhysicalPlan::CommitSink(_) | PhysicalPlan::DistributedInsertSelect(_) | PhysicalPlan::Recluster(_) + | PhysicalPlan::RuntimeFilterSource(_) + | PhysicalPlan::RuntimeFilterSink(_) | PhysicalPlan::HilbertPartition(_) => Ok(DataSchemaRef::default()), PhysicalPlan::Duplicate(plan) => plan.input.output_schema(), PhysicalPlan::Shuffle(plan) => plan.input.output_schema(), @@ -579,6 +598,8 @@ impl PhysicalPlan { PhysicalPlan::ChunkAppendData(_) => "WriteData".to_string(), PhysicalPlan::ChunkMerge(_) => "ChunkMerge".to_string(), PhysicalPlan::ChunkCommitInsert(_) => "Commit".to_string(), + PhysicalPlan::RuntimeFilterSource(_) => "RuntimeFilterSource".to_string(), + PhysicalPlan::RuntimeFilterSink(_) => "RuntimeFilterSink".to_string(), } } @@ -591,7 +612,8 @@ impl PhysicalPlan { | PhysicalPlan::CompactSource(_) | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) - | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()), + | PhysicalPlan::RecursiveCteScan(_) + | PhysicalPlan::RuntimeFilterSource(_) => Box::new(std::iter::empty()), 
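Note: the `children`/`children_mut` arms for the two new variants follow the usual leaf-vs-unary split: `RuntimeFilterSource` has no child plan (it is fed by a channel rather than an input pipeline), while `RuntimeFilterSink` yields exactly its input. A toy sketch of that shape (not the actual `PhysicalPlan` enum):

    enum Plan {
        Source,
        Sink { input: Box<Plan> },
    }

    impl Plan {
        fn children(&self) -> Box<dyn Iterator<Item = &Plan> + '_> {
            match self {
                // A source is a leaf: nothing to traverse into.
                Plan::Source => Box::new(std::iter::empty()),
                // A sink is unary: yield exactly its input.
                Plan::Sink { input } => Box::new(std::iter::once(input.as_ref())),
            }
        }
    }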
diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs
index c50adc9f30db4..b25b6a84b9b02 100644
--- a/src/query/sql/src/executor/physical_plan_builder.rs
+++ b/src/query/sql/src/executor/physical_plan_builder.rs
@@ -39,6 +39,7 @@ pub struct PhysicalPlanBuilder {
     pub(crate) dry_run: bool,
     // DataMutation info, used to build MergeInto physical plan
     pub(crate) mutation_build_info: Option<MutationBuildInfo>,
+    pub(crate) next_hash_join_id: u32,
 }
 
 impl PhysicalPlanBuilder {
@@ -50,6 +51,7 @@
             func_ctx,
             dry_run,
             mutation_build_info: None,
+            next_hash_join_id: 0,
         }
     }
 
diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs
index 8a1ced834b01a..031c8359dfc1a 100644
--- a/src/query/sql/src/executor/physical_plan_visitor.rs
+++ b/src/query/sql/src/executor/physical_plan_visitor.rs
@@ -22,6 +22,8 @@ use super::physical_plans::MutationManipulate;
 use super::physical_plans::MutationOrganize;
 use super::physical_plans::MutationSplit;
 use super::physical_plans::RecursiveCteScan;
+use super::physical_plans::RuntimeFilterSink;
+use super::physical_plans::RuntimeFilterSource;
 use crate::executor::physical_plan::PhysicalPlan;
 use crate::executor::physical_plans::AggregateExpand;
 use crate::executor::physical_plans::AggregateFinal;
@@ -120,9 +122,27 @@ pub trait PhysicalPlanReplacer {
             PhysicalPlan::ChunkAppendData(plan) => self.replace_chunk_append_data(plan),
             PhysicalPlan::ChunkMerge(plan) => self.replace_chunk_merge(plan),
             PhysicalPlan::ChunkCommitInsert(plan) => self.replace_chunk_commit_insert(plan),
+            PhysicalPlan::RuntimeFilterSource(plan) => self.replace_runtime_filter_source(plan),
+            PhysicalPlan::RuntimeFilterSink(plan) => self.replace_runtime_filter_sink(plan),
         }
     }
 
+    fn replace_runtime_filter_source(
+        &mut self,
+        plan: &RuntimeFilterSource,
+    ) -> Result<PhysicalPlan> {
+        Ok(PhysicalPlan::RuntimeFilterSource(plan.clone()))
+    }
+
+    fn replace_runtime_filter_sink(&mut self, plan: &RuntimeFilterSink) -> Result<PhysicalPlan> {
+        let input = self.replace(&plan.input)?;
+        Ok(PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink {
+            plan_id: plan.plan_id,
+            join_id: plan.join_id,
+            input: Box::new(input),
+        }))
+    }
+
     fn replace_recluster(&mut self, plan: &Recluster) -> Result<PhysicalPlan> {
         Ok(PhysicalPlan::Recluster(Box::new(plan.clone())))
     }
@@ -253,6 +273,12 @@
         let build = self.replace(&plan.build)?;
         let probe = self.replace(&plan.probe)?;
 
+        let runtime_filter_plan = if let Some(runtime_filter_plan) = &plan.runtime_filter_plan {
+            Some(Box::new(self.replace(runtime_filter_plan)?))
+        } else {
+            None
+        };
+
         Ok(PhysicalPlan::HashJoin(HashJoin {
             plan_id: plan.plan_id,
             projections: plan.projections.clone(),
@@ -271,10 +297,12 @@
             output_schema: plan.output_schema.clone(),
             need_hold_hash_table: plan.need_hold_hash_table,
             stat_info: plan.stat_info.clone(),
-            runtime_filter: plan.runtime_filter.clone(),
+            runtime_filter_desc: plan.runtime_filter_desc.clone(),
             broadcast: plan.broadcast,
             single_to_inner: plan.single_to_inner.clone(),
             build_side_cache_info: plan.build_side_cache_info.clone(),
+            runtime_filter_plan,
+            join_id: plan.join_id,
         }))
     }
 
@@ -646,7 +674,8 @@ impl PhysicalPlan {
             | PhysicalPlan::HilbertPartition(_)
             | PhysicalPlan::ExchangeSource(_)
             | PhysicalPlan::CompactSource(_)
-            | PhysicalPlan::MutationSource(_) => {}
+            | PhysicalPlan::MutationSource(_)
+            | PhysicalPlan::RuntimeFilterSource(_) => {}
             PhysicalPlan::Filter(plan) => {
                 Self::traverse(&plan.input, pre_visit, visit, post_visit);
             }
@@ -772,6 +801,9 @@
             PhysicalPlan::ChunkCommitInsert(plan) => {
                 Self::traverse(&plan.input, pre_visit, visit, post_visit);
             }
+            PhysicalPlan::RuntimeFilterSink(plan) => {
+                Self::traverse(&plan.input, pre_visit, visit, post_visit);
+            }
         }
         post_visit(plan);
     }
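A side note on replace_hash_join above: the if-let that rebuilds runtime_filter_plan is the hand-rolled form of Option::transpose. A standalone sketch of the equivalence, with a toy replace function standing in for PhysicalPlanReplacer::replace:

```rust
// Toy replace: the real one rewrites a PhysicalPlan and may fail.
fn replace(plan: &u32) -> Result<u32, String> {
    Ok(plan + 1)
}

fn main() -> Result<(), String> {
    let runtime_filter_plan: Option<Box<u32>> = Some(Box::new(41));
    // Option<Result<T, E>> -> Result<Option<T>, E>, so a single `?` handles the error.
    let replaced: Option<Box<u32>> = runtime_filter_plan
        .as_ref()
        .map(|p| replace(p).map(Box::new))
        .transpose()?;
    assert_eq!(replaced, Some(Box::new(42)));
    Ok(())
}
```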
diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs
index ab3e282abec34..82a40fe356181 100644
--- a/src/query/sql/src/executor/physical_plans/mod.rs
+++ b/src/query/sql/src/executor/physical_plans/mod.rs
@@ -82,8 +82,10 @@ pub use physical_filter::Filter;
 pub use physical_hash_join::HashJoin;
 pub use physical_join::PhysicalJoinType;
 pub use physical_join_filter::JoinRuntimeFilter;
-pub use physical_join_filter::PhysicalRuntimeFilter;
-pub use physical_join_filter::PhysicalRuntimeFilters;
+pub use physical_join_filter::RemoteRuntimeFilterDesc;
+pub use physical_join_filter::RemoteRuntimeFiltersDesc;
+pub use physical_join_filter::RuntimeFilterSink;
+pub use physical_join_filter::RuntimeFilterSource;
 pub use physical_limit::Limit;
 pub use physical_multi_table_insert::*;
 pub use physical_mutation::*;
diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs
index c2b7cc21c1b72..dd9cc320e6cee 100644
--- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs
@@ -27,8 +27,9 @@ use databend_common_expression::DataSchemaRefExt;
 use databend_common_expression::RemoteExpr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 
+use super::physical_join_filter::build_runtime_filter_plan;
+use super::physical_join_filter::RemoteRuntimeFiltersDesc;
 use super::JoinRuntimeFilter;
-use super::PhysicalRuntimeFilters;
 use crate::executor::explain::PlanStatsInfo;
 use crate::executor::physical_plans::Exchange;
 use crate::executor::physical_plans::FragmentKind;
@@ -107,7 +108,9 @@ pub struct HashJoin {
     // a HashMap for mapping the column indexes to the BlockEntry indexes in DataBlock.
     pub build_side_cache_info: Option<(usize, HashMap<IndexType, usize>)>,
 
-    pub runtime_filter: PhysicalRuntimeFilters,
+    pub runtime_filter_desc: RemoteRuntimeFiltersDesc,
+    pub runtime_filter_plan: Option<Box<PhysicalPlan>>,
+    pub join_id: u32,
 }
 
 impl HashJoin {
@@ -815,7 +818,9 @@
         probe_to_build: Vec<(usize, (bool, bool))>,
         output_schema: DataSchemaRef,
         build_side_cache_info: Option<(usize, HashMap<IndexType, usize>)>,
-        runtime_filter: PhysicalRuntimeFilters,
+        runtime_filter_desc: RemoteRuntimeFiltersDesc,
+        runtime_filter_plan: Option<Box<PhysicalPlan>>,
+        join_id: u32,
         stat_info: PlanStatsInfo,
     ) -> Result<PhysicalPlan> {
         Ok(PhysicalPlan::HashJoin(HashJoin {
@@ -839,7 +844,9 @@
             broadcast: is_broadcast,
             single_to_inner: join.single_to_inner.clone(),
             build_side_cache_info,
-            runtime_filter,
+            runtime_filter_desc,
+            runtime_filter_plan,
+            join_id,
         }))
     }
 
@@ -915,7 +922,7 @@
         let non_equi_conditions = self.process_non_equi_conditions(join, &merged_schema)?;
 
         // Step 11: Build runtime filter
-        let runtime_filter = self
+        let runtime_filter_desc = self
             .build_runtime_filter(
                 join,
                 s_expr,
@@ -925,6 +932,18 @@
             )
             .await?;
 
+        let join_id = self.next_hash_join_id;
+        self.next_hash_join_id += 1;
+
+        let runtime_filter_plan = if !runtime_filter_desc.filters.is_empty()
+            && !self.ctx.get_cluster().is_empty()
+            && !is_broadcast
+        {
+            Some(build_runtime_filter_plan(join_id)?)
+        } else {
+            None
+        };
+
         // Step 12: Create and return the HashJoin
         self.create_hash_join(
             join,
@@ -941,7 +960,9 @@
             probe_to_build,
             output_schema,
             build_side_cache_info,
-            runtime_filter,
+            runtime_filter_desc,
+            runtime_filter_plan,
+            join_id,
             stat_info,
         )
     }
@@ -953,8 +974,8 @@
         is_broadcast: bool,
        build_keys: &[RemoteExpr],
         probe_keys: Vec<Option<(RemoteExpr<String>, usize, usize)>>,
-    ) -> Result<PhysicalRuntimeFilters> {
-        JoinRuntimeFilter::build_runtime_filter(
+    ) -> Result<RemoteRuntimeFiltersDesc> {
+        JoinRuntimeFilter::build_runtime_filter_desc(
             self.ctx.clone(),
             &self.metadata,
             join,
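To restate the gating introduced in build_hash_join above: a dedicated runtime-filter plan is attached only when the join produced at least one filter descriptor and runs as a distributed, non-broadcast (shuffle) join; a broadcast join holds the whole build side on every node, so its filters can be produced and applied locally. A hedged sketch of that predicate (the function name is illustrative, not from the source):

```rust
// Mirrors the condition that guards the call to build_runtime_filter_plan(join_id).
fn needs_runtime_filter_plan(num_filters: usize, is_cluster: bool, is_broadcast: bool) -> bool {
    num_filters > 0 && is_cluster && !is_broadcast
}

fn main() {
    assert!(needs_runtime_filter_plan(2, true, false)); // distributed shuffle join
    assert!(!needs_runtime_filter_plan(2, true, true)); // broadcast: build side is local everywhere
    assert!(!needs_runtime_filter_plan(2, false, false)); // single node: no exchange needed
    assert!(!needs_runtime_filter_plan(0, true, false)); // nothing to send
}
```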
diff --git a/src/query/sql/src/executor/physical_plans/physical_join_filter.rs b/src/query/sql/src/executor/physical_plans/physical_join_filter.rs
index b28d0b581bbee..fa13a877f584d 100644
--- a/src/query/sql/src/executor/physical_plans/physical_join_filter.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_join_filter.rs
@@ -22,6 +22,9 @@ use databend_common_expression::RemoteExpr;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_storages_common_table_meta::table::get_change_type;
 
+use super::Exchange;
+use super::FragmentKind;
+use crate::executor::PhysicalPlan;
 use crate::optimizer::ir::RelExpr;
 use crate::optimizer::ir::SExpr;
 use crate::plans::Join;
@@ -30,12 +33,12 @@ use crate::IndexType;
 use crate::MetadataRef;
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)]
-pub struct PhysicalRuntimeFilters {
-    pub filters: Vec<PhysicalRuntimeFilter>,
+pub struct RemoteRuntimeFiltersDesc {
+    pub filters: Vec<RemoteRuntimeFilterDesc>,
 }
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
-pub struct PhysicalRuntimeFilter {
+pub struct RemoteRuntimeFilterDesc {
     pub id: usize,
     pub build_key: RemoteExpr,
     pub probe_key: RemoteExpr<String>,
@@ -136,7 +139,7 @@ impl JoinRuntimeFilter {
     }
 
     /// Build runtime filters for a join operation
-    pub async fn build_runtime_filter(
+    pub async fn build_runtime_filter_desc(
         ctx: Arc<dyn TableContext>,
         metadata: &MetadataRef,
         join: &Join,
@@ -144,18 +147,12 @@
         is_broadcast: bool,
         build_keys: &[RemoteExpr],
         probe_keys: Vec<Option<(RemoteExpr<String>, usize, usize)>>,
-    ) -> Result<PhysicalRuntimeFilters> {
+    ) -> Result<RemoteRuntimeFiltersDesc> {
         // Early return if runtime filters are not supported for this join type
         if !Self::supported_join_type_for_runtime_filter(&join.join_type) {
             return Ok(Default::default());
         }
 
-        // For cluster, only support runtime filter for broadcast join
-        let is_cluster = !ctx.get_cluster().is_empty();
-        if is_cluster && !is_broadcast {
-            return Ok(Default::default());
-        }
-
         let mut filters = Vec::new();
 
         // Process each probe key that has runtime filter information
@@ -177,6 +174,8 @@
 
             // Determine which filter types to enable based on data type and statistics
             let enable_bloom_runtime_filter = {
+                // shuffle join does not support bloom runtime filter for now
+                let is_shuffle = !ctx.get_cluster().is_empty() && !is_broadcast;
                 let is_supported_type = Self::is_type_supported_for_bloom_filter(&data_type);
                 let enable_bloom_runtime_filter_based_on_stats = Self::adjust_bloom_runtime_filter(
                     ctx.clone(),
@@ -185,14 +184,14 @@
                     s_expr,
                 )
                 .await?;
-                is_supported_type && enable_bloom_runtime_filter_based_on_stats
+                !is_shuffle && is_supported_type && enable_bloom_runtime_filter_based_on_stats
             };
 
             let enable_min_max_runtime_filter =
                 Self::is_type_supported_for_min_max_filter(&data_type);
 
             // Create and add the runtime filter
-            let runtime_filter = PhysicalRuntimeFilter {
+            let runtime_filter = RemoteRuntimeFilterDesc {
                 id,
                 build_key: build_key.clone(),
                 probe_key,
@@ -204,6 +203,40 @@
            filters.push(runtime_filter);
         }
 
-        Ok(PhysicalRuntimeFilters { filters })
+        Ok(RemoteRuntimeFiltersDesc { filters })
     }
 }
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct RuntimeFilterSource {
+    pub plan_id: u32,
+    pub join_id: u32,
+}
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct RuntimeFilterSink {
+    pub plan_id: u32,
+    pub join_id: u32,
+    pub input: Box<PhysicalPlan>,
+}
+
+pub fn build_runtime_filter_plan(join_id: u32) -> Result<Box<PhysicalPlan>> {
+    let runtime_filter_source = Box::new(PhysicalPlan::RuntimeFilterSource(RuntimeFilterSource {
+        plan_id: 0,
+        join_id,
+    }));
+    let exchange = Box::new(PhysicalPlan::Exchange(Exchange {
+        plan_id: 0,
+        input: runtime_filter_source,
+        kind: FragmentKind::Expansive,
+        keys: vec![],
+        allow_adjust_parallelism: true,
+        ignore_exchange: false,
+    }));
+    let runtime_filter_sink = Box::new(PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink {
+        plan_id: 0,
+        input: exchange,
+        join_id,
+    }));
+    Ok(runtime_filter_sink)
+}
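build_runtime_filter_plan fixes the side channel's shape as RuntimeFilterSource -> Exchange -> RuntimeFilterSink, all tagged with the same join_id; the zero plan_ids are placeholders that adjust_plan_id later overwrites. What the sink side of that channel ultimately has to do for a shuffle join is merge the per-node partial filters before the scans see them. A toy sketch of that merge for in-list style filters (a HashSet stands in for the real Expr/BinaryFuse16 payloads, and keying by filter id is an assumption based on RemoteRuntimeFilterDesc::id, not the actual sink implementation):

```rust
use std::collections::{HashMap, HashSet};

// Union the partial in-list filters produced by each node's build partition.
// Union is the only safe combination: a build value seen on any node may
// match probe rows on any other node.
fn merge_inlists(parts: Vec<HashMap<usize, HashSet<i32>>>) -> HashMap<usize, HashSet<i32>> {
    let mut merged: HashMap<usize, HashSet<i32>> = HashMap::new();
    for part in parts {
        for (filter_id, values) in part {
            merged.entry(filter_id).or_default().extend(values);
        }
    }
    merged
}

fn main() {
    let node_a = HashMap::from([(0, HashSet::from([1, 2]))]);
    let node_b = HashMap::from([(0, HashSet::from([2, 3]))]);
    let merged = merge_inlists(vec![node_a, node_b]);
    assert_eq!(merged[&0], HashSet::from([1, 2, 3]));
}
```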