From 71b069b95a39c638915e81ca8f516f9636b3cbec Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 17 Apr 2025 19:51:19 +0800 Subject: [PATCH 01/22] update --- src/query/catalog/src/runtime_filter_info.rs | 8 ++ .../src/pipelines/builders/builder_join.rs | 5 + .../builders/builder_runtime_filter.rs | 41 ++++++ .../service/src/pipelines/builders/mod.rs | 1 + .../service/src/pipelines/pipeline_builder.rs | 2 + .../processors/transforms/hash_join/desc.rs | 14 +- .../hash_join/hash_join_build_state.rs | 14 ++ .../pipelines/processors/transforms/mod.rs | 4 + .../processors/transforms/runtime_filter.rs | 124 ++++++++++++++++++ .../transform_recursive_cte_source.rs | 4 +- .../src/schedulers/fragments/fragmenter.rs | 27 +++- src/query/service/src/schedulers/scheduler.rs | 46 +++++++ .../flight/v1/exchange/exchange_manager.rs | 43 ++++++ src/query/service/src/sessions/query_ctx.rs | 11 ++ .../service/src/sessions/query_ctx_shared.rs | 35 +++++ .../it/pipelines/builders/runtime_filter.rs | 1 + src/query/sql/src/executor/format.rs | 14 +- src/query/sql/src/executor/mod.rs | 4 +- src/query/sql/src/executor/physical_plan.rs | 29 +++- .../sql/src/executor/physical_plan_builder.rs | 2 + .../sql/src/executor/physical_plan_visitor.rs | 35 ++++- .../sql/src/executor/physical_plans/mod.rs | 6 +- .../physical_plans/physical_hash_join.rs | 71 +++++++--- .../physical_plans/physical_runtime_filter.rs | 27 ++++ 24 files changed, 530 insertions(+), 38 deletions(-) create mode 100644 src/query/service/src/pipelines/builders/builder_runtime_filter.rs create mode 100644 src/query/service/src/pipelines/processors/transforms/runtime_filter.rs create mode 100644 src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs diff --git a/src/query/catalog/src/runtime_filter_info.rs b/src/query/catalog/src/runtime_filter_info.rs index 5caeae4090789..e59e3e6ede80e 100644 --- a/src/query/catalog/src/runtime_filter_info.rs +++ b/src/query/catalog/src/runtime_filter_info.rs @@ -58,10 +58,18 @@ impl RuntimeFilterInfo { self.inlist } + pub fn inlists_ref(&self) -> &Vec> { + &self.inlist + } + pub fn min_maxs(self) -> Vec> { self.min_max } + pub fn min_maxs_ref(&self) -> &Vec> { + &self.min_max + } + pub fn is_empty(&self) -> bool { self.inlist.is_empty() && self.bloom.is_empty() && self.min_max.is_empty() } diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index 5dc183497928d..41e333978f70e 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -102,6 +102,7 @@ impl PipelineBuilder { self.hash_join_states .insert(build_cache_index, state.clone()); } + self.expand_build_side_pipeline(&join.build, join, state.clone())?; self.build_join_probe(join, state)?; @@ -153,6 +154,10 @@ impl PipelineBuilder { &hash_join_plan.build_projections, join_state.clone(), output_len, + hash_join_plan + .runtime_filter_plan + .as_ref() + .map(|_| self.ctx.rf_src_send(hash_join_plan.join_id)), )?; build_state.add_runtime_filter_ready(); diff --git a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs new file mode 100644 index 0000000000000..21333b7da954f --- /dev/null +++ b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs @@ -0,0 +1,41 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file 
except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_sql::executor::physical_plans::RuntimeFilterSink; +use databend_common_sql::executor::physical_plans::RuntimeFilterSource; + +use crate::pipelines::processors::transforms::RuntimeFilterSinkProcessor; +use crate::pipelines::processors::transforms::RuntimeFilterSourceProcessor; +use crate::pipelines::PipelineBuilder; + +impl PipelineBuilder { + pub(crate) fn build_runtime_filter_source( + &mut self, + _source: &RuntimeFilterSource, + ) -> Result<()> { + let receiver = self.ctx.rf_src_recv(_source.join_id); + self.main_pipeline.add_source( + |output| { + RuntimeFilterSourceProcessor::create(self.ctx.clone(), receiver.clone(), output) + }, + 1, + ) + } + + pub(crate) fn build_runtime_filter_sink(&mut self, _sink: &RuntimeFilterSink) -> Result<()> { + self.main_pipeline + .add_sink(RuntimeFilterSinkProcessor::create) + } +} diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs index 8a2e5a481a349..3118b795f8b47 100644 --- a/src/query/service/src/pipelines/builders/mod.rs +++ b/src/query/service/src/pipelines/builders/mod.rs @@ -40,6 +40,7 @@ mod builder_recluster; mod builder_recursive_cte; mod builder_replace_into; mod builder_row_fetch; +mod builder_runtime_filter; mod builder_scalar; mod builder_scan; mod builder_sort; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 1763a9cc6dfc6..d453a509932af 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -258,6 +258,8 @@ impl PipelineBuilder { PhysicalPlan::ColumnMutation(column_mutation) => { self.build_column_mutation(column_mutation) } + PhysicalPlan::RuntimeFilterSource(source) => self.build_runtime_filter_source(source), + PhysicalPlan::RuntimeFilterSink(sink) => self.build_runtime_filter_sink(sink), }?; self.is_exchange_neighbor = is_exchange_neighbor; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs index 428ad2f03ae38..86c60cf72b992 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs @@ -20,8 +20,8 @@ use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_sql::executor::cast_expr_to_non_null_boolean; use databend_common_sql::executor::physical_plans::HashJoin; -use databend_common_sql::executor::PhysicalRuntimeFilter; -use databend_common_sql::executor::PhysicalRuntimeFilters; +use databend_common_sql::executor::RemoteRuntimeFilterDesc; +use databend_common_sql::executor::RemoteRuntimeFiltersDesc; use parking_lot::RwLock; use crate::sql::plans::JoinType; @@ -67,16 +67,16 @@ pub struct RuntimeFiltersDesc { pub filters: Vec, } -impl From<&PhysicalRuntimeFilters> for RuntimeFiltersDesc { - fn from(runtime_filter: &PhysicalRuntimeFilters) -> Self { 
+impl From<&RemoteRuntimeFiltersDesc> for RuntimeFiltersDesc { + fn from(runtime_filter: &RemoteRuntimeFiltersDesc) -> Self { Self { filters: runtime_filter.filters.iter().map(|rf| rf.into()).collect(), } } } -impl From<&PhysicalRuntimeFilter> for RuntimeFilterDesc { - fn from(runtime_filter: &PhysicalRuntimeFilter) -> Self { +impl From<&RemoteRuntimeFilterDesc> for RuntimeFilterDesc { + fn from(runtime_filter: &RemoteRuntimeFilterDesc) -> Self { Self { _id: runtime_filter.id, build_key: runtime_filter.build_key.as_expr(&BUILTIN_FUNCTIONS), @@ -117,7 +117,7 @@ impl HashJoinDesc { from_correlated_subquery: join.from_correlated_subquery, broadcast: join.broadcast, single_to_inner: join.single_to_inner.clone(), - runtime_filter: (&join.runtime_filter).into(), + runtime_filter: (&join.runtime_filter_desc).into(), }) } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index 0af13ee98f003..dbb17ee0395d3 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -20,6 +20,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use async_channel::Sender; use databend_common_base::base::tokio::sync::Barrier; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; @@ -78,6 +79,7 @@ use crate::pipelines::processors::transforms::hash_join::FixedKeyHashJoinHashTab use crate::pipelines::processors::transforms::hash_join::HashJoinHashTable; use crate::pipelines::processors::transforms::hash_join::SerializerHashJoinHashTable; use crate::pipelines::processors::transforms::hash_join::SingleBinaryHashJoinHashTable; +use crate::pipelines::processors::transforms::RuntimeFilterMeta; use crate::pipelines::processors::HashJoinState; use crate::sessions::QueryContext; @@ -117,6 +119,7 @@ pub struct HashJoinBuildState { /// Spill related states. 
    pub(crate) memory_settings: MemorySettings,
+    pub(crate) runtime_filter_sender: Option<Sender<RuntimeFilterMeta>>,
 }
 
 impl HashJoinBuildState {
@@ -128,6 +131,7 @@ impl HashJoinBuildState {
         build_projections: &ColumnSet,
         hash_join_state: Arc<HashJoinState>,
         num_threads: usize,
+        rf_src_send: Option<Sender<RuntimeFilterMeta>>,
     ) -> Result<Arc<HashJoinBuildState>> {
         let hash_key_types = build_keys
             .iter()
@@ -164,6 +168,7 @@ impl HashJoinBuildState {
             build_hash_table_tasks: Default::default(),
             mutex: Default::default(),
             memory_settings,
+            runtime_filter_sender: rf_src_send,
         }))
     }
 
@@ -865,6 +870,7 @@ impl HashJoinBuildState {
             }
             if !runtime_filter.is_empty() {
                 bloom_filter_ready |= !runtime_filter.is_blooms_empty();
+                self.send_runtime_filter_meta(&runtime_filter)?;
                 self.ctx.set_runtime_filter((rf.scan_id, runtime_filter));
             }
         }
@@ -1068,4 +1074,12 @@ impl HashJoinBuildState {
             .iter()
             .any(|rf| rf.enable_min_max_runtime_filter)
     }
+
+    fn send_runtime_filter_meta(&self, runtime_filter_info: &RuntimeFilterInfo) -> Result<()> {
+        if let Some(sender) = self.runtime_filter_sender.as_ref() {
+            sender.send_blocking(runtime_filter_info.into()).map_err(|_| ErrorCode::Internal("runtime filter receiver is dropped"))?;
+            sender.close();
+        }
+        Ok(())
+    }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs
index 1b144a4f01050..c83acf6168dca 100644
--- a/src/query/service/src/pipelines/processors/transforms/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -15,6 +15,7 @@
 pub mod aggregator;
 mod hash_join;
 pub(crate) mod range_join;
+mod runtime_filter;
 mod runtime_pool;
 mod transform_add_computed_columns;
 mod transform_add_const_columns;
@@ -41,6 +42,9 @@ mod transform_udf_server;
 mod window;
 
 pub use hash_join::*;
+pub use runtime_filter::RuntimeFilterMeta;
+pub use runtime_filter::RuntimeFilterSinkProcessor;
+pub use runtime_filter::RuntimeFilterSourceProcessor;
 pub use transform_add_computed_columns::TransformAddComputedColumns;
 pub use transform_add_const_columns::TransformAddConstColumns;
 pub use transform_add_internal_columns::TransformAddInternalColumns;
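The new file below implements the two ends of this handoff. The pattern is a plain async_channel rendezvous: the build side sends a single RuntimeFilterMeta and closes the channel, and the source processor keeps receiving until the close tells it to stop. A minimal standalone sketch of that lifecycle, with a toy FilterMsg type standing in for RuntimeFilterMeta (illustrative code, not part of the patch):

    use async_channel::unbounded;

    #[derive(Debug)]
    struct FilterMsg(Vec<String>); // stand-in for RuntimeFilterMeta

    fn main() {
        let (tx, rx) = unbounded::<FilterMsg>();
        // Build side: send the finished filter once, then close the channel
        // so the consumer's recv() returns Err and the source stops.
        tx.send_blocking(FilterMsg(vec!["build_key".into()])).unwrap();
        tx.close();
        // Source side: drain until the channel reports closed.
        while let Ok(msg) = rx.recv_blocking() {
            println!("forward {:?} downstream", msg);
        }
    }

diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs
new file mode 100644
index 0000000000000..cf59c7cd1e38a
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs
@@ -0,0 +1,124 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.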
+
+use std::sync::Arc;
+
+use async_channel::Receiver;
+use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::BlockMetaInfo;
+use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::DataBlock;
+use databend_common_expression::RemoteExpr;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_sinks::AsyncSink;
+use databend_common_pipeline_sinks::AsyncSinker;
+use databend_common_pipeline_sources::AsyncSource;
+use databend_common_pipeline_sources::AsyncSourcer;
+
+pub struct RuntimeFilterSourceProcessor {
+    pub meta_receiver: Receiver<RuntimeFilterMeta>,
+}
+
+impl RuntimeFilterSourceProcessor {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        receiver: Receiver<RuntimeFilterMeta>,
+        output_port: Arc<OutputPort>,
+    ) -> Result<ProcessorPtr> {
+        AsyncSourcer::create(ctx, output_port, Self {
+            meta_receiver: receiver,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSource for RuntimeFilterSourceProcessor {
+    const NAME: &'static str = "RuntimeFilterSource";
+    const SKIP_EMPTY_DATA_BLOCK: bool = false;
+
+    #[async_backtrace::framed]
+    async fn generate(&mut self) -> Result<Option<DataBlock>> {
+        match self.meta_receiver.recv().await {
+            Ok(runtime_filter) => Ok(Some(DataBlock::empty_with_meta(Box::new(
+                RuntimeFilterMeta {
+                    inlist: runtime_filter.inlist,
+                    min_max: runtime_filter.min_max,
+                },
+            )))),
+            Err(_) => {
+                // The channel is closed, so return None to stop generating.
+                Ok(None)
+            }
+        }
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct RuntimeFilterMeta {
+    inlist: Vec<RemoteExpr<String>>,
+    min_max: Vec<RemoteExpr<String>>,
+}
+
+#[typetag::serde(name = "runtime_filter_meta")]
+impl BlockMetaInfo for RuntimeFilterMeta {}
+
+impl From<&RuntimeFilterInfo> for RuntimeFilterMeta {
+    fn from(runtime_filter_info: &RuntimeFilterInfo) -> Self {
+        Self {
+            inlist: runtime_filter_info
+                .inlists_ref()
+                .iter()
+                .map(|expr| expr.as_remote_expr())
+                .collect(),
+            min_max: runtime_filter_info
+                .min_maxs_ref()
+                .iter()
+                .map(|expr| expr.as_remote_expr())
+                .collect(),
+        }
+    }
+}
+
+pub struct RuntimeFilterSinkProcessor {}
+
+impl RuntimeFilterSinkProcessor {
+    pub fn create(input: Arc<InputPort>) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {})))
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSink for RuntimeFilterSinkProcessor {
+    const NAME: &'static str = "RuntimeFilterSink";
+
+    async fn on_finish(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
+        let ptr = data_block
+            .take_meta()
+            .ok_or_else(|| ErrorCode::Internal("Runtime filter data block must have meta"))?;
+        let runtime_filter = RuntimeFilterMeta::downcast_from(ptr)
+            .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?;
+        println!("runtime_filter: {:?}", runtime_filter);
+        Ok(false)
+    }
+}
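RuntimeFilterMeta crosses the wire as serialized block meta, and the #[typetag::serde] registration above is what lets a boxed BlockMetaInfo trait object round-trip through serde: the concrete type name is recorded next to the payload and used to pick the deserializer on the other side. A self-contained sketch of the same mechanism with a toy Meta trait (names here are illustrative, not Databend's):

    #[typetag::serde]
    trait Meta: std::fmt::Debug {}

    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    struct Example {
        xs: Vec<u64>,
    }

    #[typetag::serde(name = "example")]
    impl Meta for Example {}

    fn main() -> serde_json::Result<()> {
        let m: Box<dyn Meta> = Box::new(Example { xs: vec![1, 2, 3] });
        let s = serde_json::to_string(&m)?; // {"example":{"xs":[1,2,3]}}
        let back: Box<dyn Meta> = serde_json::from_str(&s)?;
        println!("{:?}", back);
        Ok(())
    }

diff --git a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs
index f357612ae7ed6..f146dd5b8671c 100644
--- a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs
+++ b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs
@@ -348,7 +348,9 @@ async fn create_memory_table_for_cte_scan(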
         | PhysicalPlan::ChunkFillAndReorder(_)
         | PhysicalPlan::ChunkAppendData(_)
         | PhysicalPlan::ChunkMerge(_)
-        | PhysicalPlan::ChunkCommitInsert(_) => {}
+        | PhysicalPlan::ChunkCommitInsert(_)
+        | PhysicalPlan::RuntimeFilterSource(_)
+        | PhysicalPlan::RuntimeFilterSink(_) => {}
     }
     Ok(())
 }
diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs
index a205a4ec6e963..80132824aa545 100644
--- a/src/query/service/src/schedulers/fragments/fragmenter.rs
+++ b/src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -29,6 +29,7 @@ use databend_common_sql::executor::physical_plans::HashJoin;
 use databend_common_sql::executor::physical_plans::MutationSource;
 use databend_common_sql::executor::physical_plans::Recluster;
 use databend_common_sql::executor::physical_plans::ReplaceInto;
+use databend_common_sql::executor::physical_plans::RuntimeFilterSink;
 use databend_common_sql::executor::physical_plans::TableScan;
 use databend_common_sql::executor::physical_plans::UnionAll;
 use databend_common_sql::executor::PhysicalPlanReplacer;
@@ -210,6 +211,23 @@ impl PhysicalPlanReplacer for Fragmenter {
         Ok(PhysicalPlan::CompactSource(Box::new(plan.clone())))
     }
 
+    fn replace_runtime_filter_sink(&mut self, plan: &RuntimeFilterSink) -> Result<PhysicalPlan> {
+        let input = self.replace(&plan.input)?;
+        let sink = PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink {
+            plan_id: plan.plan_id,
+            input: Box::new(input),
+        });
+        self.fragments.push(PlanFragment {
+            plan: sink.clone(),
+            fragment_type: FragmentType::Intermediate,
+            fragment_id: self.ctx.get_fragment_id(),
+            exchange: None,
+            query_id: self.query_id.clone(),
+            source_fragments: vec![self.fragments.last().unwrap().clone()],
+        });
+        Ok(sink)
+    }
+
     fn replace_hash_join(&mut self, plan: &HashJoin) -> Result<PhysicalPlan> {
         let mut fragments = vec![];
         let build_input = self.replace(plan.build.as_ref())?;
@@ -218,6 +236,11 @@ impl PhysicalPlanReplacer for Fragmenter {
         fragments.append(&mut self.fragments);
         let probe_input = self.replace(plan.probe.as_ref())?;
         fragments.append(&mut self.fragments);
+
+        let runtime_filter_plan = match &plan.runtime_filter_plan {
+            Some(runtime_filter_plan) => Some(Box::new(self.replace(runtime_filter_plan)?)),
+            None => None,
+        };
         self.fragments = fragments;
 
         Ok(PhysicalPlan::HashJoin(HashJoin {
@@ -241,7 +264,9 @@ impl PhysicalPlanReplacer for Fragmenter {
             broadcast: plan.broadcast,
             single_to_inner: plan.single_to_inner.clone(),
             build_side_cache_info: plan.build_side_cache_info.clone(),
-            runtime_filter: plan.runtime_filter.clone(),
+            runtime_filter_desc: plan.runtime_filter_desc.clone(),
+            runtime_filter_plan,
+            join_id: plan.join_id,
         }))
     }
 
diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs
index f3259edae81c6..cd635066f6a6a 100644
--- a/src/query/service/src/schedulers/scheduler.rs
+++ b/src/query/service/src/schedulers/scheduler.rs
@@ -20,6 +20,7 @@ use databend_common_expression::DataBlock;
 use databend_common_sql::planner::QueryExecutor;
 use databend_common_sql::Planner;
 use futures_util::TryStreamExt;
+use log::info;
 
 use crate::interpreters::InterpreterFactory;
 use crate::pipelines::executor::ExecutorSettings;
@@ -99,6 +100,8 @@ pub async fn build_distributed_pipeline(
     ctx: &Arc<QueryContext>,
     plan: &PhysicalPlan,
 ) -> Result<PipelineBuildResult> {
+    let runtime_filter_broadcast_plans = collect_runtime_filter_broadcast_plans(plan)?;
+    start_runtime_filter_broadcast(ctx, 
&runtime_filter_broadcast_plans).await?; let fragmenter = Fragmenter::try_create(ctx.clone())?; let root_fragment = fragmenter.build_fragment(plan)?; @@ -123,6 +126,49 @@ pub async fn build_distributed_pipeline( } } +async fn start_runtime_filter_broadcast( + ctx: &Arc, + runtime_filter_broadcast_plans: &[PhysicalPlan], +) -> Result<()> { + if runtime_filter_broadcast_plans.is_empty() { + return Ok(()); + } + + for plan in runtime_filter_broadcast_plans { + let fragmenter = Fragmenter::try_create(ctx.clone())?; + let root_fragment = fragmenter.build_fragment(plan)?; + let mut fragments_actions = QueryFragmentsActions::create(ctx.clone()); + root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?; + + let exchange_manager = ctx.get_exchange_manager(); + exchange_manager + .commit_actions_without_return_pipeline(ctx.clone(), fragments_actions) + .await?; + } + Ok(()) +} + +fn collect_runtime_filter_broadcast_plans(plan: &PhysicalPlan) -> Result> { + let mut runtime_filter_broadcast_plans = Vec::new(); + + let mut collect_runtime_filter_broadcast_plans = |plan: &PhysicalPlan| { + if let PhysicalPlan::HashJoin(hash_join) = plan { + if let Some(runtime_filter_plan) = &hash_join.runtime_filter_plan { + runtime_filter_broadcast_plans.push(runtime_filter_plan.as_ref().clone()); + } + } + }; + + PhysicalPlan::traverse( + &plan, + &mut |_| true, + &mut collect_runtime_filter_broadcast_plans, + &mut |_| {}, + ); + + Ok(runtime_filter_broadcast_plans) +} + pub struct ServiceQueryExecutor { ctx: Arc, } diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index d158ccf3c9b89..4bd58630c6b80 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -409,6 +409,49 @@ impl DataExchangeManager { } } + #[async_backtrace::framed] + #[fastrace::trace] + pub async fn commit_actions_without_return_pipeline( + &self, + ctx: Arc, + actions: QueryFragmentsActions, + ) -> Result<()> { + let settings = ctx.get_settings(); + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; + let root_actions = actions.get_root_actions()?; + let conf = GlobalConfig::instance(); + + // Initialize query env between cluster nodes + let query_env = actions.get_query_env()?; + query_env.init(&ctx, flight_params).await?; + + // Submit distributed tasks to all nodes. 
+ let cluster = ctx.get_cluster(); + let mut query_fragments = actions.get_query_fragments()?; + + let local_fragments = query_fragments.remove(&conf.query.node_id); + + let _: HashMap = cluster + .do_action(INIT_QUERY_FRAGMENTS, query_fragments, flight_params) + .await?; + + self.set_ctx(&ctx.get_id(), ctx.clone())?; + if let Some(query_fragments) = local_fragments { + init_query_fragments(query_fragments).await?; + } + + let prepared_query = actions.prepared_query()?; + let _: HashMap = cluster + .do_action(START_PREPARED_QUERY, prepared_query, flight_params) + .await?; + + Ok(()) + } + #[async_backtrace::framed] #[fastrace::trace] pub async fn commit_actions( diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 1442c1ffb7231..9d3855f78967f 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -28,6 +28,8 @@ use std::time::Instant; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use async_channel::Receiver; +use async_channel::Sender; use chrono_tz::Tz; use dashmap::mapref::multiple::RefMulti; use dashmap::DashMap; @@ -132,6 +134,7 @@ use crate::clusters::Cluster; use crate::clusters::ClusterHelper; use crate::locks::LockManager; use crate::pipelines::executor::PipelineExecutor; +use crate::pipelines::processors::transforms::RuntimeFilterMeta; use crate::servers::flight::v1::exchange::DataExchangeManager; use crate::sessions::query_affect::QueryAffect; use crate::sessions::query_ctx_shared::MemoryUpdater; @@ -274,6 +277,14 @@ impl QueryContext { Ok(()) } + pub fn rf_src_recv(&self, join_id: u32) -> Receiver { + self.shared.rf_src_recv(join_id) + } + + pub fn rf_src_send(&self, join_id: u32) -> Sender { + self.shared.rf_src_send(join_id) + } + pub fn attach_table(&self, catalog: &str, database: &str, name: &str, table: Arc) { self.shared.attach_table(catalog, database, name, table) } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index cbaa01d22c3b9..8d348ed6c7134 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -23,6 +23,8 @@ use std::sync::Weak; use std::time::Duration; use std::time::SystemTime; +use async_channel::Receiver; +use async_channel::Sender; use dashmap::DashMap; use databend_common_base::base::short_sql; use databend_common_base::base::Progress; @@ -67,6 +69,7 @@ use uuid::Uuid; use crate::clusters::Cluster; use crate::clusters::ClusterDiscovery; use crate::pipelines::executor::PipelineExecutor; +use crate::pipelines::processors::transforms::RuntimeFilterMeta; use crate::sessions::query_affect::QueryAffect; use crate::sessions::Session; use crate::storages::Table; @@ -167,8 +170,16 @@ pub struct QueryContextShared { // Used by hilbert clustering when do recluster. 
pub(in crate::sessions) selected_segment_locs: Arc>>, + + // join_id -> (sender, receiver) + pub(in crate::sessions) rf_source: Arc>>, } +type RuntimeFilterChannel = ( + Option>, + Option>, +); + impl QueryContextShared { pub fn try_create( session: Arc, @@ -232,9 +243,33 @@ impl QueryContextShared { mem_stat: Arc::new(RwLock::new(None)), node_memory_usage: Arc::new(RwLock::new(HashMap::new())), selected_segment_locs: Default::default(), + rf_source: Arc::new(Mutex::new(HashMap::new())), })) } + pub fn rf_src_recv(&self, join_id: u32) -> Receiver { + let mut rf_source = self.rf_source.lock(); + match rf_source.get_mut(&join_id).map(|(_, receiver)| receiver) { + Some(receiver) => receiver.take().unwrap(), + None => { + let (sender, receiver) = async_channel::unbounded(); + rf_source.insert(join_id, (Some(sender), None)); + receiver + } + } + } + pub fn rf_src_send(&self, join_id: u32) -> Sender { + let mut rf_source = self.rf_source.lock(); + match rf_source.get_mut(&join_id).map(|(sender, _)| sender) { + Some(sender) => sender.take().unwrap(), + None => { + let (sender, receiver) = async_channel::unbounded(); + rf_source.insert(join_id, (None, Some(receiver))); + sender + } + } + } + pub fn set_error(&self, err: ErrorCode) { let err = err.with_context("query context error"); diff --git a/src/query/service/tests/it/pipelines/builders/runtime_filter.rs b/src/query/service/tests/it/pipelines/builders/runtime_filter.rs index ce1e81871f3f1..b739612ea5a44 100644 --- a/src/query/service/tests/it/pipelines/builders/runtime_filter.rs +++ b/src/query/service/tests/it/pipelines/builders/runtime_filter.rs @@ -100,6 +100,7 @@ async fn join_build_state( &join.build_projections, join_state.clone(), 1, + None, )?; Ok(build_state) } diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index b79d008827d21..8f71b65ee9f29 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -26,7 +26,7 @@ use databend_common_pipeline_core::processors::PlanProfile; use itertools::Itertools; use super::physical_plans::AddStreamColumn; -use super::PhysicalRuntimeFilter; +use super::RemoteRuntimeFilterDesc; use crate::binder::MutationType; use crate::executor::explain::PlanStatsInfo; use crate::executor::physical_plans::AggregateExpand; @@ -340,7 +340,7 @@ pub fn format_partial_tree( } struct FormatContext { - scan_id_to_runtime_filters: HashMap>, + scan_id_to_runtime_filters: HashMap>, } #[recursive::recursive] @@ -521,6 +521,12 @@ fn to_format_tree( PhysicalPlan::AsyncFunction(plan) => { async_function_to_format_tree(plan, metadata, profs, context) } + PhysicalPlan::RuntimeFilterSource(_plan) => { + Ok(FormatTreeNode::new("RuntimeFilterSource".to_string())) + } + PhysicalPlan::RuntimeFilterSink(_plan) => { + Ok(FormatTreeNode::new("RuntimeFilterSink".to_string())) + } } } @@ -1537,7 +1543,7 @@ fn hash_join_to_format_tree( profs: &HashMap, context: &mut FormatContext, ) -> Result> { - for rf in plan.runtime_filter.filters.iter() { + for rf in plan.runtime_filter_desc.filters.iter() { context .scan_id_to_runtime_filters .entry(rf.scan_id) @@ -1571,7 +1577,7 @@ fn hash_join_to_format_tree( probe_child.payload = format!("{}(Probe)", probe_child.payload); let mut build_runtime_filters = vec![]; - for rf in plan.runtime_filter.filters.iter() { + for rf in plan.runtime_filter_desc.filters.iter() { let mut s = format!( "filter id:{}, build key:{}, probe key:{}, filter type:", rf.id, diff --git a/src/query/sql/src/executor/mod.rs 
b/src/query/sql/src/executor/mod.rs index 73f2b65710b82..72925a854d6a8 100644 --- a/src/query/sql/src/executor/mod.rs +++ b/src/query/sql/src/executor/mod.rs @@ -27,6 +27,6 @@ pub use physical_plan::PhysicalPlan; pub use physical_plan_builder::MutationBuildInfo; pub use physical_plan_builder::PhysicalPlanBuilder; pub use physical_plan_visitor::PhysicalPlanReplacer; -pub use physical_plans::PhysicalRuntimeFilter; -pub use physical_plans::PhysicalRuntimeFilters; +pub use physical_plans::RemoteRuntimeFilterDesc; +pub use physical_plans::RemoteRuntimeFiltersDesc; pub use util::*; diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 5f14843f193d7..495ec69d0203a 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -30,6 +30,8 @@ use super::physical_plans::MutationManipulate; use super::physical_plans::MutationOrganize; use super::physical_plans::MutationSource; use super::physical_plans::MutationSplit; +use super::physical_plans::RuntimeFilterSink; +use super::physical_plans::RuntimeFilterSource; use crate::executor::physical_plans::AggregateExpand; use crate::executor::physical_plans::AggregateFinal; use crate::executor::physical_plans::AggregatePartial; @@ -154,6 +156,10 @@ pub enum PhysicalPlan { // async function call AsyncFunction(AsyncFunction), + + // runtime filter + RuntimeFilterSource(RuntimeFilterSource), + RuntimeFilterSink(RuntimeFilterSink), } impl PhysicalPlan { @@ -406,6 +412,15 @@ impl PhysicalPlan { *next_id += 1; plan.input.adjust_plan_id(next_id); } + PhysicalPlan::RuntimeFilterSource(plan) => { + plan.plan_id = *next_id; + *next_id += 1; + } + PhysicalPlan::RuntimeFilterSink(plan) => { + plan.plan_id = *next_id; + *next_id += 1; + plan.input.adjust_plan_id(next_id); + } } } @@ -462,6 +477,8 @@ impl PhysicalPlan { PhysicalPlan::ChunkMerge(v) => v.plan_id, PhysicalPlan::ChunkCommitInsert(v) => v.plan_id, PhysicalPlan::RecursiveCteScan(v) => v.plan_id, + PhysicalPlan::RuntimeFilterSource(v) => v.plan_id, + PhysicalPlan::RuntimeFilterSink(v) => v.plan_id, } } @@ -507,6 +524,8 @@ impl PhysicalPlan { | PhysicalPlan::CommitSink(_) | PhysicalPlan::DistributedInsertSelect(_) | PhysicalPlan::Recluster(_) + | PhysicalPlan::RuntimeFilterSource(_) + | PhysicalPlan::RuntimeFilterSink(_) | PhysicalPlan::HilbertPartition(_) => Ok(DataSchemaRef::default()), PhysicalPlan::Duplicate(plan) => plan.input.output_schema(), PhysicalPlan::Shuffle(plan) => plan.input.output_schema(), @@ -578,6 +597,8 @@ impl PhysicalPlan { PhysicalPlan::ChunkAppendData(_) => "WriteData".to_string(), PhysicalPlan::ChunkMerge(_) => "ChunkMerge".to_string(), PhysicalPlan::ChunkCommitInsert(_) => "Commit".to_string(), + PhysicalPlan::RuntimeFilterSource(_) => "RuntimeFilterSource".to_string(), + PhysicalPlan::RuntimeFilterSink(_) => "RuntimeFilterSink".to_string(), } } @@ -590,7 +611,8 @@ impl PhysicalPlan { | PhysicalPlan::CompactSource(_) | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) - | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()), + | PhysicalPlan::RecursiveCteScan(_) + | PhysicalPlan::RuntimeFilterSource(_) => Box::new(std::iter::empty()), PhysicalPlan::HilbertPartition(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::EvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())), @@ -608,6 +630,7 @@ impl PhysicalPlan { PhysicalPlan::ExpressionScan(plan) => 
Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())), + PhysicalPlan::RuntimeFilterSink(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::UnionAll(plan) => Box::new( std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), ), @@ -705,7 +728,9 @@ impl PhysicalPlan { | PhysicalPlan::ChunkFillAndReorder(_) | PhysicalPlan::ChunkAppendData(_) | PhysicalPlan::ChunkMerge(_) - | PhysicalPlan::ChunkCommitInsert(_) => None, + | PhysicalPlan::ChunkCommitInsert(_) + | PhysicalPlan::RuntimeFilterSource(_) + | PhysicalPlan::RuntimeFilterSink(_) => None, } } diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs index c50adc9f30db4..b25b6a84b9b02 100644 --- a/src/query/sql/src/executor/physical_plan_builder.rs +++ b/src/query/sql/src/executor/physical_plan_builder.rs @@ -39,6 +39,7 @@ pub struct PhysicalPlanBuilder { pub(crate) dry_run: bool, // DataMutation info, used to build MergeInto physical plan pub(crate) mutation_build_info: Option, + pub(crate) next_hash_join_id: u32, } impl PhysicalPlanBuilder { @@ -50,6 +51,7 @@ impl PhysicalPlanBuilder { func_ctx, dry_run, mutation_build_info: None, + next_hash_join_id: 0, } } diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 8a1ced834b01a..2b3bbf5c3703e 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -22,6 +22,8 @@ use super::physical_plans::MutationManipulate; use super::physical_plans::MutationOrganize; use super::physical_plans::MutationSplit; use super::physical_plans::RecursiveCteScan; +use super::physical_plans::RuntimeFilterSink; +use super::physical_plans::RuntimeFilterSource; use crate::executor::physical_plan::PhysicalPlan; use crate::executor::physical_plans::AggregateExpand; use crate::executor::physical_plans::AggregateFinal; @@ -120,9 +122,26 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::ChunkAppendData(plan) => self.replace_chunk_append_data(plan), PhysicalPlan::ChunkMerge(plan) => self.replace_chunk_merge(plan), PhysicalPlan::ChunkCommitInsert(plan) => self.replace_chunk_commit_insert(plan), + PhysicalPlan::RuntimeFilterSource(plan) => self.replace_runtime_filter_source(plan), + PhysicalPlan::RuntimeFilterSink(plan) => self.replace_runtime_filter_sink(plan), } } + fn replace_runtime_filter_source( + &mut self, + plan: &RuntimeFilterSource, + ) -> Result { + Ok(PhysicalPlan::RuntimeFilterSource(plan.clone())) + } + + fn replace_runtime_filter_sink(&mut self, plan: &RuntimeFilterSink) -> Result { + let input = self.replace(&plan.input)?; + Ok(PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink { + plan_id: plan.plan_id, + input: Box::new(input), + })) + } + fn replace_recluster(&mut self, plan: &Recluster) -> Result { Ok(PhysicalPlan::Recluster(Box::new(plan.clone()))) } @@ -253,6 +272,12 @@ pub trait PhysicalPlanReplacer { let build = self.replace(&plan.build)?; let probe = self.replace(&plan.probe)?; + let runtime_filter_plan = if let Some(runtime_filter_plan) = &plan.runtime_filter_plan { + Some(Box::new(self.replace(runtime_filter_plan)?)) + } else { + None + }; + Ok(PhysicalPlan::HashJoin(HashJoin { plan_id: plan.plan_id, projections: plan.projections.clone(), @@ -271,10 +296,12 @@ pub trait PhysicalPlanReplacer { 
output_schema: plan.output_schema.clone(), need_hold_hash_table: plan.need_hold_hash_table, stat_info: plan.stat_info.clone(), - runtime_filter: plan.runtime_filter.clone(), + runtime_filter_desc: plan.runtime_filter_desc.clone(), broadcast: plan.broadcast, single_to_inner: plan.single_to_inner.clone(), build_side_cache_info: plan.build_side_cache_info.clone(), + runtime_filter_plan, + join_id: plan.join_id, })) } @@ -646,7 +673,8 @@ impl PhysicalPlan { | PhysicalPlan::HilbertPartition(_) | PhysicalPlan::ExchangeSource(_) | PhysicalPlan::CompactSource(_) - | PhysicalPlan::MutationSource(_) => {} + | PhysicalPlan::MutationSource(_) + | PhysicalPlan::RuntimeFilterSource(_) => {} PhysicalPlan::Filter(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } @@ -772,6 +800,9 @@ impl PhysicalPlan { PhysicalPlan::ChunkCommitInsert(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } + PhysicalPlan::RuntimeFilterSink(plan) => { + Self::traverse(&plan.input, pre_visit, visit, post_visit); + } } post_visit(plan); } diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs index 27fdfadefd44f..29e973ed0bac9 100644 --- a/src/query/sql/src/executor/physical_plans/mod.rs +++ b/src/query/sql/src/executor/physical_plans/mod.rs @@ -50,6 +50,7 @@ mod physical_replace_async_source; mod physical_replace_deduplicate; mod physical_replace_into; mod physical_row_fetch; +mod physical_runtime_filter; mod physical_sort; mod physical_table_scan; mod physical_udf; @@ -79,8 +80,8 @@ pub use physical_exchange_source::ExchangeSource; pub use physical_expression_scan::ExpressionScan; pub use physical_filter::Filter; pub use physical_hash_join::HashJoin; -pub use physical_hash_join::PhysicalRuntimeFilter; -pub use physical_hash_join::PhysicalRuntimeFilters; +pub use physical_hash_join::RemoteRuntimeFilterDesc; +pub use physical_hash_join::RemoteRuntimeFiltersDesc; pub use physical_join::PhysicalJoinType; pub use physical_limit::Limit; pub use physical_multi_table_insert::*; @@ -99,6 +100,7 @@ pub use physical_replace_async_source::ReplaceAsyncSourcer; pub use physical_replace_deduplicate::*; pub use physical_replace_into::ReplaceInto; pub use physical_row_fetch::RowFetch; +pub use physical_runtime_filter::*; pub use physical_sort::Sort; pub use physical_table_scan::TableScan; pub use physical_udf::Udf; diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs index 3b1d9c83aa2f8..ec1f33174ca95 100644 --- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs +++ b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs @@ -30,6 +30,8 @@ use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_storages_common_table_meta::table::get_change_type; +use super::RuntimeFilterSink; +use super::RuntimeFilterSource; use crate::executor::explain::PlanStatsInfo; use crate::executor::physical_plans::Exchange; use crate::executor::physical_plans::FragmentKind; @@ -88,16 +90,19 @@ pub struct HashJoin { // a HashMap for mapping the column indexes to the BlockEntry indexes in DataBlock. 
     pub build_side_cache_info: Option<(usize, HashMap<IndexType, usize>)>,
 
-    pub runtime_filter: PhysicalRuntimeFilters,
+    pub runtime_filter_desc: RemoteRuntimeFiltersDesc,
+    pub runtime_filter_plan: Option<Box<PhysicalPlan>>,
+    pub join_id: u32,
 }
 
+/// Runtime filter description with `RemoteExpr`
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)]
-pub struct PhysicalRuntimeFilters {
-    pub filters: Vec<PhysicalRuntimeFilter>,
+pub struct RemoteRuntimeFiltersDesc {
+    pub filters: Vec<RemoteRuntimeFilterDesc>,
 }
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
-pub struct PhysicalRuntimeFilter {
+pub struct RemoteRuntimeFilterDesc {
     pub id: usize,
     pub build_key: RemoteExpr,
     pub probe_key: RemoteExpr<String>,
@@ -519,7 +524,7 @@ impl PhysicalPlanBuilder {
         let output_schema = DataSchemaRefExt::create(output_fields);
 
         let runtime_filter = self
-            .build_runtime_filter(
+            .build_runtime_filter_desc(
                 join,
                 s_expr,
                 is_broadcast,
@@ -527,6 +532,19 @@
                 left_join_conditions_rt,
             )
             .await?;
+
+        let join_id = self.next_hash_join_id;
+        self.next_hash_join_id += 1;
+
+        let runtime_filter_plan = if !runtime_filter.filters.is_empty()
+            && !self.ctx.get_cluster().is_empty()
+            && !is_broadcast
+        {
+            Some(Self::build_runtime_filter_plan(join_id)?)
+        } else {
+            None
+        };
+
         Ok(PhysicalPlan::HashJoin(HashJoin {
             plan_id: 0,
             projections,
@@ -560,28 +578,45 @@
             broadcast: is_broadcast,
             single_to_inner: join.single_to_inner.clone(),
             build_side_cache_info,
-            runtime_filter,
+            runtime_filter_desc: runtime_filter,
+            runtime_filter_plan,
+            join_id,
         }))
     }
 
-    async fn build_runtime_filter(
+    fn build_runtime_filter_plan(join_id: u32) -> Result<Box<PhysicalPlan>> {
+        let runtime_filter_source =
+            Box::new(PhysicalPlan::RuntimeFilterSource(RuntimeFilterSource {
+                plan_id: 0,
+                join_id,
+            }));
+        let exchange = Box::new(PhysicalPlan::Exchange(Exchange {
+            plan_id: 0,
+            input: runtime_filter_source,
+            kind: FragmentKind::Expansive,
+            keys: vec![],
+            allow_adjust_parallelism: true,
+            ignore_exchange: false,
+        }));
+        let runtime_filter_sink = Box::new(PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink {
+            plan_id: 0,
+            input: exchange,
+        }));
+        Ok(runtime_filter_sink)
+    }
+
+    async fn build_runtime_filter_desc(
         &self,
         join: &Join,
         s_expr: &SExpr,
         is_broadcast: bool,
         build_keys: &[RemoteExpr],
         probe_keys: Vec<Option<(RemoteExpr<String>, usize, usize)>>,
-    ) -> Result<PhysicalRuntimeFilters> {
+    ) -> Result<RemoteRuntimeFiltersDesc> {
         if !supported_join_type_for_runtime_filter(&join.join_type) {
             return Ok(Default::default());
         }
 
-        let is_cluster = !self.ctx.get_cluster().is_empty();
-        if is_cluster && !is_broadcast {
-            // For cluster, only support runtime filter for broadcast join.
-            return Ok(Default::default());
-        }
-
         let mut filters = Vec::new();
         for (build_key, probe_key, scan_id, table_index) in build_keys
             .iter()
@@ -599,6 +634,8 @@
             let id = self.metadata.write().next_runtime_filter_id();
 
             let enable_bloom_runtime_filter = {
+                // shuffle join does not support bloom runtime filter for now
+                let is_shuffle = !self.ctx.get_cluster().is_empty() && !is_broadcast;
                 let is_supported_type = data_type.is_number() || data_type.is_string();
                 let enable_bloom_runtime_filter_based_on_stats = adjust_bloom_runtime_filter(
                     self.ctx.clone(),
@@ -607,11 +644,11 @@
                     s_expr,
                 )
                 .await?;
-                is_supported_type && enable_bloom_runtime_filter_based_on_stats
+                !is_shuffle && is_supported_type && enable_bloom_runtime_filter_based_on_stats
             };
             let enable_min_max_runtime_filter =
                 data_type.is_number() || data_type.is_date() || data_type.is_string();
-            let runtime_filter = PhysicalRuntimeFilter {
+            let runtime_filter = RemoteRuntimeFilterDesc {
                 id,
                 build_key: build_key.clone(),
                 probe_key,
@@ -622,7 +659,7 @@
             };
             filters.push(runtime_filter);
         }
-        Ok(PhysicalRuntimeFilters { filters })
+        Ok(RemoteRuntimeFiltersDesc { filters })
     }
 }
 
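build_runtime_filter_plan above wires a three-node side plan: a RuntimeFilterSource that reads from the per-join channel, an Expansive exchange that fans its output out to the other nodes, and a RuntimeFilterSink that consumes the filters there; the new file that follows defines those two plan nodes. A toy reconstruction of the resulting tree shape, using an illustrative Plan enum rather than Databend's real one:

    #[derive(Debug)]
    enum Plan {
        Source { join_id: u32 },
        Exchange { kind: &'static str, input: Box<Plan> },
        Sink { input: Box<Plan> },
    }

    fn build_runtime_filter_plan(join_id: u32) -> Box<Plan> {
        let source = Box::new(Plan::Source { join_id });
        let exchange = Box::new(Plan::Exchange { kind: "Expansive", input: source });
        Box::new(Plan::Sink { input: exchange })
    }

    fn main() {
        // Sink { input: Exchange { kind: "Expansive", input: Source { join_id: 7 } } }
        println!("{:?}", build_runtime_filter_plan(7));
    }

diff --git a/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs b/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs
new file mode 100644
index 0000000000000..89fe24ead7893
--- /dev/null
+++ b/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs
@@ -0,0 +1,27 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.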
+
+use crate::executor::PhysicalPlan;
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct RuntimeFilterSource {
+    pub plan_id: u32,
+    pub join_id: u32,
+}
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct RuntimeFilterSink {
+    pub plan_id: u32,
+    pub input: Box<PhysicalPlan>,
+}
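At build time these nodes are matched back to a concrete channel through the rf_source map added to QueryContextShared earlier in this patch: whichever side asks first creates the pair, keeps its own half, and parks the opposite half for the peer, and each parked half is claimed exactly once, which is why the real code can unwrap the take(). A standalone sketch of that rendezvous with toy types (Registry and the String payload are illustrative, not Databend code):

    use std::collections::HashMap;
    use std::sync::Mutex;

    use async_channel::{unbounded, Receiver, Sender};

    #[derive(Default)]
    struct Registry {
        map: Mutex<HashMap<u32, (Option<Sender<String>>, Option<Receiver<String>>)>>,
    }

    impl Registry {
        fn send_half(&self, join_id: u32) -> Sender<String> {
            let mut map = self.map.lock().unwrap();
            match map.get_mut(&join_id) {
                // The receive side arrived first and parked our sender half.
                Some((tx, _)) => tx.take().expect("sender claimed twice"),
                None => {
                    let (tx, rx) = unbounded();
                    map.insert(join_id, (None, Some(rx)));
                    tx
                }
            }
        }

        fn recv_half(&self, join_id: u32) -> Receiver<String> {
            let mut map = self.map.lock().unwrap();
            match map.get_mut(&join_id) {
                // The send side arrived first and parked our receiver half.
                Some((_, rx)) => rx.take().expect("receiver claimed twice"),
                None => {
                    let (tx, rx) = unbounded();
                    map.insert(join_id, (Some(tx), None));
                    rx
                }
            }
        }
    }

    fn main() {
        let reg = Registry::default();
        let tx = reg.send_half(1);
        let rx = reg.recv_half(1); // pairs with the sender above
        tx.send_blocking("serialized filter".into()).unwrap();
        assert_eq!(rx.recv_blocking().unwrap(), "serialized filter");
    }

From c99099390599a72939d87044a378c9cb089fc7d2 Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Fri, 18 Apr 2025 14:25:40 +0800
Subject: [PATCH 02/22] avoid committing multiple times

---
 src/query/service/src/schedulers/scheduler.rs | 39 ++++-------------
 .../flight/v1/exchange/exchange_manager.rs    | 43 -------------------
 2 files changed, 9 insertions(+), 73 deletions(-)

diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs
index cd635066f6a6a..6a6224d788341 100644
--- a/src/query/service/src/schedulers/scheduler.rs
+++ b/src/query/service/src/schedulers/scheduler.rs
@@ -20,7 +20,6 @@ use databend_common_expression::DataBlock;
 use databend_common_sql::planner::QueryExecutor;
 use databend_common_sql::Planner;
 use futures_util::TryStreamExt;
-use log::info;
 
 use crate::interpreters::InterpreterFactory;
 use crate::pipelines::executor::ExecutorSettings;
@@ -100,13 +99,15 @@ pub async fn build_distributed_pipeline(
     ctx: &Arc<QueryContext>,
     plan: &PhysicalPlan,
 ) -> Result<PipelineBuildResult> {
-    let runtime_filter_broadcast_plans = collect_runtime_filter_broadcast_plans(plan)?;
-    start_runtime_filter_broadcast(ctx, &runtime_filter_broadcast_plans).await?;
-    let fragmenter = Fragmenter::try_create(ctx.clone())?;
-
-    let root_fragment = fragmenter.build_fragment(plan)?;
     let mut fragments_actions = QueryFragmentsActions::create(ctx.clone());
-    root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
+    for plan in collect_runtime_filter_broadcast_plans(plan)?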
+ .iter() + .chain(std::iter::once(plan)) + { + let fragmenter = Fragmenter::try_create(ctx.clone())?; + let root_fragment = fragmenter.build_fragment(plan)?; + root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?; + } let exchange_manager = ctx.get_exchange_manager(); @@ -126,28 +127,6 @@ pub async fn build_distributed_pipeline( } } -async fn start_runtime_filter_broadcast( - ctx: &Arc, - runtime_filter_broadcast_plans: &[PhysicalPlan], -) -> Result<()> { - if runtime_filter_broadcast_plans.is_empty() { - return Ok(()); - } - - for plan in runtime_filter_broadcast_plans { - let fragmenter = Fragmenter::try_create(ctx.clone())?; - let root_fragment = fragmenter.build_fragment(plan)?; - let mut fragments_actions = QueryFragmentsActions::create(ctx.clone()); - root_fragment.get_actions(ctx.clone(), &mut fragments_actions)?; - - let exchange_manager = ctx.get_exchange_manager(); - exchange_manager - .commit_actions_without_return_pipeline(ctx.clone(), fragments_actions) - .await?; - } - Ok(()) -} - fn collect_runtime_filter_broadcast_plans(plan: &PhysicalPlan) -> Result> { let mut runtime_filter_broadcast_plans = Vec::new(); @@ -160,7 +139,7 @@ fn collect_runtime_filter_broadcast_plans(plan: &PhysicalPlan) -> Result, - actions: QueryFragmentsActions, - ) -> Result<()> { - let settings = ctx.get_settings(); - let flight_params = FlightParams { - timeout: settings.get_flight_client_timeout()?, - retry_times: settings.get_flight_max_retry_times()?, - retry_interval: settings.get_flight_retry_interval()?, - }; - let root_actions = actions.get_root_actions()?; - let conf = GlobalConfig::instance(); - - // Initialize query env between cluster nodes - let query_env = actions.get_query_env()?; - query_env.init(&ctx, flight_params).await?; - - // Submit distributed tasks to all nodes. 
- let cluster = ctx.get_cluster(); - let mut query_fragments = actions.get_query_fragments()?; - - let local_fragments = query_fragments.remove(&conf.query.node_id); - - let _: HashMap = cluster - .do_action(INIT_QUERY_FRAGMENTS, query_fragments, flight_params) - .await?; - - self.set_ctx(&ctx.get_id(), ctx.clone())?; - if let Some(query_fragments) = local_fragments { - init_query_fragments(query_fragments).await?; - } - - let prepared_query = actions.prepared_query()?; - let _: HashMap = cluster - .do_action(START_PREPARED_QUERY, prepared_query, flight_params) - .await?; - - Ok(()) - } - #[async_backtrace::framed] #[fastrace::trace] pub async fn commit_actions( From 59be6a3ba363e89f008ffa46fd2e63c0bfc743c1 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 18 Apr 2025 14:41:30 +0800 Subject: [PATCH 03/22] fix merge --- src/query/sql/src/executor/physical_plan.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 53399dbbe6da4..bc7191c62cc56 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -686,6 +686,7 @@ impl PhysicalPlan { | PhysicalPlan::CompactSource(_) | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) + | PhysicalPlan::RuntimeFilterSource(_) | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()), PhysicalPlan::HilbertPartition(plan) => Box::new(std::iter::once(plan.input.as_mut())), PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_mut())), @@ -746,6 +747,7 @@ impl PhysicalPlan { CopyIntoTableSource::Query(v) => Box::new(std::iter::once(v.as_mut())), CopyIntoTableSource::Stage(v) => Box::new(std::iter::once(v.as_mut())), }, + PhysicalPlan::RuntimeFilterSink(plan) => Box::new(std::iter::once(plan.input.as_mut())), } } From 8dafec8afe3207e89a0df1be7baf4d95c0331ba2 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 18 Apr 2025 14:49:32 +0800 Subject: [PATCH 04/22] fix fragment type --- src/query/service/src/schedulers/fragments/fragmenter.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index 80132824aa545..4a765fe264fa0 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -124,9 +124,13 @@ impl Fragmenter { pub fn build_fragment(mut self, plan: &PhysicalPlan) -> Result { let root = self.replace(plan)?; + let fragment_type = match plan { + PhysicalPlan::RuntimeFilterSink(_) => FragmentType::Intermediate, + _ => FragmentType::Root, + }; let mut root_fragment = PlanFragment { plan: root, - fragment_type: FragmentType::Root, + fragment_type, fragment_id: self.ctx.get_fragment_id(), exchange: None, query_id: self.query_id.clone(), From 0060d4675dace91bd6934b54a75bd9caa4c13d32 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 18 Apr 2025 14:59:03 +0800 Subject: [PATCH 05/22] rm unused modify --- .../src/schedulers/fragments/fragmenter.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index 4a765fe264fa0..764117eeac90f 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -215,23 +215,6 @@ impl 
PhysicalPlanReplacer for Fragmenter { Ok(PhysicalPlan::CompactSource(Box::new(plan.clone()))) } - fn replace_runtime_filter_sink(&mut self, plan: &RuntimeFilterSink) -> Result { - let input = self.replace(&plan.input)?; - let sink = PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink { - plan_id: plan.plan_id, - input: Box::new(input), - }); - self.fragments.push(PlanFragment { - plan: sink.clone(), - fragment_type: FragmentType::Intermediate, - fragment_id: self.ctx.get_fragment_id(), - exchange: None, - query_id: self.query_id.clone(), - source_fragments: vec![self.fragments.last().unwrap().clone()], - }); - Ok(sink) - } - fn replace_hash_join(&mut self, plan: &HashJoin) -> Result { let mut fragments = vec![]; let build_input = self.replace(plan.build.as_ref())?; From 097f94c1ce56020ad4e85ad56d6e2ca7fcef143d Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 18 Apr 2025 15:30:49 +0800 Subject: [PATCH 06/22] fix --- src/query/service/src/schedulers/fragments/fragmenter.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index 764117eeac90f..161117159256d 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -29,7 +29,6 @@ use databend_common_sql::executor::physical_plans::HashJoin; use databend_common_sql::executor::physical_plans::MutationSource; use databend_common_sql::executor::physical_plans::Recluster; use databend_common_sql::executor::physical_plans::ReplaceInto; -use databend_common_sql::executor::physical_plans::RuntimeFilterSink; use databend_common_sql::executor::physical_plans::TableScan; use databend_common_sql::executor::physical_plans::UnionAll; use databend_common_sql::executor::PhysicalPlanReplacer; From 413fd1c2ea529ef37c312dd7f2ca4106dfa21d86 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 21 Apr 2025 09:36:50 +0800 Subject: [PATCH 07/22] improve err msg --- .../src/servers/flight/v1/exchange/exchange_manager.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index d158ccf3c9b89..9578cd412b056 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -469,7 +469,14 @@ impl DataExchangeManager { match queries_coordinator.get_mut(&query_id) { None => Err(ErrorCode::Internal("Query not exists.")), Some(query_coordinator) => { - assert!(query_coordinator.fragment_exchanges.is_empty()); + assert!( + query_coordinator.fragment_exchanges.is_empty(), + "query_coordinator.fragment_exchanges is not empty: {:?}", + query_coordinator + .fragment_exchanges + .keys() + .collect::>() + ); let injector = DefaultExchangeInjector::create(); let mut build_res = query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?; From 45dc770906f6f7b274e619b642797fd9b18319a6 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Tue, 22 Apr 2025 16:05:18 +0800 Subject: [PATCH 08/22] fix build pipeline --- .../service/src/pipelines/builders/builder_runtime_filter.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs index 
21333b7da954f..4f8009f98ec8f 100644 --- a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs +++ b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs @@ -34,7 +34,8 @@ impl PipelineBuilder { ) } - pub(crate) fn build_runtime_filter_sink(&mut self, _sink: &RuntimeFilterSink) -> Result<()> { + pub(crate) fn build_runtime_filter_sink(&mut self, sink: &RuntimeFilterSink) -> Result<()> { + self.build_pipeline(&sink.input)?; self.main_pipeline .add_sink(RuntimeFilterSinkProcessor::create) } From 7939cad1279074badd1ee54e2c13f4841d026b5c Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Tue, 22 Apr 2025 16:39:01 +0800 Subject: [PATCH 09/22] fix exhchange --- .../flight/v1/exchange/exchange_manager.rs | 111 +++++++++--------- 1 file changed, 55 insertions(+), 56 deletions(-) diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 9578cd412b056..163d4fdb5a1a4 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -761,16 +761,18 @@ impl QueryCoordinator { return Ok(fragment_coordinator.pipeline_build_res.unwrap()); } - let exchange_params = fragment_coordinator.create_exchange_params( - info, - fragment_coordinator - .pipeline_build_res - .as_ref() - .map(|x| x.exchange_injector.clone()) - .ok_or_else(|| { - ErrorCode::Internal("Pipeline build result is none, It's a bug") - })?, - )?; + let exchange_params = fragment_coordinator + .create_exchange_params( + info, + fragment_coordinator + .pipeline_build_res + .as_ref() + .map(|x| x.exchange_injector.clone()) + .ok_or_else(|| { + ErrorCode::Internal("Pipeline build result is none, It's a bug") + })?, + )? + .unwrap(); let mut build_res = fragment_coordinator.pipeline_build_res.unwrap(); // Add exchange data transform. @@ -840,13 +842,11 @@ impl QueryCoordinator { if let Some(mut build_res) = coordinator.pipeline_build_res.take() { build_res.set_max_threads(max_threads as usize); - if !build_res.main_pipeline.is_pulling_pipeline()? { - return Err(ErrorCode::Internal("Logical error, It's a bug")); + if let Some(params) = params { + // Add exchange data publisher. + ExchangeSink::via(&info.query_ctx, ¶ms, &mut build_res.main_pipeline)?; } - // Add exchange data publisher. - ExchangeSink::via(&info.query_ctx, ¶ms, &mut build_res.main_pipeline)?; - if !build_res.main_pipeline.is_complete_pipeline()? 
{ return Err(ErrorCode::Internal("Logical error, It's a bug")); } @@ -924,48 +924,47 @@ impl FragmentCoordinator { &self, info: &QueryInfo, exchange_injector: Arc, - ) -> Result { - if let Some(data_exchange) = &self.data_exchange { - return match data_exchange { - DataExchange::Merge(exchange) => { - Ok(ExchangeParams::MergeExchange(MergeExchangeParams { - exchange_injector: exchange_injector.clone(), - schema: self.physical_plan.output_schema()?, - fragment_id: self.fragment_id, - query_id: info.query_id.to_string(), - destination_id: exchange.destination_id.clone(), - allow_adjust_parallelism: exchange.allow_adjust_parallelism, - ignore_exchange: exchange.ignore_exchange, - })) - } - DataExchange::Broadcast(exchange) => { - Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams { - exchange_injector: exchange_injector.clone(), - schema: self.physical_plan.output_schema()?, - fragment_id: self.fragment_id, - query_id: info.query_id.to_string(), - executor_id: info.current_executor.to_string(), - destination_ids: exchange.destination_ids.to_owned(), - shuffle_scatter: exchange_injector - .flight_scatter(&info.query_ctx, data_exchange)?, - })) - } - DataExchange::ShuffleDataExchange(exchange) => { - Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams { - exchange_injector: exchange_injector.clone(), - schema: self.physical_plan.output_schema()?, - fragment_id: self.fragment_id, - query_id: info.query_id.to_string(), - executor_id: info.current_executor.to_string(), - destination_ids: exchange.destination_ids.to_owned(), - shuffle_scatter: exchange_injector - .flight_scatter(&info.query_ctx, data_exchange)?, - })) - } - }; + ) -> Result> { + let Some(data_exchange) = &self.data_exchange else { + return Ok(None); + }; + match data_exchange { + DataExchange::Merge(exchange) => { + Ok(Some(ExchangeParams::MergeExchange(MergeExchangeParams { + exchange_injector: exchange_injector.clone(), + schema: self.physical_plan.output_schema()?, + fragment_id: self.fragment_id, + query_id: info.query_id.to_string(), + destination_id: exchange.destination_id.clone(), + allow_adjust_parallelism: exchange.allow_adjust_parallelism, + ignore_exchange: exchange.ignore_exchange, + }))) + } + DataExchange::Broadcast(exchange) => Ok(Some(ExchangeParams::ShuffleExchange( + ShuffleExchangeParams { + exchange_injector: exchange_injector.clone(), + schema: self.physical_plan.output_schema()?, + fragment_id: self.fragment_id, + query_id: info.query_id.to_string(), + executor_id: info.current_executor.to_string(), + destination_ids: exchange.destination_ids.to_owned(), + shuffle_scatter: exchange_injector + .flight_scatter(&info.query_ctx, data_exchange)?, + }, + ))), + DataExchange::ShuffleDataExchange(exchange) => Ok(Some( + ExchangeParams::ShuffleExchange(ShuffleExchangeParams { + exchange_injector: exchange_injector.clone(), + schema: self.physical_plan.output_schema()?, + fragment_id: self.fragment_id, + query_id: info.query_id.to_string(), + executor_id: info.current_executor.to_string(), + destination_ids: exchange.destination_ids.to_owned(), + shuffle_scatter: exchange_injector + .flight_scatter(&info.query_ctx, data_exchange)?, + }), + )), } - - Err(ErrorCode::Internal("Cannot find data exchange.")) } pub fn prepare_pipeline(&mut self, ctx: Arc) -> Result<()> { From 509fa2dac2ebfa5266582ef33308d7fed662ca46 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Tue, 22 Apr 2025 17:24:02 +0800 Subject: [PATCH 10/22] refactor --- .../servers/flight/v1/exchange/exchange_manager.rs | 13 +++++++++++-- 1 
diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
index 163d4fdb5a1a4..f5d577e32edcc 100644
--- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
+++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
@@ -842,10 +842,19 @@
         if let Some(mut build_res) = coordinator.pipeline_build_res.take() {
             build_res.set_max_threads(max_threads as usize);
 
-            if let Some(params) = params {
+            if build_res.main_pipeline.is_pulling_pipeline()? {
+                let Some(params) = params else {
+                    return Err(ErrorCode::Internal(
+                        "pipeline is a pulling pipeline, but exchange params are none",
+                    ));
+                };
                 // Add exchange data publisher.
                 ExchangeSink::via(&info.query_ctx, &params, &mut build_res.main_pipeline)?;
-            }
+            } else if build_res.main_pipeline.is_complete_pipeline()? && params.is_some() {
+                return Err(ErrorCode::Internal(
+                    "pipeline is a complete pipeline, but exchange params are some",
+                ));
+            };
 
             if !build_res.main_pipeline.is_complete_pipeline()? {
                 return Err(ErrorCode::Internal("Logical error, It's a bug"));

From 5711316f71859227a8d643931afecd8bec8ef5a0 Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Thu, 24 Apr 2025 10:47:54 +0800
Subject: [PATCH 11/22] fix root pipeline

---
 .../processors/transforms/runtime_filter.rs   |  8 +++--
 .../fragments/query_fragment_actions.rs       | 16 ++++++----
 .../flight/v1/exchange/exchange_manager.rs    | 31 ++++++++++++++-----
 3 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs
index cf59c7cd1e38a..f838c994cb630 100644
--- a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs
+++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs
@@ -69,14 +69,18 @@ impl AsyncSource for RuntimeFilterSourceProcessor {
     }
 }
 
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 pub struct RuntimeFilterMeta {
     inlist: Vec<RemoteExpr<String>>,
     min_max: Vec<RemoteExpr<String>>,
 }
 
 #[typetag::serde(name = "runtime_filter_meta")]
-impl BlockMetaInfo for RuntimeFilterMeta {}
+impl BlockMetaInfo for RuntimeFilterMeta {
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        Box::new(self.clone())
+    }
+}
 
 impl From<&RuntimeFilterInfo> for RuntimeFilterMeta {
     fn from(runtime_filter_info: &RuntimeFilterInfo) -> Self {

diff --git a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs
index dece1fdfb21c5..9fa10c1585bfc 100644
--- a/src/query/service/src/schedulers/fragments/query_fragment_actions.rs
+++ b/src/query/service/src/schedulers/fragments/query_fragment_actions.rs
@@ -126,12 +126,16 @@ impl QueryFragmentsActions {
         self.ctx.get_cluster().local_id()
     }
 
-    pub fn get_root_actions(&self) -> Result<&QueryFragmentActions> {
-        self.fragments_actions.last().ok_or_else(|| {
-            ErrorCode::Internal(
-                "Logical error, call get_root_actions in empty QueryFragmentsActions",
-            )
-        })
+    pub fn get_root_fragment_ids(&self) -> Result<Vec<usize>> {
+        let mut fragment_ids = Vec::new();
+        for fragment_actions in &self.fragments_actions {
+            let plan = &fragment_actions.fragment_actions[0].physical_plan;
+            if !matches!(plan, PhysicalPlan::ExchangeSink(_)) {
+                fragment_ids.push(fragment_actions.fragment_id);
+            }
+        }
+
+        Ok(fragment_ids)
     }
 
     pub fn pop_root_actions(&mut self) -> Option<QueryFragmentActions> {
diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
index f5d577e32edcc..0613f6da72d70 100644
--- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
+++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
@@ -57,7 +57,6 @@ use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::PipelineBuildResult;
 use crate::pipelines::PipelineBuilder;
-use crate::schedulers::QueryFragmentActions;
 use crate::schedulers::QueryFragmentsActions;
 use crate::servers::flight::v1::actions::init_query_fragments;
 use crate::servers::flight::v1::actions::INIT_QUERY_FRAGMENTS;
@@ -422,7 +421,7 @@ impl DataExchangeManager {
             retry_times: settings.get_flight_max_retry_times()?,
             retry_interval: settings.get_flight_retry_interval()?,
         };
-        let root_actions = actions.get_root_actions()?;
+        let mut root_fragment_ids = actions.get_root_fragment_ids()?;
 
         let conf = GlobalConfig::instance();
         // Initialize query env between cluster nodes
@@ -445,7 +444,8 @@ impl DataExchangeManager {
         }
 
         // Get local pipeline of local task
-        let build_res = self.get_root_pipeline(ctx, root_actions)?;
+        let main_fragment_id = root_fragment_ids.pop().unwrap();
+        let build_res = self.get_root_pipeline(ctx, main_fragment_id, root_fragment_ids)?;
 
         let prepared_query = actions.prepared_query()?;
         let _: HashMap<String, ()> = cluster
@@ -458,10 +458,10 @@ impl DataExchangeManager {
     fn get_root_pipeline(
         &self,
         ctx: Arc<QueryContext>,
-        root_actions: &QueryFragmentActions,
+        main_fragment_id: usize,
+        fragment_ids: Vec<usize>,
     ) -> Result<PipelineBuildResult> {
         let query_id = ctx.get_id();
-        let fragment_id = root_actions.fragment_id;
 
         let queries_coordinator_guard = self.queries_coordinator.lock();
         let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };
@@ -478,8 +478,25 @@
                         .collect::<Vec<_>>()
                 );
                 let injector = DefaultExchangeInjector::create();
-                let mut build_res =
-                    query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?;
+                let mut build_res = query_coordinator.subscribe_fragment(
+                    &ctx,
+                    main_fragment_id,
+                    injector.clone(),
+                )?;
+
+                for fragment_id in fragment_ids {
+                    let sub_build_res = query_coordinator.subscribe_fragment(
+                        &ctx,
+                        fragment_id,
+                        injector.clone(),
+                    )?;
+                    build_res
+                        .sources_pipelines
+                        .push(sub_build_res.main_pipeline);
+                    build_res
+                        .sources_pipelines
+                        .extend(sub_build_res.sources_pipelines);
+                }
 
                 let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges);
                 let statistics_receiver = StatisticsReceiver::spawn_receiver(&ctx, exchanges)?;

From baa0f22485e2fd95c226095f0dd4a2591ed345d6 Mon Sep 17 00:00:00 2001
From: sky <3374614481@qq.com>
Date: Thu, 24 Apr 2025 11:11:13 +0800
Subject: [PATCH 12/22] fix pipeline

---
 .../service/src/pipelines/builders/builder_runtime_filter.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs
index 4f8009f98ec8f..e1671f111c109 100644
--- a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs
+++ b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs
@@ -36,6 +36,7 @@ impl PipelineBuilder {
 
     pub(crate) fn build_runtime_filter_sink(&mut self, sink: &RuntimeFilterSink) -> Result<()> {
         self.build_pipeline(&sink.input)?;
+        self.main_pipeline.resize(1, true);
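Note: the resize(1, true) call added above is the heart of this fix. Every upstream pipe carrying a serialized runtime filter is funneled into a single pipe, so the sink that follows sees the whole stream in one place and can count what arrives. A minimal stand-alone sketch of the same fan-in idea, using std::sync::mpsc as a stand-in for Databend's pipeline ports (the names and structure are illustrative, not the actual Pipeline API):

    use std::sync::mpsc;
    use std::thread;

    fn main() {
        let (tx, rx) = mpsc::channel::<String>();
        // Pretend each upstream pipe is a producer thread.
        let handles: Vec<_> = (0..4)
            .map(|i| {
                let tx = tx.clone();
                thread::spawn(move || tx.send(format!("runtime filter from pipe {i}")).unwrap())
            })
            .collect();
        drop(tx); // drop the original sender so the channel closes once producers finish
        for h in handles {
            h.join().unwrap();
        }
        // The single consumer (the "sink") sees every message and can count them.
        let received: Vec<String> = rx.iter().collect();
        assert_eq!(received.len(), 4);
    }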
self.main_pipeline .add_sink(RuntimeFilterSinkProcessor::create) } From 0c2e75706ca8ce7fba978a79b1c6ed1263d5d4ca Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 24 Apr 2025 11:36:44 +0800 Subject: [PATCH 13/22] improve error msg --- .../src/pipelines/builders/builder_runtime_filter.rs | 2 +- .../flight/v1/exchange/exchange_sink_writer.rs | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs index e1671f111c109..5a8713795795f 100644 --- a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs +++ b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs @@ -36,7 +36,7 @@ impl PipelineBuilder { pub(crate) fn build_runtime_filter_sink(&mut self, sink: &RuntimeFilterSink) -> Result<()> { self.build_pipeline(&sink.input)?; - self.main_pipeline.resize(1, true); + self.main_pipeline.resize(1, true)?; self.main_pipeline .add_sink(RuntimeFilterSinkProcessor::create) } diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs index abebc2ba6a254..92128beb3a84b 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs @@ -70,11 +70,15 @@ impl AsyncSink for ExchangeWriterSink { async fn consume(&mut self, mut data_block: DataBlock) -> Result { let serialize_meta = match data_block.take_meta() { None => Err(ErrorCode::Internal( - "ExchangeWriterSink only recv ExchangeSerializeMeta.", + "ExchangeWriterSink only recv ExchangeSerializeMeta, but got none.", )), - Some(block_meta) => ExchangeSerializeMeta::downcast_from(block_meta).ok_or_else(|| { - ErrorCode::Internal("ExchangeWriterSink only recv ExchangeSerializeMeta.") - }), + Some(block_meta) => ExchangeSerializeMeta::downcast_from(block_meta.clone()) + .ok_or_else(|| { + ErrorCode::Internal(format!( + "ExchangeWriterSink only recv ExchangeSerializeMeta, but got {:?}", + block_meta + )) + }), }?; let mut bytes = 0; From ec1c88b98756548e957e898030a5ec5626743a19 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 24 Apr 2025 15:37:53 +0800 Subject: [PATCH 14/22] fix serialize block --- .../pipelines/processors/transforms/runtime_filter.rs | 5 ++++- .../servers/flight/v1/exchange/exchange_sink_writer.rs | 10 +++------- .../flight/v1/exchange/serde/exchange_serializer.rs | 5 ----- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs index f838c994cb630..0b7e33e8dc2bc 100644 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs @@ -122,7 +122,10 @@ impl AsyncSink for RuntimeFilterSinkProcessor { .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?; let runtime_filter = RuntimeFilterMeta::downcast_from(ptr) .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?; - println!("runtime_filter: {:?}", runtime_filter); + log::info!( + "RuntimeFilterSinkProcessor recv runtime filter: {:?}", + runtime_filter + ); Ok(false) } } diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs 
b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs index 92128beb3a84b..91bdf774f9bf8 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs @@ -72,13 +72,9 @@ impl AsyncSink for ExchangeWriterSink { None => Err(ErrorCode::Internal( "ExchangeWriterSink only recv ExchangeSerializeMeta, but got none.", )), - Some(block_meta) => ExchangeSerializeMeta::downcast_from(block_meta.clone()) - .ok_or_else(|| { - ErrorCode::Internal(format!( - "ExchangeWriterSink only recv ExchangeSerializeMeta, but got {:?}", - block_meta - )) - }), + Some(block_meta) => ExchangeSerializeMeta::downcast_from(block_meta).ok_or_else(|| { + ErrorCode::Internal("ExchangeWriterSink only recv ExchangeSerializeMeta") + }), }?; let mut bytes = 0; diff --git a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs index 5a757f37ba299..11d17ff6641fe 100644 --- a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs @@ -159,11 +159,6 @@ impl BlockMetaTransform for TransformScatterExchangeSeriali fn transform(&mut self, meta: ExchangeShuffleMeta) -> Result> { let mut new_blocks = Vec::with_capacity(meta.blocks.len()); for (index, block) in meta.blocks.into_iter().enumerate() { - if block.is_empty() { - new_blocks.push(block); - continue; - } - new_blocks.push(match self.local_pos == index { true => block, false => serialize_block(0, block, &self.options)?, From aa7e5afb9483561b6dd1600c9bd01b961b551f86 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 24 Apr 2025 16:54:14 +0800 Subject: [PATCH 15/22] fix sink --- .../src/pipelines/processors/transforms/runtime_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs index 0b7e33e8dc2bc..ac77c751ffd8a 100644 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs @@ -126,6 +126,6 @@ impl AsyncSink for RuntimeFilterSinkProcessor { "RuntimeFilterSinkProcessor recv runtime filter: {:?}", runtime_filter ); - Ok(false) + Ok(true) } } From cbbfb38024637d1efb74aee5bc15274b15f8b3cc Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 25 Apr 2025 10:04:20 +0800 Subject: [PATCH 16/22] add log --- Cargo.lock | 1 + src/query/pipeline/sources/Cargo.toml | 1 + src/query/pipeline/sources/src/async_source.rs | 4 +++- .../pipelines/processors/transforms/runtime_filter.rs | 10 +++++++++- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2fa7b7101579..b13a44b07f764 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3950,6 +3950,7 @@ dependencies = [ "databend-common-expression", "databend-common-pipeline-core", "futures", + "log", "parking_lot 0.12.3", ] diff --git a/src/query/pipeline/sources/Cargo.toml b/src/query/pipeline/sources/Cargo.toml index c1b0a014c8157..7a6f6ea131ed5 100644 --- a/src/query/pipeline/sources/Cargo.toml +++ b/src/query/pipeline/sources/Cargo.toml @@ -15,6 +15,7 @@ databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-common-pipeline-core = { workspace = true 
} futures = { workspace = true } +log = { workspace = true } parking_lot = { workspace = true } [lints] diff --git a/src/query/pipeline/sources/src/async_source.rs b/src/query/pipeline/sources/src/async_source.rs index 77360d4045e2d..72904a145b124 100644 --- a/src/query/pipeline/sources/src/async_source.rs +++ b/src/query/pipeline/sources/src/async_source.rs @@ -131,7 +131,9 @@ impl Processor for AsyncSourcer { return Ok(()); } match self.inner.generate().await? { - None => self.is_finish = true, + None => { + self.is_finish = true; + } Some(data_block) => { if !data_block.is_empty() { let progress_values = ProgressValues { diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs index ac77c751ffd8a..aa6c7c12d54fb 100644 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs @@ -54,7 +54,15 @@ impl AsyncSource for RuntimeFilterSourceProcessor { #[async_backtrace::framed] async fn generate(&mut self) -> Result> { - match self.meta_receiver.recv().await { + let start = std::time::Instant::now(); + log::info!("RuntimeFilterSource recv() start"); + let rf = self.meta_receiver.recv().await; + log::info!( + "RuntimeFilterSource recv() take {:?},get {}", + start.elapsed(), + rf.is_ok() + ); + match rf { Ok(runtime_filter) => Ok(Some(DataBlock::empty_with_meta(Box::new( RuntimeFilterMeta { inlist: runtime_filter.inlist, From ef53984abd02b0726482455f6c34fbe8600fb007 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 25 Apr 2025 15:32:49 +0800 Subject: [PATCH 17/22] fix send --- .../builders/builder_runtime_filter.rs | 4 +- .../hash_join/hash_join_build_state.rs | 16 ++++--- .../pipelines/processors/transforms/mod.rs | 2 +- .../processors/transforms/runtime_filter.rs | 48 +++++++++++-------- src/query/service/src/sessions/query_ctx.rs | 6 +-- .../service/src/sessions/query_ctx_shared.rs | 10 ++-- 6 files changed, 51 insertions(+), 35 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs index 5a8713795795f..b04f73000c0e9 100644 --- a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs +++ b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs @@ -15,6 +15,7 @@ use databend_common_exception::Result; use databend_common_sql::executor::physical_plans::RuntimeFilterSink; use databend_common_sql::executor::physical_plans::RuntimeFilterSource; +use databend_common_storages_fuse::TableContext; use crate::pipelines::processors::transforms::RuntimeFilterSinkProcessor; use crate::pipelines::processors::transforms::RuntimeFilterSourceProcessor; @@ -37,7 +38,8 @@ impl PipelineBuilder { pub(crate) fn build_runtime_filter_sink(&mut self, sink: &RuntimeFilterSink) -> Result<()> { self.build_pipeline(&sink.input)?; self.main_pipeline.resize(1, true)?; + let node_num = self.ctx.get_cluster().nodes.len(); self.main_pipeline - .add_sink(RuntimeFilterSinkProcessor::create) + .add_sink(|input| RuntimeFilterSinkProcessor::create(input, node_num)) } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index dbb17ee0395d3..f82ef94d9d1cb 100644 --- 
a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -79,7 +79,7 @@ use crate::pipelines::processors::transforms::hash_join::FixedKeyHashJoinHashTab use crate::pipelines::processors::transforms::hash_join::HashJoinHashTable; use crate::pipelines::processors::transforms::hash_join::SerializerHashJoinHashTable; use crate::pipelines::processors::transforms::hash_join::SingleBinaryHashJoinHashTable; -use crate::pipelines::processors::transforms::RuntimeFilterMeta; +use crate::pipelines::processors::transforms::RuntimeFiltersMeta; use crate::pipelines::processors::HashJoinState; use crate::sessions::QueryContext; @@ -119,7 +119,7 @@ pub struct HashJoinBuildState { /// Spill related states. pub(crate) memory_settings: MemorySettings, - pub(crate) runtime_filter_sender: Option>, + pub(crate) runtime_filter_sender: Option>, } impl HashJoinBuildState { @@ -131,7 +131,7 @@ impl HashJoinBuildState { build_projections: &ColumnSet, hash_join_state: Arc, num_threads: usize, - rf_src_send: Option>, + rf_src_send: Option>, ) -> Result> { let hash_key_types = build_keys .iter() @@ -291,6 +291,7 @@ impl HashJoinBuildState { .build_watcher .send(HashTableType::Empty) .map_err(|_| ErrorCode::TokioError("build_watcher channel is closed"))?; + self.send_runtime_filter_meta(Default::default())?; self.set_bloom_filter_ready(false)?; return Ok(()); } @@ -310,6 +311,7 @@ impl HashJoinBuildState { if self.hash_join_state.spilled_partitions.read().is_empty() { self.add_runtime_filter(&build_chunks, build_num_rows)?; } else { + self.send_runtime_filter_meta(Default::default())?; self.set_bloom_filter_ready(false)?; } @@ -842,6 +844,7 @@ impl HashJoinBuildState { fn add_runtime_filter(&self, build_chunks: &[DataBlock], build_num_rows: usize) -> Result<()> { let mut bloom_filter_ready = false; + let mut runtime_filters_meta = RuntimeFiltersMeta::default(); for rf in self.runtime_filter_desc() { let mut runtime_filter = RuntimeFilterInfo::default(); if rf.enable_inlist_runtime_filter && build_num_rows < INLIST_RUNTIME_FILTER_THRESHOLD { @@ -870,10 +873,11 @@ impl HashJoinBuildState { } if !runtime_filter.is_empty() { bloom_filter_ready |= !runtime_filter.is_blooms_empty(); - self.send_runtime_filter_meta(&runtime_filter)?; + runtime_filters_meta.add(rf.scan_id, &runtime_filter); self.ctx.set_runtime_filter((rf.scan_id, runtime_filter)); } } + self.send_runtime_filter_meta(runtime_filters_meta)?; self.set_bloom_filter_ready(bloom_filter_ready)?; Ok(()) } @@ -1075,9 +1079,9 @@ impl HashJoinBuildState { .any(|rf| rf.enable_min_max_runtime_filter) } - fn send_runtime_filter_meta(&self, runtime_filter_info: &RuntimeFilterInfo) -> Result<()> { + fn send_runtime_filter_meta(&self, runtime_filter: RuntimeFiltersMeta) -> Result<()> { if let Some(sender) = self.runtime_filter_sender.as_ref() { - sender.send_blocking(runtime_filter_info.into()).unwrap(); + sender.send_blocking(runtime_filter).unwrap(); sender.close(); } Ok(()) diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index 9c8e63d221d67..df0dd69f4d85a 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -42,9 +42,9 @@ mod transform_udf_server; mod window; pub use hash_join::*; -pub use runtime_filter::RuntimeFilterMeta; pub use 
runtime_filter::RuntimeFilterSinkProcessor; pub use runtime_filter::RuntimeFilterSourceProcessor; +pub use runtime_filter::RuntimeFiltersMeta; pub use transform_add_computed_columns::TransformAddComputedColumns; pub use transform_add_const_columns::TransformAddConstColumns; pub use transform_add_internal_columns::TransformAddInternalColumns; diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs index aa6c7c12d54fb..ec4800fbeea4b 100644 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs @@ -32,13 +32,13 @@ use databend_common_pipeline_sources::AsyncSource; use databend_common_pipeline_sources::AsyncSourcer; pub struct RuntimeFilterSourceProcessor { - pub meta_receiver: Receiver, + pub meta_receiver: Receiver, } impl RuntimeFilterSourceProcessor { pub fn create( ctx: Arc, - receiver: Receiver, + receiver: Receiver, output_port: Arc, ) -> Result { AsyncSourcer::create(ctx, output_port, Self { @@ -63,12 +63,7 @@ impl AsyncSource for RuntimeFilterSourceProcessor { rf.is_ok() ); match rf { - Ok(runtime_filter) => Ok(Some(DataBlock::empty_with_meta(Box::new( - RuntimeFilterMeta { - inlist: runtime_filter.inlist, - min_max: runtime_filter.min_max, - }, - )))), + Ok(runtime_filter) => Ok(Some(DataBlock::empty_with_meta(Box::new(runtime_filter)))), Err(_) => { // The channel is closed, we should return None to stop generating Ok(None) @@ -77,22 +72,29 @@ impl AsyncSource for RuntimeFilterSourceProcessor { } } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] +pub struct RuntimeFiltersMeta { + runtime_filters: Vec, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct RuntimeFilterMeta { + scan_id: usize, inlist: Vec>, min_max: Vec>, } -#[typetag::serde(name = "runtime_filter_meta")] -impl BlockMetaInfo for RuntimeFilterMeta { +#[typetag::serde(name = "runtime_filters_meta")] +impl BlockMetaInfo for RuntimeFiltersMeta { fn clone_self(&self) -> Box { Box::new(self.clone()) } } -impl From<&RuntimeFilterInfo> for RuntimeFilterMeta { - fn from(runtime_filter_info: &RuntimeFilterInfo) -> Self { - Self { +impl RuntimeFiltersMeta { + pub fn add(&mut self, scan_id: usize, runtime_filter_info: &RuntimeFilterInfo) { + let rf = RuntimeFilterMeta { + scan_id, inlist: runtime_filter_info .inlists_ref() .iter() @@ -103,14 +105,21 @@ impl From<&RuntimeFilterInfo> for RuntimeFilterMeta { .iter() .map(|expr| expr.as_remote_expr()) .collect(), - } + }; + self.runtime_filters.push(rf); } } -pub struct RuntimeFilterSinkProcessor {} +pub struct RuntimeFilterSinkProcessor { + node_num: usize, + recv_num: usize, +} impl RuntimeFilterSinkProcessor { - pub fn create(input: Arc) -> Result { - Ok(ProcessorPtr::create(AsyncSinker::create(input, Self {}))) + pub fn create(input: Arc, node_num: usize) -> Result { + Ok(ProcessorPtr::create(AsyncSinker::create(input, Self { + node_num, + recv_num: 0, + }))) } } @@ -128,12 +137,13 @@ impl AsyncSink for RuntimeFilterSinkProcessor { let ptr = data_block .take_meta() .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?; - let runtime_filter = RuntimeFilterMeta::downcast_from(ptr) + let runtime_filter = RuntimeFiltersMeta::downcast_from(ptr) .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?; log::info!( "RuntimeFilterSinkProcessor recv runtime filter: {:?}", runtime_filter ); 
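Note: the hunk continues below by replacing the unconditional Ok(true) with a counting barrier: consume() keeps returning false until one runtime-filter block per node in the cluster has arrived, and only then lets the sink finish. A toy model of that contract, assuming (as AsyncSink does here) that returning true from consume ends the sink; the struct below is illustrative, not the real processor:

    struct CountingSink {
        node_num: usize,
        recv_num: usize,
    }

    impl CountingSink {
        // Returns true once one message per node has been consumed.
        fn consume(&mut self) -> bool {
            self.recv_num += 1;
            self.node_num == self.recv_num
        }
    }

    fn main() {
        let mut sink = CountingSink { node_num: 3, recv_num: 0 };
        assert!(!sink.consume());
        assert!(!sink.consume());
        assert!(sink.consume()); // third node reported, the sink can finish
    }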
- Ok(true) + self.recv_num += 1; + Ok(self.node_num == self.recv_num) } } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 9f681dc96f470..fcf49ea7e3cb0 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -137,7 +137,7 @@ use crate::clusters::Cluster; use crate::clusters::ClusterHelper; use crate::locks::LockManager; use crate::pipelines::executor::PipelineExecutor; -use crate::pipelines::processors::transforms::RuntimeFilterMeta; +use crate::pipelines::processors::transforms::RuntimeFiltersMeta; use crate::servers::flight::v1::exchange::DataExchangeManager; use crate::sessions::query_affect::QueryAffect; use crate::sessions::query_ctx_shared::MemoryUpdater; @@ -284,11 +284,11 @@ impl QueryContext { } } - pub fn rf_src_recv(&self, join_id: u32) -> Receiver { + pub fn rf_src_recv(&self, join_id: u32) -> Receiver { self.shared.rf_src_recv(join_id) } - pub fn rf_src_send(&self, join_id: u32) -> Sender { + pub fn rf_src_send(&self, join_id: u32) -> Sender { self.shared.rf_src_send(join_id) } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 872217a66ba98..8fc2f6d0df200 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -70,7 +70,7 @@ use uuid::Uuid; use crate::clusters::Cluster; use crate::clusters::ClusterDiscovery; use crate::pipelines::executor::PipelineExecutor; -use crate::pipelines::processors::transforms::RuntimeFilterMeta; +use crate::pipelines::processors::transforms::RuntimeFiltersMeta; use crate::sessions::query_affect::QueryAffect; use crate::sessions::Session; use crate::storages::Table; @@ -179,8 +179,8 @@ pub struct QueryContextShared { } type RuntimeFilterChannel = ( - Option>, - Option>, + Option>, + Option>, ); impl QueryContextShared { @@ -251,7 +251,7 @@ impl QueryContextShared { })) } - pub fn rf_src_recv(&self, join_id: u32) -> Receiver { + pub fn rf_src_recv(&self, join_id: u32) -> Receiver { let mut rf_source = self.rf_source.lock(); match rf_source.get_mut(&join_id).map(|(_, receiver)| receiver) { Some(receiver) => receiver.take().unwrap(), @@ -262,7 +262,7 @@ impl QueryContextShared { } } } - pub fn rf_src_send(&self, join_id: u32) -> Sender { + pub fn rf_src_send(&self, join_id: u32) -> Sender { let mut rf_source = self.rf_source.lock(); match rf_source.get_mut(&join_id).map(|(sender, _)| sender) { Some(sender) => sender.take().unwrap(), From 95f767b01ba49659c5191ce3d9955ed44451483e Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Sun, 27 Apr 2025 21:40:47 +0800 Subject: [PATCH 18/22] merge rf --- Cargo.lock | 1 - src/query/catalog/src/runtime_filter_info.rs | 56 ++--- src/query/catalog/src/table_context.rs | 4 +- src/query/pipeline/sources/Cargo.toml | 1 - .../src/pipelines/builders/builder_join.rs | 6 +- .../builders/builder_runtime_filter.rs | 5 +- .../processors/transforms/hash_join/desc.rs | 4 +- .../hash_join/hash_join_build_state.rs | 69 ++++-- .../processors/transforms/hash_join/mod.rs | 1 + .../pipelines/processors/transforms/mod.rs | 2 +- .../processors/transforms/runtime_filter.rs | 209 ++++++++++++++---- src/query/service/src/sessions/query_ctx.rs | 38 ++-- .../service/src/sessions/query_ctx_shared.rs | 44 +++- .../tests/it/sql/exec/get_table_bind_test.rs | 4 +- .../it/storages/fuse/operations/commit.rs | 4 +- .../sql/src/executor/physical_plan_visitor.rs | 1 + 
.../physical_plans/physical_hash_join.rs | 1 + .../physical_plans/physical_runtime_filter.rs | 1 + 18 files changed, 302 insertions(+), 149 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b13a44b07f764..e2fa7b7101579 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3950,7 +3950,6 @@ dependencies = [ "databend-common-expression", "databend-common-pipeline-core", "futures", - "log", "parking_lot 0.12.3", ] diff --git a/src/query/catalog/src/runtime_filter_info.rs b/src/query/catalog/src/runtime_filter_info.rs index e59e3e6ede80e..8bffe222cb0a2 100644 --- a/src/query/catalog/src/runtime_filter_info.rs +++ b/src/query/catalog/src/runtime_filter_info.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use databend_common_base::base::tokio::sync::watch; use databend_common_base::base::tokio::sync::watch::Receiver; use databend_common_base::base::tokio::sync::watch::Sender; @@ -19,55 +21,23 @@ use databend_common_expression::Expr; use xorf::BinaryFuse16; #[derive(Clone, Debug, Default)] -pub struct RuntimeFilterInfo { - inlist: Vec>, - min_max: Vec>, - bloom: Vec<(String, BinaryFuse16)>, +pub struct RuntimeFiltersForScan { + pub inlist: HashMap>, + pub min_max: HashMap>, + pub bloom: HashMap, } -impl RuntimeFilterInfo { - pub fn add_inlist(&mut self, expr: Expr) { - self.inlist.push(expr); - } - - pub fn add_bloom(&mut self, bloom: (String, BinaryFuse16)) { - self.bloom.push(bloom); - } - - pub fn add_min_max(&mut self, expr: Expr) { - self.min_max.push(expr); - } - - pub fn get_inlist(&self) -> &Vec> { - &self.inlist - } - - pub fn get_bloom(&self) -> &Vec<(String, BinaryFuse16)> { - &self.bloom - } - - pub fn get_min_max(&self) -> &Vec> { - &self.min_max - } - - pub fn blooms(self) -> Vec<(String, BinaryFuse16)> { - self.bloom - } - - pub fn inlists(self) -> Vec> { - self.inlist - } - - pub fn inlists_ref(&self) -> &Vec> { - &self.inlist +impl RuntimeFiltersForScan { + pub fn add_inlist(&mut self, rf_id: usize, expr: Expr) { + self.inlist.insert(rf_id, expr); } - pub fn min_maxs(self) -> Vec> { - self.min_max + pub fn add_bloom(&mut self, rf_id: usize, bloom: (String, BinaryFuse16)) { + self.bloom.insert(rf_id, bloom); } - pub fn min_maxs_ref(&self) -> &Vec> { - &self.min_max + pub fn add_min_max(&mut self, rf_id: usize, expr: Expr) { + self.min_max.insert(rf_id, expr); } pub fn is_empty(&self) -> bool { diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 5eb826974ef68..346d1a959364e 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -74,8 +74,8 @@ use crate::plan::PartInfoPtr; use crate::plan::PartStatistics; use crate::plan::Partitions; use crate::query_kind::QueryKind; -use crate::runtime_filter_info::RuntimeFilterInfo; use crate::runtime_filter_info::RuntimeFilterReady; +use crate::runtime_filter_info::RuntimeFiltersForScan; use crate::statistics::data_cache_statistics::DataCacheMetrics; use crate::table::Table; @@ -324,7 +324,7 @@ pub trait TableContext: Send + Sync { fn get_query_profiles(&self) -> Vec; - fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo)); + fn set_runtime_filter(&self, filters: (usize, RuntimeFiltersForScan)); fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc); diff --git a/src/query/pipeline/sources/Cargo.toml b/src/query/pipeline/sources/Cargo.toml index 7a6f6ea131ed5..c1b0a014c8157 100644 --- a/src/query/pipeline/sources/Cargo.toml 
+++ b/src/query/pipeline/sources/Cargo.toml @@ -15,7 +15,6 @@ databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-common-pipeline-core = { workspace = true } futures = { workspace = true } -log = { workspace = true } parking_lot = { workspace = true } [lints] diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index 41e333978f70e..d4d7d5cfa3639 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -27,6 +27,7 @@ use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight; use crate::pipelines::processors::transforms::HashJoinBuildState; use crate::pipelines::processors::transforms::HashJoinProbeState; +use crate::pipelines::processors::transforms::RuntimeFilterChannels; use crate::pipelines::processors::transforms::TransformHashJoinBuild; use crate::pipelines::processors::transforms::TransformHashJoinProbe; use crate::pipelines::processors::HashJoinDesc; @@ -157,7 +158,10 @@ impl PipelineBuilder { hash_join_plan .runtime_filter_plan .as_ref() - .map(|_| self.ctx.rf_src_send(hash_join_plan.join_id)), + .map(|_| RuntimeFilterChannels { + rf_src_send: self.ctx.rf_src_send(hash_join_plan.join_id), + rf_sink_recv: self.ctx.rf_sink_recv(hash_join_plan.join_id), + }), )?; build_state.add_runtime_filter_ready(); diff --git a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs index b04f73000c0e9..4be2d23e42699 100644 --- a/src/query/service/src/pipelines/builders/builder_runtime_filter.rs +++ b/src/query/service/src/pipelines/builders/builder_runtime_filter.rs @@ -39,7 +39,8 @@ impl PipelineBuilder { self.build_pipeline(&sink.input)?; self.main_pipeline.resize(1, true)?; let node_num = self.ctx.get_cluster().nodes.len(); - self.main_pipeline - .add_sink(|input| RuntimeFilterSinkProcessor::create(input, node_num)) + self.main_pipeline.add_sink(|input| { + RuntimeFilterSinkProcessor::create(input, node_num, self.ctx.rf_sink_send(sink.join_id)) + }) } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs index 86c60cf72b992..f464b3c358846 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs @@ -54,7 +54,7 @@ pub struct HashJoinDesc { } pub struct RuntimeFilterDesc { - pub _id: usize, + pub id: usize, pub build_key: Expr, pub probe_key: Expr, pub scan_id: usize, @@ -78,7 +78,7 @@ impl From<&RemoteRuntimeFiltersDesc> for RuntimeFiltersDesc { impl From<&RemoteRuntimeFilterDesc> for RuntimeFilterDesc { fn from(runtime_filter: &RemoteRuntimeFilterDesc) -> Self { Self { - _id: runtime_filter.id, + id: runtime_filter.id, build_key: runtime_filter.build_key.as_expr(&BUILTIN_FUNCTIONS), probe_key: runtime_filter.probe_key.as_expr(&BUILTIN_FUNCTIONS), scan_id: runtime_filter.scan_id, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index f82ef94d9d1cb..951f1fe0a7085 100644 --- 
a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::collections::HashSet; use std::collections::VecDeque; use std::ops::ControlFlow; @@ -20,10 +21,11 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use async_channel::Receiver; use async_channel::Sender; use databend_common_base::base::tokio::sync::Barrier; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use databend_common_catalog::table_context::TableContext; use databend_common_column::bitmap::Bitmap; use databend_common_exception::ErrorCode; @@ -79,7 +81,7 @@ use crate::pipelines::processors::transforms::hash_join::FixedKeyHashJoinHashTab use crate::pipelines::processors::transforms::hash_join::HashJoinHashTable; use crate::pipelines::processors::transforms::hash_join::SerializerHashJoinHashTable; use crate::pipelines::processors::transforms::hash_join::SingleBinaryHashJoinHashTable; -use crate::pipelines::processors::transforms::RuntimeFiltersMeta; +use crate::pipelines::processors::transforms::RemoteRuntimeFilters; use crate::pipelines::processors::HashJoinState; use crate::sessions::QueryContext; @@ -119,7 +121,14 @@ pub struct HashJoinBuildState { /// Spill related states. pub(crate) memory_settings: MemorySettings, - pub(crate) runtime_filter_sender: Option>, + pub(crate) rf_channels: Option, +} + +pub struct RuntimeFilterChannels { + /// send runtime filter to `RuntimeFilterSourceProcessor` + pub(crate) rf_src_send: Sender, + /// receive runtime filter from `RuntimeFilterSinkProcessor` + pub(crate) rf_sink_recv: Receiver, } impl HashJoinBuildState { @@ -131,7 +140,7 @@ impl HashJoinBuildState { build_projections: &ColumnSet, hash_join_state: Arc, num_threads: usize, - rf_src_send: Option>, + rf_channels: Option, ) -> Result> { let hash_key_types = build_keys .iter() @@ -168,7 +177,7 @@ impl HashJoinBuildState { build_hash_table_tasks: Default::default(), mutex: Default::default(), memory_settings, - runtime_filter_sender: rf_src_send, + rf_channels, })) } @@ -311,7 +320,6 @@ impl HashJoinBuildState { if self.hash_join_state.spilled_partitions.read().is_empty() { self.add_runtime_filter(&build_chunks, build_num_rows)?; } else { - self.send_runtime_filter_meta(Default::default())?; self.set_bloom_filter_ready(false)?; } @@ -844,15 +852,16 @@ impl HashJoinBuildState { fn add_runtime_filter(&self, build_chunks: &[DataBlock], build_num_rows: usize) -> Result<()> { let mut bloom_filter_ready = false; - let mut runtime_filters_meta = RuntimeFiltersMeta::default(); + let mut runtime_filters = HashMap::new(); for rf in self.runtime_filter_desc() { - let mut runtime_filter = RuntimeFilterInfo::default(); + let mut runtime_filter = RuntimeFiltersForScan::default(); if rf.enable_inlist_runtime_filter && build_num_rows < INLIST_RUNTIME_FILTER_THRESHOLD { self.inlist_runtime_filter( &mut runtime_filter, build_chunks, &rf.build_key, &rf.probe_key, + rf.id, )?; } if rf.enable_bloom_runtime_filter { @@ -861,6 +870,7 @@ impl HashJoinBuildState { &mut runtime_filter, &rf.build_key, &rf.probe_key, + rf.id, )?; } if rf.enable_min_max_runtime_filter { 
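Note: threading rf.id through the bloom, inlist, and min-max builders in the hunks around here is what makes the later RemoteRuntimeFilters::merge step possible: every node keys its filters by the same id, so the coordinator can line them up and keep only ids that every node produced, since a filter missing on any node would describe an incomplete build side and must not be applied. A toy model of that merge, using Vec<i32> as a stand-in for the serialized predicates and a set union where the real code ORs expressions together (illustrative only):

    use std::collections::HashMap;

    // Keep only filter ids reported by every node; union their contents.
    fn merge_by_id(nodes: &[HashMap<usize, Vec<i32>>]) -> HashMap<usize, Vec<i32>> {
        let mut merged = HashMap::new();
        if let Some(first) = nodes.first() {
            for id in first.keys() {
                if nodes.iter().all(|n| n.contains_key(id)) {
                    let mut union: Vec<i32> =
                        nodes.iter().flat_map(|n| n[id].clone()).collect();
                    union.sort_unstable();
                    union.dedup();
                    merged.insert(*id, union);
                }
            }
        }
        merged
    }

    fn main() {
        let a = HashMap::from([(1usize, vec![1, 2]), (2, vec![9])]);
        let b = HashMap::from([(1usize, vec![2, 3])]);
        let merged = merge_by_id(&[a, b]);
        assert_eq!(merged[&1], vec![1, 2, 3]);
        assert!(!merged.contains_key(&2)); // id 2 missing on node b, so it is dropped
    }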
@@ -869,15 +879,15 @@ impl HashJoinBuildState { &mut runtime_filter, &rf.build_key, &rf.probe_key, + rf.id, )?; } if !runtime_filter.is_empty() { bloom_filter_ready |= !runtime_filter.is_blooms_empty(); - runtime_filters_meta.add(rf.scan_id, &runtime_filter); - self.ctx.set_runtime_filter((rf.scan_id, runtime_filter)); + runtime_filters.insert(rf.scan_id, runtime_filter); } } - self.send_runtime_filter_meta(runtime_filters_meta)?; + self.send_runtime_filter_meta(runtime_filters)?; self.set_bloom_filter_ready(bloom_filter_ready)?; Ok(()) } @@ -885,9 +895,10 @@ impl HashJoinBuildState { fn bloom_runtime_filter( &self, data_blocks: &[DataBlock], - runtime_filter: &mut RuntimeFilterInfo, + runtime_filter: &mut RuntimeFiltersForScan, build_key: &Expr, probe_key: &Expr, + rf_id: usize, ) -> Result<()> { if !build_key.data_type().remove_nullable().is_number() && !build_key.data_type().remove_nullable().is_string() @@ -925,23 +936,24 @@ impl HashJoinBuildState { hashes_vec.push(hash); }); let filter = BinaryFuse16::try_from(&hashes_vec)?; - runtime_filter.add_bloom((id.to_string(), filter)); + runtime_filter.add_bloom(rf_id, (id.to_string(), filter)); Ok(()) } fn inlist_runtime_filter( &self, - runtime_filter: &mut RuntimeFilterInfo, + runtime_filter: &mut RuntimeFiltersForScan, data_blocks: &[DataBlock], build_key: &Expr, probe_key: &Expr, + rf_id: usize, ) -> Result<()> { if let Some(distinct_build_column) = dedup_build_key_column(&self.func_ctx, data_blocks, build_key)? { if let Some(filter) = inlist_filter(probe_key, distinct_build_column.clone())? { info!("inlist_filter: {:?}", filter.sql_display()); - runtime_filter.add_inlist(filter); + runtime_filter.add_inlist(rf_id, filter); } } Ok(()) @@ -950,9 +962,10 @@ impl HashJoinBuildState { fn min_max_runtime_filter( &self, data_blocks: &[DataBlock], - runtime_filter: &mut RuntimeFilterInfo, + runtime_filter: &mut RuntimeFiltersForScan, build_key: &Expr, probe_key: &Expr, + rf_id: usize, ) -> Result<()> { if !build_key.runtime_filter_supported_types() { return Ok(()); @@ -1045,7 +1058,7 @@ impl HashJoinBuildState { }; if let Some(min_max_filter) = min_max_filter { info!("min_max_filter: {:?}", min_max_filter.sql_display()); - runtime_filter.add_min_max(min_max_filter); + runtime_filter.add_min_max(rf_id, min_max_filter); } } Ok(()) @@ -1079,10 +1092,24 @@ impl HashJoinBuildState { .any(|rf| rf.enable_min_max_runtime_filter) } - fn send_runtime_filter_meta(&self, runtime_filter: RuntimeFiltersMeta) -> Result<()> { - if let Some(sender) = self.runtime_filter_sender.as_ref() { - sender.send_blocking(runtime_filter).unwrap(); - sender.close(); + fn send_runtime_filter_meta( + &self, + mut rf: HashMap, + ) -> Result<()> { + if let Some(channels) = self.rf_channels.as_ref() { + channels + .rf_src_send + .send_blocking(rf.into()) + .map_err(|_| ErrorCode::TokioError("send runtime filter meta failed"))?; + channels.rf_src_send.close(); + let merged_rf = channels + .rf_sink_recv + .recv_blocking() + .map_err(|_| ErrorCode::TokioError("receive runtime filter meta failed"))?; + rf = merged_rf.into(); + } + for (scan_id, runtime_filter) in rf.into_iter() { + self.ctx.set_runtime_filter((scan_id, runtime_filter)); } Ok(()) } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs index d50ee45b166eb..1337e15f35cf3 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs +++ 
b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs @@ -31,6 +31,7 @@ mod util; pub use desc::HashJoinDesc; pub use hash_join_build_state::HashJoinBuildState; +pub use hash_join_build_state::RuntimeFilterChannels; pub use hash_join_probe_state::HashJoinProbeState; pub use hash_join_spiller::HashJoinSpiller; pub use hash_join_state::*; diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index df0dd69f4d85a..904af8803524b 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -42,9 +42,9 @@ mod transform_udf_server; mod window; pub use hash_join::*; +pub use runtime_filter::RemoteRuntimeFilters; pub use runtime_filter::RuntimeFilterSinkProcessor; pub use runtime_filter::RuntimeFilterSourceProcessor; -pub use runtime_filter::RuntimeFiltersMeta; pub use transform_add_computed_columns::TransformAddComputedColumns; pub use transform_add_const_columns::TransformAddConstColumns; pub use transform_add_internal_columns::TransformAddInternalColumns; diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs index ec4800fbeea4b..9629b4de24f92 100644 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs @@ -12,17 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use async_channel::Receiver; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; +use async_channel::Sender; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::types::DataType; use databend_common_expression::BlockMetaInfo; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; +use databend_common_expression::FunctionID; use databend_common_expression::RemoteExpr; +use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::ProcessorPtr; @@ -32,18 +37,16 @@ use databend_common_pipeline_sources::AsyncSource; use databend_common_pipeline_sources::AsyncSourcer; pub struct RuntimeFilterSourceProcessor { - pub meta_receiver: Receiver, + pub receiver: Receiver, } impl RuntimeFilterSourceProcessor { pub fn create( ctx: Arc, - receiver: Receiver, + receiver: Receiver, output_port: Arc, ) -> Result { - AsyncSourcer::create(ctx, output_port, Self { - meta_receiver: receiver, - }) + AsyncSourcer::create(ctx, output_port, Self { receiver }) } } @@ -56,7 +59,7 @@ impl AsyncSource for RuntimeFilterSourceProcessor { async fn generate(&mut self) -> Result> { let start = std::time::Instant::now(); log::info!("RuntimeFilterSource recv() start"); - let rf = self.meta_receiver.recv().await; + let rf = self.receiver.recv().await; log::info!( "RuntimeFilterSource recv() take {:?},get {}", start.elapsed(), @@ -72,53 +75,24 @@ impl AsyncSource for RuntimeFilterSourceProcessor { } } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] -pub 
struct RuntimeFiltersMeta { - runtime_filters: Vec, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct RuntimeFilterMeta { - scan_id: usize, - inlist: Vec>, - min_max: Vec>, -} - -#[typetag::serde(name = "runtime_filters_meta")] -impl BlockMetaInfo for RuntimeFiltersMeta { - fn clone_self(&self) -> Box { - Box::new(self.clone()) - } -} - -impl RuntimeFiltersMeta { - pub fn add(&mut self, scan_id: usize, runtime_filter_info: &RuntimeFilterInfo) { - let rf = RuntimeFilterMeta { - scan_id, - inlist: runtime_filter_info - .inlists_ref() - .iter() - .map(|expr| expr.as_remote_expr()) - .collect(), - min_max: runtime_filter_info - .min_maxs_ref() - .iter() - .map(|expr| expr.as_remote_expr()) - .collect(), - }; - self.runtime_filters.push(rf); - } -} pub struct RuntimeFilterSinkProcessor { node_num: usize, recv_num: usize, + rf: Vec, + sender: Sender, } impl RuntimeFilterSinkProcessor { - pub fn create(input: Arc, node_num: usize) -> Result { + pub fn create( + input: Arc, + node_num: usize, + sender: Sender, + ) -> Result { Ok(ProcessorPtr::create(AsyncSinker::create(input, Self { node_num, recv_num: 0, + rf: vec![], + sender, }))) } } @@ -137,13 +111,154 @@ impl AsyncSink for RuntimeFilterSinkProcessor { let ptr = data_block .take_meta() .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?; - let runtime_filter = RuntimeFiltersMeta::downcast_from(ptr) + let runtime_filter = RemoteRuntimeFilters::downcast_from(ptr) .ok_or_else(|| ErrorCode::Internal("Cannot downcast meta to RuntimeFilterMeta"))?; log::info!( "RuntimeFilterSinkProcessor recv runtime filter: {:?}", runtime_filter ); self.recv_num += 1; - Ok(self.node_num == self.recv_num) + self.rf.push(runtime_filter); + let all_recv = self.node_num == self.recv_num; + if all_recv { + let merged_rf = RemoteRuntimeFilters::merge(&self.rf); + self.sender.send(merged_rf).await.map_err(|_| { + ErrorCode::Internal("RuntimeFilterSinkProcessor failed to send runtime filter") + })?; + } + Ok(all_recv) + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] +pub struct RemoteRuntimeFilters { + scan_id_to_runtime_filter: HashMap, +} + +impl From> for RemoteRuntimeFilters { + fn from(rfs: HashMap) -> Self { + let mut remote_runtime_filters = RemoteRuntimeFilters::default(); + for (scan_id, runtime_filter) in rfs { + remote_runtime_filters.add(scan_id, &runtime_filter); + } + remote_runtime_filters + } +} + +impl From for HashMap { + fn from(rfs: RemoteRuntimeFilters) -> Self { + rfs.scan_id_to_runtime_filter + .into_iter() + .map(|(scan_id, runtime_filter)| (scan_id, runtime_filter.into())) + .collect() + } +} + +impl From for RuntimeFiltersForScan { + fn from(rfs: RemoteRuntimeFiltersForScan) -> Self { + Self { + inlist: rfs + .rf_id_to_inlist + .into_iter() + .map(|(id, expr)| (id, expr.as_expr(&BUILTIN_FUNCTIONS))) + .collect(), + min_max: rfs + .rf_id_to_min_max + .into_iter() + .map(|(id, expr)| (id, expr.as_expr(&BUILTIN_FUNCTIONS))) + .collect(), + bloom: Default::default(), + } + } +} + +impl RemoteRuntimeFilters { + pub fn merge(rfs: &[RemoteRuntimeFilters]) -> Self { + if rfs.is_empty() { + return RemoteRuntimeFilters::default(); + } + + let mut common_scans: Vec = + rfs[0].scan_id_to_runtime_filter.keys().cloned().collect(); + for rf in &rfs[1..] 
{ + common_scans.retain(|scan_id| rf.scan_id_to_runtime_filter.contains_key(scan_id)); + } + + let mut merged = RemoteRuntimeFilters::default(); + + for scan_id in common_scans { + let mut merged_for_scan = RemoteRuntimeFiltersForScan::default(); + let first_scan = &rfs[0].scan_id_to_runtime_filter[&scan_id]; + + let mut common_inlist_ids: Vec = + first_scan.rf_id_to_inlist.keys().cloned().collect(); + let mut common_min_max_ids: Vec = + first_scan.rf_id_to_min_max.keys().cloned().collect(); + for rf in &rfs[1..] { + let scan_filter = &rf.scan_id_to_runtime_filter[&scan_id]; + common_inlist_ids.retain(|id| scan_filter.rf_id_to_inlist.contains_key(id)); + common_min_max_ids.retain(|id| scan_filter.rf_id_to_min_max.contains_key(id)); + } + + for rf_id in &common_inlist_ids { + let mut exprs = Vec::new(); + for rf in rfs.iter() { + exprs.push( + rf.scan_id_to_runtime_filter[&scan_id].rf_id_to_inlist[rf_id].clone(), + ); + } + let merged_expr = exprs + .into_iter() + .reduce(|acc, expr| RemoteExpr::FunctionCall { + span: None, + id: Box::new(FunctionID::Builtin { + name: "or".to_string(), + id: 0, + }), + generics: vec![], + args: vec![acc, expr], + return_type: DataType::Boolean, + }) + .unwrap(); + merged_for_scan.rf_id_to_inlist.insert(*rf_id, merged_expr); + } + + merged + .scan_id_to_runtime_filter + .insert(scan_id, merged_for_scan); + } + + merged + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] +pub struct RemoteRuntimeFiltersForScan { + rf_id_to_inlist: HashMap>, + rf_id_to_min_max: HashMap>, +} + +#[typetag::serde(name = "runtime_filters_for_join")] +impl BlockMetaInfo for RemoteRuntimeFilters { + fn clone_self(&self) -> Box { + Box::new(self.clone()) + } +} + +impl RemoteRuntimeFilters { + pub fn add(&mut self, scan_id: usize, r: &RuntimeFiltersForScan) { + let rf = RemoteRuntimeFiltersForScan { + rf_id_to_inlist: r + .inlist + .iter() + .map(|(id, expr)| (*id, expr.as_remote_expr())) + .collect(), + rf_id_to_min_max: r + .min_max + .iter() + .map(|(id, expr)| (*id, expr.as_remote_expr())) + .collect(), + }; + self.scan_id_to_runtime_filter.insert(scan_id, rf); } } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index fcf49ea7e3cb0..e44a7ad6c3914 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -54,8 +54,8 @@ use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::StageTableInfo; use databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table_args::TableArgs; use databend_common_catalog::table_context::ContextError; @@ -137,7 +137,7 @@ use crate::clusters::Cluster; use crate::clusters::ClusterHelper; use crate::locks::LockManager; use crate::pipelines::executor::PipelineExecutor; -use crate::pipelines::processors::transforms::RuntimeFiltersMeta; +use crate::pipelines::processors::transforms::RemoteRuntimeFilters; use crate::servers::flight::v1::exchange::DataExchangeManager; use crate::sessions::query_affect::QueryAffect; use crate::sessions::query_ctx_shared::MemoryUpdater; @@ -284,14 +284,22 @@ impl QueryContext { } } - pub fn rf_src_recv(&self, 
join_id: u32) -> Receiver { + pub fn rf_src_recv(&self, join_id: u32) -> Receiver { self.shared.rf_src_recv(join_id) } - pub fn rf_src_send(&self, join_id: u32) -> Sender { + pub fn rf_src_send(&self, join_id: u32) -> Sender { self.shared.rf_src_send(join_id) } + pub fn rf_sink_recv(&self, join_id: u32) -> Receiver { + self.shared.rf_sink_recv(join_id) + } + + pub fn rf_sink_send(&self, join_id: u32) -> Sender { + self.shared.rf_sink_send(join_id) + } + pub fn attach_table(&self, catalog: &str, database: &str, name: &str, table: Arc) { self.shared.attach_table(catalog, database, name, table) } @@ -1392,21 +1400,21 @@ impl TableContext for QueryContext { runtime_filters.clear(); } - fn set_runtime_filter(&self, filters: (IndexType, RuntimeFilterInfo)) { + fn set_runtime_filter(&self, filters: (IndexType, RuntimeFiltersForScan)) { let mut runtime_filters = self.shared.runtime_filters.write(); match runtime_filters.entry(filters.0) { Entry::Vacant(v) => { v.insert(filters.1); } Entry::Occupied(mut v) => { - for filter in filters.1.get_inlist() { - v.get_mut().add_inlist(filter.clone()); + for (rf_id, filter) in filters.1.inlist.into_iter() { + v.get_mut().add_inlist(rf_id, filter); } - for filter in filters.1.get_min_max() { - v.get_mut().add_min_max(filter.clone()); + for (rf_id, filter) in filters.1.min_max.into_iter() { + v.get_mut().add_min_max(rf_id, filter); } - for filter in filters.1.blooms() { - v.get_mut().add_bloom(filter); + for (rf_id, filter) in filters.1.bloom.into_iter() { + v.get_mut().add_bloom(rf_id, filter); } } } @@ -1457,7 +1465,7 @@ impl TableContext for QueryContext { fn get_bloom_runtime_filter_with_id(&self, id: IndexType) -> Vec<(String, BinaryFuse16)> { let runtime_filters = self.shared.runtime_filters.read(); match runtime_filters.get(&id) { - Some(v) => (v.get_bloom()).clone(), + Some(v) => v.bloom.values().cloned().collect(), None => vec![], } } @@ -1465,7 +1473,7 @@ impl TableContext for QueryContext { fn get_inlist_runtime_filter_with_id(&self, id: IndexType) -> Vec> { let runtime_filters = self.shared.runtime_filters.read(); match runtime_filters.get(&id) { - Some(v) => (v.get_inlist()).clone(), + Some(v) => v.inlist.values().cloned().collect(), None => vec![], } } @@ -1473,14 +1481,14 @@ impl TableContext for QueryContext { fn get_min_max_runtime_filter_with_id(&self, id: IndexType) -> Vec> { let runtime_filters = self.shared.runtime_filters.read(); match runtime_filters.get(&id) { - Some(v) => (v.get_min_max()).clone(), + Some(v) => v.min_max.values().cloned().collect(), None => vec![], } } fn has_bloom_runtime_filters(&self, id: usize) -> bool { if let Some(runtime_filter) = self.shared.runtime_filters.read().get(&id) { - return !runtime_filter.get_bloom().is_empty(); + return !runtime_filter.bloom.is_empty(); } false } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 8fc2f6d0df200..bedce1a8a04a0 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -37,8 +37,8 @@ use databend_common_catalog::catalog::CatalogManager; use databend_common_catalog::merge_into_join::MergeIntoJoin; use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use 
databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table_context::ContextError; use databend_common_catalog::table_context::StageAttachment; @@ -70,7 +70,7 @@ use uuid::Uuid; use crate::clusters::Cluster; use crate::clusters::ClusterDiscovery; use crate::pipelines::executor::PipelineExecutor; -use crate::pipelines::processors::transforms::RuntimeFiltersMeta; +use crate::pipelines::processors::transforms::RemoteRuntimeFilters; use crate::sessions::query_affect::QueryAffect; use crate::sessions::Session; use crate::storages::Table; @@ -147,7 +147,7 @@ pub struct QueryContextShared { pub(in crate::sessions) query_profiles: Arc, PlanProfile>>>, - pub(in crate::sessions) runtime_filters: Arc>>, + pub(in crate::sessions) runtime_filters: Arc>>, pub(in crate::sessions) runtime_filter_ready: Arc>>>>, @@ -173,14 +173,16 @@ pub struct QueryContextShared { pub(in crate::sessions) selected_segment_locs: Arc>>, // join_id -> (sender, receiver) - pub(in crate::sessions) rf_source: Arc>>, + pub(in crate::sessions) rf_source: Arc>>, + + pub(in crate::sessions) rf_sink: Arc>>, pub(in crate::sessions) pruned_partitions_stats: Arc>>, } -type RuntimeFilterChannel = ( - Option>, - Option>, +type RuntimeFilterSourceChannel = ( + Option>, + Option>, ); impl QueryContextShared { @@ -247,11 +249,12 @@ impl QueryContextShared { node_memory_usage: Arc::new(RwLock::new(HashMap::new())), selected_segment_locs: Default::default(), rf_source: Arc::new(Mutex::new(HashMap::new())), + rf_sink: Arc::new(Mutex::new(HashMap::new())), pruned_partitions_stats: Arc::new(RwLock::new(None)), })) } - pub fn rf_src_recv(&self, join_id: u32) -> Receiver { + pub fn rf_src_recv(&self, join_id: u32) -> Receiver { let mut rf_source = self.rf_source.lock(); match rf_source.get_mut(&join_id).map(|(_, receiver)| receiver) { Some(receiver) => receiver.take().unwrap(), @@ -262,7 +265,7 @@ impl QueryContextShared { } } } - pub fn rf_src_send(&self, join_id: u32) -> Sender { + pub fn rf_src_send(&self, join_id: u32) -> Sender { let mut rf_source = self.rf_source.lock(); match rf_source.get_mut(&join_id).map(|(sender, _)| sender) { Some(sender) => sender.take().unwrap(), @@ -274,6 +277,29 @@ impl QueryContextShared { } } + pub fn rf_sink_recv(&self, join_id: u32) -> Receiver { + let mut rf_sink = self.rf_sink.lock(); + match rf_sink.get_mut(&join_id).map(|(_, receiver)| receiver) { + Some(receiver) => receiver.take().unwrap(), + None => { + let (sender, receiver) = async_channel::unbounded(); + rf_sink.insert(join_id, (Some(sender), None)); + receiver + } + } + } + pub fn rf_sink_send(&self, join_id: u32) -> Sender { + let mut rf_sink = self.rf_sink.lock(); + match rf_sink.get_mut(&join_id).map(|(sender, _)| sender) { + Some(sender) => sender.take().unwrap(), + None => { + let (sender, receiver) = async_channel::unbounded(); + rf_sink.insert(join_id, (None, Some(receiver))); + sender + } + } + } + pub fn set_error(&self, err: ErrorCode) { let err = err.with_context("query context error"); diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index 0c2bebe146f42..272c96e97b748 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -31,8 +31,8 @@ use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Partitions; use 
databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::ContextError; @@ -924,7 +924,7 @@ impl TableContext for CtxDelegation { todo!() } - fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFilterInfo)) { + fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFiltersForScan)) { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index e41d6c7c2fb73..eeed126767ae1 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -30,8 +30,8 @@ use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Partitions; use databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; +use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::ContextError; @@ -789,7 +789,7 @@ impl TableContext for CtxDelegation { todo!() } - fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFilterInfo)) { + fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFiltersForScan)) { todo!() } diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 2b3bbf5c3703e..031c8359dfc1a 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -138,6 +138,7 @@ pub trait PhysicalPlanReplacer { let input = self.replace(&plan.input)?; Ok(PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink { plan_id: plan.plan_id, + join_id: plan.join_id, input: Box::new(input), })) } diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs index ec1f33174ca95..c9d6467f507df 100644 --- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs +++ b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs @@ -601,6 +601,7 @@ impl PhysicalPlanBuilder { let runtime_filter_sink = Box::new(PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink { plan_id: 0, input: exchange, + join_id, })); Ok(runtime_filter_sink) } diff --git a/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs b/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs index 89fe24ead7893..09186b62139e5 100644 --- a/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs +++ b/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs @@ -23,5 +23,6 @@ pub struct RuntimeFilterSource { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct RuntimeFilterSink { pub plan_id: u32, + pub join_id: u32, pub input: Box, } From f668d1c70327702d74df5e0d41bdd43c3b6d0a7e Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 28 Apr 2025 09:56:25 
+0800 Subject: [PATCH 19/22] fix --- src/query/sql/src/executor/physical_plans/physical_hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs index c9d6467f507df..e3ddfaf6f06d9 100644 --- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs +++ b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs @@ -636,7 +636,7 @@ impl PhysicalPlanBuilder { let enable_bloom_runtime_filter = { // shuffle join does not support bloom runtime filter for now - let is_shuffle = self.ctx.get_cluster().is_empty() && !is_broadcast; + let is_shuffle = !self.ctx.get_cluster().is_empty() && !is_broadcast; let is_supported_type = data_type.is_number() || data_type.is_string(); let enable_bloom_runtime_filter_based_on_stats = adjust_bloom_runtime_filter( self.ctx.clone(), From 10e0fae5a4bbfa8f77864d1aa26be2ec8a62eef9 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 28 Apr 2025 16:08:58 +0800 Subject: [PATCH 20/22] fix --- .../processors/transforms/runtime_filter.rs | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs index 9629b4de24f92..e66a7aa57019a 100644 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs @@ -174,6 +174,7 @@ impl From for RuntimeFiltersForScan { impl RemoteRuntimeFilters { pub fn merge(rfs: &[RemoteRuntimeFilters]) -> Self { + log::info!("start merge runtime filters: {:?}", rfs); if rfs.is_empty() { return RemoteRuntimeFilters::default(); } @@ -207,22 +208,47 @@ impl RemoteRuntimeFilters { rf.scan_id_to_runtime_filter[&scan_id].rf_id_to_inlist[rf_id].clone(), ); } + log::info!("merge inlist: {:?}, rf_id: {:?}", exprs, rf_id); let merged_expr = exprs .into_iter() .reduce(|acc, expr| RemoteExpr::FunctionCall { span: None, id: Box::new(FunctionID::Builtin { name: "or".to_string(), - id: 0, + id: 1, }), generics: vec![], args: vec![acc, expr], - return_type: DataType::Boolean, + return_type: DataType::Nullable(Box::new(DataType::Boolean)), }) .unwrap(); merged_for_scan.rf_id_to_inlist.insert(*rf_id, merged_expr); } + for rf_id in &common_min_max_ids { + let mut exprs = Vec::new(); + for rf in rfs.iter() { + exprs.push( + rf.scan_id_to_runtime_filter[&scan_id].rf_id_to_min_max[rf_id].clone(), + ); + } + log::info!("merge min_max: {:?}, rf_id: {:?}", exprs, rf_id); + let merged_expr = exprs + .into_iter() + .reduce(|acc, expr| RemoteExpr::FunctionCall { + span: None, + id: Box::new(FunctionID::Builtin { + name: "or".to_string(), + id: 1, + }), + generics: vec![], + args: vec![acc, expr], + return_type: DataType::Nullable(Box::new(DataType::Boolean)), + }) + .unwrap(); + merged_for_scan.rf_id_to_min_max.insert(*rf_id, merged_expr); + } + merged .scan_id_to_runtime_filter .insert(scan_id, merged_for_scan); From 00b834aac0beff0f007b5b50683115f41b5a9dfa Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 28 Apr 2025 20:47:10 +0800 Subject: [PATCH 21/22] fix --- .../hash_join/hash_join_build_state.rs | 8 +- .../processors/transforms/runtime_filter.rs | 96 ++++++++++--------- 2 files changed, 54 insertions(+), 50 deletions(-) diff --git 
a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index 951f1fe0a7085..f4a1459499f67 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -300,7 +300,7 @@ impl HashJoinBuildState { .build_watcher .send(HashTableType::Empty) .map_err(|_| ErrorCode::TokioError("build_watcher channel is closed"))?; - self.send_runtime_filter_meta(Default::default())?; + self.send_runtime_filter_meta(None)?; self.set_bloom_filter_ready(false)?; return Ok(()); } @@ -887,7 +887,7 @@ impl HashJoinBuildState { runtime_filters.insert(rf.scan_id, runtime_filter); } } - self.send_runtime_filter_meta(runtime_filters)?; + self.send_runtime_filter_meta(Some(runtime_filters))?; self.set_bloom_filter_ready(bloom_filter_ready)?; Ok(()) } @@ -1094,7 +1094,7 @@ impl HashJoinBuildState { fn send_runtime_filter_meta( &self, - mut rf: HashMap, + mut rf: Option>, ) -> Result<()> { if let Some(channels) = self.rf_channels.as_ref() { channels @@ -1108,7 +1108,7 @@ impl HashJoinBuildState { .map_err(|_| ErrorCode::TokioError("receive runtime filter meta failed"))?; rf = merged_rf.into(); } - for (scan_id, runtime_filter) in rf.into_iter() { + for (scan_id, runtime_filter) in rf.unwrap_or_default().into_iter() { self.ctx.set_runtime_filter((scan_id, runtime_filter)); } Ok(()) diff --git a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs index e66a7aa57019a..4f29c19377521 100644 --- a/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/runtime_filter.rs @@ -130,27 +130,33 @@ impl AsyncSink for RuntimeFilterSinkProcessor { } } +/// One-to-one correspondence with HashJoin operator. +/// +/// When the build side is empty, `scan_id_to_runtime_filter` is `None`. 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] pub struct RemoteRuntimeFilters { - scan_id_to_runtime_filter: HashMap, + scan_id_to_runtime_filter: Option>, } -impl From> for RemoteRuntimeFilters { - fn from(rfs: HashMap) -> Self { - let mut remote_runtime_filters = RemoteRuntimeFilters::default(); - for (scan_id, runtime_filter) in rfs { - remote_runtime_filters.add(scan_id, &runtime_filter); +impl From>> for RemoteRuntimeFilters { + fn from(rfs: Option>) -> Self { + RemoteRuntimeFilters { + scan_id_to_runtime_filter: rfs.map(|rfs| { + rfs.into_iter() + .map(|(scan_id, runtime_filter)| (scan_id, runtime_filter.into())) + .collect() + }), } - remote_runtime_filters } } -impl From for HashMap { +impl From for Option> { fn from(rfs: RemoteRuntimeFilters) -> Self { - rfs.scan_id_to_runtime_filter - .into_iter() - .map(|(scan_id, runtime_filter)| (scan_id, runtime_filter.into())) - .collect() + rfs.scan_id_to_runtime_filter.map(|rfs| { + rfs.into_iter() + .map(|(scan_id, runtime_filter)| (scan_id, runtime_filter.into())) + .collect() + }) } } @@ -172,31 +178,51 @@ impl From for RuntimeFiltersForScan { } } +impl From for RemoteRuntimeFiltersForScan { + fn from(rfs: RuntimeFiltersForScan) -> Self { + Self { + rf_id_to_inlist: rfs + .inlist + .iter() + .map(|(id, expr)| (*id, expr.as_remote_expr())) + .collect(), + rf_id_to_min_max: rfs + .min_max + .iter() + .map(|(id, expr)| (*id, expr.as_remote_expr())) + .collect(), + } + } +} impl RemoteRuntimeFilters { pub fn merge(rfs: &[RemoteRuntimeFilters]) -> Self { log::info!("start merge runtime filters: {:?}", rfs); + let rfs = rfs + .iter() + .filter_map(|rfs| rfs.scan_id_to_runtime_filter.as_ref()) + .collect::>(); + if rfs.is_empty() { return RemoteRuntimeFilters::default(); } - let mut common_scans: Vec = - rfs[0].scan_id_to_runtime_filter.keys().cloned().collect(); + let mut common_scans: Vec = rfs[0].keys().cloned().collect(); for rf in &rfs[1..] { - common_scans.retain(|scan_id| rf.scan_id_to_runtime_filter.contains_key(scan_id)); + common_scans.retain(|scan_id| rf.contains_key(scan_id)); } - let mut merged = RemoteRuntimeFilters::default(); + let mut merged = HashMap::new(); for scan_id in common_scans { let mut merged_for_scan = RemoteRuntimeFiltersForScan::default(); - let first_scan = &rfs[0].scan_id_to_runtime_filter[&scan_id]; + let first_scan = &rfs[0][&scan_id]; let mut common_inlist_ids: Vec = first_scan.rf_id_to_inlist.keys().cloned().collect(); let mut common_min_max_ids: Vec = first_scan.rf_id_to_min_max.keys().cloned().collect(); for rf in &rfs[1..] 
{ - let scan_filter = &rf.scan_id_to_runtime_filter[&scan_id]; + let scan_filter = &rf[&scan_id]; common_inlist_ids.retain(|id| scan_filter.rf_id_to_inlist.contains_key(id)); common_min_max_ids.retain(|id| scan_filter.rf_id_to_min_max.contains_key(id)); } @@ -204,9 +230,7 @@ impl RemoteRuntimeFilters { for rf_id in &common_inlist_ids { let mut exprs = Vec::new(); for rf in rfs.iter() { - exprs.push( - rf.scan_id_to_runtime_filter[&scan_id].rf_id_to_inlist[rf_id].clone(), - ); + exprs.push(rf[&scan_id].rf_id_to_inlist[rf_id].clone()); } log::info!("merge inlist: {:?}, rf_id: {:?}", exprs, rf_id); let merged_expr = exprs @@ -228,9 +252,7 @@ impl RemoteRuntimeFilters { for rf_id in &common_min_max_ids { let mut exprs = Vec::new(); for rf in rfs.iter() { - exprs.push( - rf.scan_id_to_runtime_filter[&scan_id].rf_id_to_min_max[rf_id].clone(), - ); + exprs.push(rf[&scan_id].rf_id_to_min_max[rf_id].clone()); } log::info!("merge min_max: {:?}, rf_id: {:?}", exprs, rf_id); let merged_expr = exprs @@ -249,12 +271,12 @@ impl RemoteRuntimeFilters { merged_for_scan.rf_id_to_min_max.insert(*rf_id, merged_expr); } - merged - .scan_id_to_runtime_filter - .insert(scan_id, merged_for_scan); + merged.insert(scan_id, merged_for_scan); } - merged + RemoteRuntimeFilters { + scan_id_to_runtime_filter: Some(merged), + } } } @@ -270,21 +292,3 @@ impl BlockMetaInfo for RemoteRuntimeFilters { Box::new(self.clone()) } } - -impl RemoteRuntimeFilters { - pub fn add(&mut self, scan_id: usize, r: &RuntimeFiltersForScan) { - let rf = RemoteRuntimeFiltersForScan { - rf_id_to_inlist: r - .inlist - .iter() - .map(|(id, expr)| (*id, expr.as_remote_expr())) - .collect(), - rf_id_to_min_max: r - .min_max - .iter() - .map(|(id, expr)| (*id, expr.as_remote_expr())) - .collect(), - }; - self.scan_id_to_runtime_filter.insert(scan_id, rf); - } -} From beff896ea5e549f7edbe81cef53e54ef4aff9e14 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Mon, 28 Apr 2025 21:19:46 +0800 Subject: [PATCH 22/22] fix conflict --- .../sql/src/executor/physical_plans/mod.rs | 6 +- .../physical_plans/physical_hash_join.rs | 37 ++++++++--- .../physical_plans/physical_join_filter.rs | 61 ++++++++++++++----- .../physical_plans/physical_runtime_filter.rs | 28 --------- 4 files changed, 80 insertions(+), 52 deletions(-) delete mode 100644 src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs index ab3e282abec34..82a40fe356181 100644 --- a/src/query/sql/src/executor/physical_plans/mod.rs +++ b/src/query/sql/src/executor/physical_plans/mod.rs @@ -82,8 +82,10 @@ pub use physical_filter::Filter; pub use physical_hash_join::HashJoin; pub use physical_join::PhysicalJoinType; pub use physical_join_filter::JoinRuntimeFilter; -pub use physical_join_filter::PhysicalRuntimeFilter; -pub use physical_join_filter::PhysicalRuntimeFilters; +pub use physical_join_filter::RemoteRuntimeFilterDesc; +pub use physical_join_filter::RemoteRuntimeFiltersDesc; +pub use physical_join_filter::RuntimeFilterSink; +pub use physical_join_filter::RuntimeFilterSource; pub use physical_limit::Limit; pub use physical_multi_table_insert::*; pub use physical_mutation::*; diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs index c2b7cc21c1b72..dd9cc320e6cee 100644 --- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs +++ 
b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs @@ -27,8 +27,9 @@ use databend_common_expression::DataSchemaRefExt; use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; +use super::physical_join_filter::build_runtime_filter_plan; +use super::physical_join_filter::RemoteRuntimeFiltersDesc; use super::JoinRuntimeFilter; -use super::PhysicalRuntimeFilters; use crate::executor::explain::PlanStatsInfo; use crate::executor::physical_plans::Exchange; use crate::executor::physical_plans::FragmentKind; @@ -107,7 +108,9 @@ pub struct HashJoin { // a HashMap for mapping the column indexes to the BlockEntry indexes in DataBlock. pub build_side_cache_info: Option<(usize, HashMap)>, - pub runtime_filter: PhysicalRuntimeFilters, + pub runtime_filter_desc: RemoteRuntimeFiltersDesc, + pub runtime_filter_plan: Option>, + pub join_id: u32, } impl HashJoin { @@ -815,7 +818,9 @@ impl PhysicalPlanBuilder { probe_to_build: Vec<(usize, (bool, bool))>, output_schema: DataSchemaRef, build_side_cache_info: Option<(usize, HashMap)>, - runtime_filter: PhysicalRuntimeFilters, + runtime_filter_desc: RemoteRuntimeFiltersDesc, + runtime_filter_plan: Option>, + join_id: u32, stat_info: PlanStatsInfo, ) -> Result { Ok(PhysicalPlan::HashJoin(HashJoin { @@ -839,7 +844,9 @@ impl PhysicalPlanBuilder { broadcast: is_broadcast, single_to_inner: join.single_to_inner.clone(), build_side_cache_info, - runtime_filter, + runtime_filter_desc, + runtime_filter_plan, + join_id, })) } @@ -915,7 +922,7 @@ impl PhysicalPlanBuilder { let non_equi_conditions = self.process_non_equi_conditions(join, &merged_schema)?; // Step 11: Build runtime filter - let runtime_filter = self + let runtime_filter_desc = self .build_runtime_filter( join, s_expr, @@ -925,6 +932,18 @@ impl PhysicalPlanBuilder { ) .await?; + let join_id = self.next_hash_join_id; + self.next_hash_join_id += 1; + + let runtime_filter_plan = if !runtime_filter_desc.filters.is_empty() + && !self.ctx.get_cluster().is_empty() + && !is_broadcast + { + Some(build_runtime_filter_plan(join_id)?) 
+ } else { + None + }; + // Step 12: Create and return the HashJoin self.create_hash_join( join, @@ -941,7 +960,9 @@ impl PhysicalPlanBuilder { probe_to_build, output_schema, build_side_cache_info, - runtime_filter, + runtime_filter_desc, + runtime_filter_plan, + join_id, stat_info, ) } @@ -953,8 +974,8 @@ impl PhysicalPlanBuilder { is_broadcast: bool, build_keys: &[RemoteExpr], probe_keys: Vec, usize, usize)>>, - ) -> Result { - JoinRuntimeFilter::build_runtime_filter( + ) -> Result { + JoinRuntimeFilter::build_runtime_filter_desc( self.ctx.clone(), &self.metadata, join, diff --git a/src/query/sql/src/executor/physical_plans/physical_join_filter.rs b/src/query/sql/src/executor/physical_plans/physical_join_filter.rs index b28d0b581bbee..fa13a877f584d 100644 --- a/src/query/sql/src/executor/physical_plans/physical_join_filter.rs +++ b/src/query/sql/src/executor/physical_plans/physical_join_filter.rs @@ -22,6 +22,9 @@ use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_storages_common_table_meta::table::get_change_type; +use super::Exchange; +use super::FragmentKind; +use crate::executor::PhysicalPlan; use crate::optimizer::ir::RelExpr; use crate::optimizer::ir::SExpr; use crate::plans::Join; @@ -30,12 +33,12 @@ use crate::IndexType; use crate::MetadataRef; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)] -pub struct PhysicalRuntimeFilters { - pub filters: Vec, +pub struct RemoteRuntimeFiltersDesc { + pub filters: Vec, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct PhysicalRuntimeFilter { +pub struct RemoteRuntimeFilterDesc { pub id: usize, pub build_key: RemoteExpr, pub probe_key: RemoteExpr, @@ -136,7 +139,7 @@ impl JoinRuntimeFilter { } /// Build runtime filters for a join operation - pub async fn build_runtime_filter( + pub async fn build_runtime_filter_desc( ctx: Arc, metadata: &MetadataRef, join: &Join, @@ -144,18 +147,12 @@ impl JoinRuntimeFilter { is_broadcast: bool, build_keys: &[RemoteExpr], probe_keys: Vec, usize, usize)>>, - ) -> Result { + ) -> Result { // Early return if runtime filters are not supported for this join type if !Self::supported_join_type_for_runtime_filter(&join.join_type) { return Ok(Default::default()); } - // For cluster, only support runtime filter for broadcast join - let is_cluster = !ctx.get_cluster().is_empty(); - if is_cluster && !is_broadcast { - return Ok(Default::default()); - } - let mut filters = Vec::new(); // Process each probe key that has runtime filter information @@ -177,6 +174,8 @@ impl JoinRuntimeFilter { // Determine which filter types to enable based on data type and statistics let enable_bloom_runtime_filter = { + // shuffle join does not support bloom runtime filter for now + let is_shuffle = !ctx.get_cluster().is_empty() && !is_broadcast; let is_supported_type = Self::is_type_supported_for_bloom_filter(&data_type); let enable_bloom_runtime_filter_based_on_stats = Self::adjust_bloom_runtime_filter( ctx.clone(), @@ -185,14 +184,14 @@ impl JoinRuntimeFilter { s_expr, ) .await?; - is_supported_type && enable_bloom_runtime_filter_based_on_stats + !is_shuffle && is_supported_type && enable_bloom_runtime_filter_based_on_stats }; let enable_min_max_runtime_filter = Self::is_type_supported_for_min_max_filter(&data_type); // Create and add the runtime filter - let runtime_filter = PhysicalRuntimeFilter { + let runtime_filter = RemoteRuntimeFilterDesc { id, build_key: build_key.clone(), probe_key, @@ -204,6 +203,40 @@ impl JoinRuntimeFilter { 
filters.push(runtime_filter); } - Ok(PhysicalRuntimeFilters { filters }) + Ok(RemoteRuntimeFiltersDesc { filters }) } } + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct RuntimeFilterSource { + pub plan_id: u32, + pub join_id: u32, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct RuntimeFilterSink { + pub plan_id: u32, + pub join_id: u32, + pub input: Box, +} + +pub fn build_runtime_filter_plan(join_id: u32) -> Result> { + let runtime_filter_source = Box::new(PhysicalPlan::RuntimeFilterSource(RuntimeFilterSource { + plan_id: 0, + join_id, + })); + let exchange = Box::new(PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: runtime_filter_source, + kind: FragmentKind::Expansive, + keys: vec![], + allow_adjust_parallelism: true, + ignore_exchange: false, + })); + let runtime_filter_sink = Box::new(PhysicalPlan::RuntimeFilterSink(RuntimeFilterSink { + plan_id: 0, + input: exchange, + join_id, + })); + Ok(runtime_filter_sink) +} diff --git a/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs b/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs deleted file mode 100644 index 09186b62139e5..0000000000000 --- a/src/query/sql/src/executor/physical_plans/physical_runtime_filter.rs +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::executor::PhysicalPlan; - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct RuntimeFilterSource { - pub plan_id: u32, - pub join_id: u32, -} - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct RuntimeFilterSink { - pub plan_id: u32, - pub join_id: u32, - pub input: Box, -}