Skip to content

Commit 95f767b

Browse files
committed
merge rf
1 parent ef53984 commit 95f767b

File tree

18 files changed

+302
-149
lines changed

18 files changed

+302
-149
lines changed

Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/catalog/src/runtime_filter_info.rs

+13-43
Original file line numberDiff line numberDiff line change
@@ -12,62 +12,32 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
16+
1517
use databend_common_base::base::tokio::sync::watch;
1618
use databend_common_base::base::tokio::sync::watch::Receiver;
1719
use databend_common_base::base::tokio::sync::watch::Sender;
1820
use databend_common_expression::Expr;
1921
use xorf::BinaryFuse16;
2022

2123
#[derive(Clone, Debug, Default)]
22-
pub struct RuntimeFilterInfo {
23-
inlist: Vec<Expr<String>>,
24-
min_max: Vec<Expr<String>>,
25-
bloom: Vec<(String, BinaryFuse16)>,
24+
pub struct RuntimeFiltersForScan {
25+
pub inlist: HashMap<usize, Expr<String>>,
26+
pub min_max: HashMap<usize, Expr<String>>,
27+
pub bloom: HashMap<usize, (String, BinaryFuse16)>,
2628
}
2729

28-
impl RuntimeFilterInfo {
29-
pub fn add_inlist(&mut self, expr: Expr<String>) {
30-
self.inlist.push(expr);
31-
}
32-
33-
pub fn add_bloom(&mut self, bloom: (String, BinaryFuse16)) {
34-
self.bloom.push(bloom);
35-
}
36-
37-
pub fn add_min_max(&mut self, expr: Expr<String>) {
38-
self.min_max.push(expr);
39-
}
40-
41-
pub fn get_inlist(&self) -> &Vec<Expr<String>> {
42-
&self.inlist
43-
}
44-
45-
pub fn get_bloom(&self) -> &Vec<(String, BinaryFuse16)> {
46-
&self.bloom
47-
}
48-
49-
pub fn get_min_max(&self) -> &Vec<Expr<String>> {
50-
&self.min_max
51-
}
52-
53-
pub fn blooms(self) -> Vec<(String, BinaryFuse16)> {
54-
self.bloom
55-
}
56-
57-
pub fn inlists(self) -> Vec<Expr<String>> {
58-
self.inlist
59-
}
60-
61-
pub fn inlists_ref(&self) -> &Vec<Expr<String>> {
62-
&self.inlist
30+
impl RuntimeFiltersForScan {
31+
pub fn add_inlist(&mut self, rf_id: usize, expr: Expr<String>) {
32+
self.inlist.insert(rf_id, expr);
6333
}
6434

65-
pub fn min_maxs(self) -> Vec<Expr<String>> {
66-
self.min_max
35+
pub fn add_bloom(&mut self, rf_id: usize, bloom: (String, BinaryFuse16)) {
36+
self.bloom.insert(rf_id, bloom);
6737
}
6838

69-
pub fn min_maxs_ref(&self) -> &Vec<Expr<String>> {
70-
&self.min_max
39+
pub fn add_min_max(&mut self, rf_id: usize, expr: Expr<String>) {
40+
self.min_max.insert(rf_id, expr);
7141
}
7242

7343
pub fn is_empty(&self) -> bool {

src/query/catalog/src/table_context.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ use crate::plan::PartInfoPtr;
7474
use crate::plan::PartStatistics;
7575
use crate::plan::Partitions;
7676
use crate::query_kind::QueryKind;
77-
use crate::runtime_filter_info::RuntimeFilterInfo;
7877
use crate::runtime_filter_info::RuntimeFilterReady;
78+
use crate::runtime_filter_info::RuntimeFiltersForScan;
7979
use crate::statistics::data_cache_statistics::DataCacheMetrics;
8080
use crate::table::Table;
8181

@@ -324,7 +324,7 @@ pub trait TableContext: Send + Sync {
324324

325325
fn get_query_profiles(&self) -> Vec<PlanProfile>;
326326

327-
fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));
327+
fn set_runtime_filter(&self, filters: (usize, RuntimeFiltersForScan));
328328

329329
fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc<RuntimeFilterReady>);
330330

src/query/pipeline/sources/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ databend-common-exception = { workspace = true }
1515
databend-common-expression = { workspace = true }
1616
databend-common-pipeline-core = { workspace = true }
1717
futures = { workspace = true }
18-
log = { workspace = true }
1918
parking_lot = { workspace = true }
2019

2120
[lints]

src/query/service/src/pipelines/builders/builder_join.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft
2727
use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight;
2828
use crate::pipelines::processors::transforms::HashJoinBuildState;
2929
use crate::pipelines::processors::transforms::HashJoinProbeState;
30+
use crate::pipelines::processors::transforms::RuntimeFilterChannels;
3031
use crate::pipelines::processors::transforms::TransformHashJoinBuild;
3132
use crate::pipelines::processors::transforms::TransformHashJoinProbe;
3233
use crate::pipelines::processors::HashJoinDesc;
@@ -157,7 +158,10 @@ impl PipelineBuilder {
157158
hash_join_plan
158159
.runtime_filter_plan
159160
.as_ref()
160-
.map(|_| self.ctx.rf_src_send(hash_join_plan.join_id)),
161+
.map(|_| RuntimeFilterChannels {
162+
rf_src_send: self.ctx.rf_src_send(hash_join_plan.join_id),
163+
rf_sink_recv: self.ctx.rf_sink_recv(hash_join_plan.join_id),
164+
}),
161165
)?;
162166
build_state.add_runtime_filter_ready();
163167

src/query/service/src/pipelines/builders/builder_runtime_filter.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ impl PipelineBuilder {
3939
self.build_pipeline(&sink.input)?;
4040
self.main_pipeline.resize(1, true)?;
4141
let node_num = self.ctx.get_cluster().nodes.len();
42-
self.main_pipeline
43-
.add_sink(|input| RuntimeFilterSinkProcessor::create(input, node_num))
42+
self.main_pipeline.add_sink(|input| {
43+
RuntimeFilterSinkProcessor::create(input, node_num, self.ctx.rf_sink_send(sink.join_id))
44+
})
4445
}
4546
}

src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub struct HashJoinDesc {
5454
}
5555

5656
pub struct RuntimeFilterDesc {
57-
pub _id: usize,
57+
pub id: usize,
5858
pub build_key: Expr,
5959
pub probe_key: Expr<String>,
6060
pub scan_id: usize,
@@ -78,7 +78,7 @@ impl From<&RemoteRuntimeFiltersDesc> for RuntimeFiltersDesc {
7878
impl From<&RemoteRuntimeFilterDesc> for RuntimeFilterDesc {
7979
fn from(runtime_filter: &RemoteRuntimeFilterDesc) -> Self {
8080
Self {
81-
_id: runtime_filter.id,
81+
id: runtime_filter.id,
8282
build_key: runtime_filter.build_key.as_expr(&BUILTIN_FUNCTIONS),
8383
probe_key: runtime_filter.probe_key.as_expr(&BUILTIN_FUNCTIONS),
8484
scan_id: runtime_filter.scan_id,

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

+48-21
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::collections::HashSet;
1617
use std::collections::VecDeque;
1718
use std::ops::ControlFlow;
@@ -20,10 +21,11 @@ use std::sync::atomic::AtomicUsize;
2021
use std::sync::atomic::Ordering;
2122
use std::sync::Arc;
2223

24+
use async_channel::Receiver;
2325
use async_channel::Sender;
2426
use databend_common_base::base::tokio::sync::Barrier;
25-
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
2627
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
28+
use databend_common_catalog::runtime_filter_info::RuntimeFiltersForScan;
2729
use databend_common_catalog::table_context::TableContext;
2830
use databend_common_column::bitmap::Bitmap;
2931
use databend_common_exception::ErrorCode;
@@ -79,7 +81,7 @@ use crate::pipelines::processors::transforms::hash_join::FixedKeyHashJoinHashTab
7981
use crate::pipelines::processors::transforms::hash_join::HashJoinHashTable;
8082
use crate::pipelines::processors::transforms::hash_join::SerializerHashJoinHashTable;
8183
use crate::pipelines::processors::transforms::hash_join::SingleBinaryHashJoinHashTable;
82-
use crate::pipelines::processors::transforms::RuntimeFiltersMeta;
84+
use crate::pipelines::processors::transforms::RemoteRuntimeFilters;
8385
use crate::pipelines::processors::HashJoinState;
8486
use crate::sessions::QueryContext;
8587

@@ -119,7 +121,14 @@ pub struct HashJoinBuildState {
119121

120122
/// Spill related states.
121123
pub(crate) memory_settings: MemorySettings,
122-
pub(crate) runtime_filter_sender: Option<Sender<RuntimeFiltersMeta>>,
124+
pub(crate) rf_channels: Option<RuntimeFilterChannels>,
125+
}
126+
127+
pub struct RuntimeFilterChannels {
128+
/// send runtime filter to `RuntimeFilterSourceProcessor`
129+
pub(crate) rf_src_send: Sender<RemoteRuntimeFilters>,
130+
/// receive runtime filter from `RuntimeFilterSinkProcessor`
131+
pub(crate) rf_sink_recv: Receiver<RemoteRuntimeFilters>,
123132
}
124133

125134
impl HashJoinBuildState {
@@ -131,7 +140,7 @@ impl HashJoinBuildState {
131140
build_projections: &ColumnSet,
132141
hash_join_state: Arc<HashJoinState>,
133142
num_threads: usize,
134-
rf_src_send: Option<Sender<RuntimeFiltersMeta>>,
143+
rf_channels: Option<RuntimeFilterChannels>,
135144
) -> Result<Arc<HashJoinBuildState>> {
136145
let hash_key_types = build_keys
137146
.iter()
@@ -168,7 +177,7 @@ impl HashJoinBuildState {
168177
build_hash_table_tasks: Default::default(),
169178
mutex: Default::default(),
170179
memory_settings,
171-
runtime_filter_sender: rf_src_send,
180+
rf_channels,
172181
}))
173182
}
174183

@@ -311,7 +320,6 @@ impl HashJoinBuildState {
311320
if self.hash_join_state.spilled_partitions.read().is_empty() {
312321
self.add_runtime_filter(&build_chunks, build_num_rows)?;
313322
} else {
314-
self.send_runtime_filter_meta(Default::default())?;
315323
self.set_bloom_filter_ready(false)?;
316324
}
317325

@@ -844,15 +852,16 @@ impl HashJoinBuildState {
844852

845853
fn add_runtime_filter(&self, build_chunks: &[DataBlock], build_num_rows: usize) -> Result<()> {
846854
let mut bloom_filter_ready = false;
847-
let mut runtime_filters_meta = RuntimeFiltersMeta::default();
855+
let mut runtime_filters = HashMap::new();
848856
for rf in self.runtime_filter_desc() {
849-
let mut runtime_filter = RuntimeFilterInfo::default();
857+
let mut runtime_filter = RuntimeFiltersForScan::default();
850858
if rf.enable_inlist_runtime_filter && build_num_rows < INLIST_RUNTIME_FILTER_THRESHOLD {
851859
self.inlist_runtime_filter(
852860
&mut runtime_filter,
853861
build_chunks,
854862
&rf.build_key,
855863
&rf.probe_key,
864+
rf.id,
856865
)?;
857866
}
858867
if rf.enable_bloom_runtime_filter {
@@ -861,6 +870,7 @@ impl HashJoinBuildState {
861870
&mut runtime_filter,
862871
&rf.build_key,
863872
&rf.probe_key,
873+
rf.id,
864874
)?;
865875
}
866876
if rf.enable_min_max_runtime_filter {
@@ -869,25 +879,26 @@ impl HashJoinBuildState {
869879
&mut runtime_filter,
870880
&rf.build_key,
871881
&rf.probe_key,
882+
rf.id,
872883
)?;
873884
}
874885
if !runtime_filter.is_empty() {
875886
bloom_filter_ready |= !runtime_filter.is_blooms_empty();
876-
runtime_filters_meta.add(rf.scan_id, &runtime_filter);
877-
self.ctx.set_runtime_filter((rf.scan_id, runtime_filter));
887+
runtime_filters.insert(rf.scan_id, runtime_filter);
878888
}
879889
}
880-
self.send_runtime_filter_meta(runtime_filters_meta)?;
890+
self.send_runtime_filter_meta(runtime_filters)?;
881891
self.set_bloom_filter_ready(bloom_filter_ready)?;
882892
Ok(())
883893
}
884894

885895
fn bloom_runtime_filter(
886896
&self,
887897
data_blocks: &[DataBlock],
888-
runtime_filter: &mut RuntimeFilterInfo,
898+
runtime_filter: &mut RuntimeFiltersForScan,
889899
build_key: &Expr,
890900
probe_key: &Expr<String>,
901+
rf_id: usize,
891902
) -> Result<()> {
892903
if !build_key.data_type().remove_nullable().is_number()
893904
&& !build_key.data_type().remove_nullable().is_string()
@@ -925,23 +936,24 @@ impl HashJoinBuildState {
925936
hashes_vec.push(hash);
926937
});
927938
let filter = BinaryFuse16::try_from(&hashes_vec)?;
928-
runtime_filter.add_bloom((id.to_string(), filter));
939+
runtime_filter.add_bloom(rf_id, (id.to_string(), filter));
929940
Ok(())
930941
}
931942

932943
fn inlist_runtime_filter(
933944
&self,
934-
runtime_filter: &mut RuntimeFilterInfo,
945+
runtime_filter: &mut RuntimeFiltersForScan,
935946
data_blocks: &[DataBlock],
936947
build_key: &Expr,
937948
probe_key: &Expr<String>,
949+
rf_id: usize,
938950
) -> Result<()> {
939951
if let Some(distinct_build_column) =
940952
dedup_build_key_column(&self.func_ctx, data_blocks, build_key)?
941953
{
942954
if let Some(filter) = inlist_filter(probe_key, distinct_build_column.clone())? {
943955
info!("inlist_filter: {:?}", filter.sql_display());
944-
runtime_filter.add_inlist(filter);
956+
runtime_filter.add_inlist(rf_id, filter);
945957
}
946958
}
947959
Ok(())
@@ -950,9 +962,10 @@ impl HashJoinBuildState {
950962
fn min_max_runtime_filter(
951963
&self,
952964
data_blocks: &[DataBlock],
953-
runtime_filter: &mut RuntimeFilterInfo,
965+
runtime_filter: &mut RuntimeFiltersForScan,
954966
build_key: &Expr,
955967
probe_key: &Expr<String>,
968+
rf_id: usize,
956969
) -> Result<()> {
957970
if !build_key.runtime_filter_supported_types() {
958971
return Ok(());
@@ -1045,7 +1058,7 @@ impl HashJoinBuildState {
10451058
};
10461059
if let Some(min_max_filter) = min_max_filter {
10471060
info!("min_max_filter: {:?}", min_max_filter.sql_display());
1048-
runtime_filter.add_min_max(min_max_filter);
1061+
runtime_filter.add_min_max(rf_id, min_max_filter);
10491062
}
10501063
}
10511064
Ok(())
@@ -1079,10 +1092,24 @@ impl HashJoinBuildState {
10791092
.any(|rf| rf.enable_min_max_runtime_filter)
10801093
}
10811094

1082-
fn send_runtime_filter_meta(&self, runtime_filter: RuntimeFiltersMeta) -> Result<()> {
1083-
if let Some(sender) = self.runtime_filter_sender.as_ref() {
1084-
sender.send_blocking(runtime_filter).unwrap();
1085-
sender.close();
1095+
fn send_runtime_filter_meta(
1096+
&self,
1097+
mut rf: HashMap<usize, RuntimeFiltersForScan>,
1098+
) -> Result<()> {
1099+
if let Some(channels) = self.rf_channels.as_ref() {
1100+
channels
1101+
.rf_src_send
1102+
.send_blocking(rf.into())
1103+
.map_err(|_| ErrorCode::TokioError("send runtime filter meta failed"))?;
1104+
channels.rf_src_send.close();
1105+
let merged_rf = channels
1106+
.rf_sink_recv
1107+
.recv_blocking()
1108+
.map_err(|_| ErrorCode::TokioError("receive runtime filter meta failed"))?;
1109+
rf = merged_rf.into();
1110+
}
1111+
for (scan_id, runtime_filter) in rf.into_iter() {
1112+
self.ctx.set_runtime_filter((scan_id, runtime_filter));
10861113
}
10871114
Ok(())
10881115
}

src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ mod util;
3131

3232
pub use desc::HashJoinDesc;
3333
pub use hash_join_build_state::HashJoinBuildState;
34+
pub use hash_join_build_state::RuntimeFilterChannels;
3435
pub use hash_join_probe_state::HashJoinProbeState;
3536
pub use hash_join_spiller::HashJoinSpiller;
3637
pub use hash_join_state::*;

src/query/service/src/pipelines/processors/transforms/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ mod transform_udf_server;
4242
mod window;
4343

4444
pub use hash_join::*;
45+
pub use runtime_filter::RemoteRuntimeFilters;
4546
pub use runtime_filter::RuntimeFilterSinkProcessor;
4647
pub use runtime_filter::RuntimeFilterSourceProcessor;
47-
pub use runtime_filter::RuntimeFiltersMeta;
4848
pub use transform_add_computed_columns::TransformAddComputedColumns;
4949
pub use transform_add_const_columns::TransformAddConstColumns;
5050
pub use transform_add_internal_columns::TransformAddInternalColumns;

0 commit comments

Comments
 (0)