Skip to content

Commit 1c14898

Browse files
Serialize hash join dynamic filter routing mode
Remove temporary proto debug logging while wiring routing mode through protobuf encode/decode.
1 parent 8a6a4bc commit 1c14898

5 files changed

Lines changed: 178 additions & 60 deletions

File tree

datafusion/proto/proto/datafusion.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,11 @@ enum PartitionMode {
11261126
AUTO = 2;
11271127
}
11281128

1129+
// Routing strategy for a hash join's dynamic filter expressions, carried in
// HashJoinExecNode.dynamic_filter_routing_mode.
// The physical-plan conversion maps CASE_HASH and PARTITION_INDEX one-to-one
// onto DynamicFilterRoutingMode::CaseHash and ::PartitionIndex.
// NOTE(review): value 0 (CASE_HASH) is presumably what a message without this
// field decodes to — confirm this is the intended default for older plans.
enum HashJoinDynamicFilterRoutingMode {
1130+
CASE_HASH = 0;
1131+
PARTITION_INDEX = 1;
1132+
}
1133+
11291134
message HashJoinExecNode {
11301135
PhysicalPlanNode left = 1;
11311136
PhysicalPlanNode right = 2;
@@ -1137,6 +1142,8 @@ message HashJoinExecNode {
11371142
repeated uint32 projection = 9;
11381143
// Optional dynamic filter expression for pushing down to the probe side.
11391144
PhysicalExprNode dynamic_filter = 10;
1145+
// Selected routing strategy for partitioned dynamic filter expressions.
1146+
HashJoinDynamicFilterRoutingMode dynamic_filter_routing_mode = 11;
11401147
}
11411148

11421149
enum StreamPartitionMode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 91 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 31 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ use datafusion_physical_plan::expressions::PhysicalSortExpr;
6767
use datafusion_physical_plan::filter::FilterExec;
6868
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
6969
use datafusion_physical_plan::joins::{
70-
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
71-
StreamJoinPartitionMode, SymmetricHashJoinExec,
70+
CrossJoinExec, DynamicFilterRoutingMode, HashJoinExec, NestedLoopJoinExec,
71+
PartitionMode, SortMergeJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
7272
};
7373
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
7474
use datafusion_physical_plan::memory::LazyMemoryExec;
@@ -1353,6 +1353,24 @@ impl protobuf::PhysicalPlanNode {
13531353
protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
13541354
protobuf::PartitionMode::Auto => PartitionMode::Auto,
13551355
};
1356+
let dynamic_filter_routing_mode =
1357+
protobuf::HashJoinDynamicFilterRoutingMode::try_from(
1358+
hashjoin.dynamic_filter_routing_mode,
1359+
)
1360+
.map_err(|_| {
1361+
proto_error(format!(
1362+
"Received a HashJoinNode message with unknown HashJoinDynamicFilterRoutingMode {}",
1363+
hashjoin.dynamic_filter_routing_mode
1364+
))
1365+
})?;
1366+
let dynamic_filter_routing_mode = match dynamic_filter_routing_mode {
1367+
protobuf::HashJoinDynamicFilterRoutingMode::CaseHash => {
1368+
DynamicFilterRoutingMode::CaseHash
1369+
}
1370+
protobuf::HashJoinDynamicFilterRoutingMode::PartitionIndex => {
1371+
DynamicFilterRoutingMode::PartitionIndex
1372+
}
1373+
};
13561374
let projection = if !hashjoin.projection.is_empty() {
13571375
Some(
13581376
hashjoin
@@ -1388,6 +1406,8 @@ impl protobuf::PhysicalPlanNode {
13881406
hash_join = hash_join.with_dynamic_filter(df)?;
13891407
}
13901408
}
1409+
hash_join =
1410+
hash_join.with_dynamic_filter_routing_mode(dynamic_filter_routing_mode);
13911411

13921412
Ok(Arc::new(hash_join))
13931413
}
@@ -2432,6 +2452,14 @@ impl protobuf::PhysicalPlanNode {
24322452
proto_converter.physical_expr_to_proto(&df_expr, codec)
24332453
})
24342454
.transpose()?;
2455+
let dynamic_filter_routing_mode = match exec.dynamic_filter_routing_mode() {
2456+
DynamicFilterRoutingMode::CaseHash => {
2457+
protobuf::HashJoinDynamicFilterRoutingMode::CaseHash
2458+
}
2459+
DynamicFilterRoutingMode::PartitionIndex => {
2460+
protobuf::HashJoinDynamicFilterRoutingMode::PartitionIndex
2461+
}
2462+
};
24352463

24362464
Ok(protobuf::PhysicalPlanNode {
24372465
physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
@@ -2447,6 +2475,7 @@ impl protobuf::PhysicalPlanNode {
24472475
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
24482476
}),
24492477
dynamic_filter,
2478+
dynamic_filter_routing_mode: dynamic_filter_routing_mode.into(),
24502479
},
24512480
))),
24522481
})
@@ -3820,12 +3849,6 @@ impl DeduplicatingSerializer {
38203849
}
38213850
}
38223851

3823-
fn dedup_debug_enabled() -> bool {
3824-
std::env::var("DD_DF_PROTO_DEBUG")
3825-
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
3826-
.unwrap_or(false)
3827-
}
3828-
38293852
impl PhysicalProtoConverterExtension for DeduplicatingSerializer {
38303853
fn proto_to_execution_plan(
38313854
&self,
@@ -3878,12 +3901,6 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer {
38783901
proto.dynamic_filter_inner_id = Some(self.hash(dynamic_filter.inner_id()))
38793902
}
38803903
proto.expr_id = Some(self.hash(Arc::as_ptr(expr) as *const () as u64));
3881-
if dedup_debug_enabled() && proto.dynamic_filter_inner_id.is_some() {
3882-
eprintln!(
3883-
"[df-proto][ser] dynamic expr_id={:?} dynamic_filter_inner_id={:?} expr={expr}",
3884-
proto.expr_id, proto.dynamic_filter_inner_id
3885-
);
3886-
}
38873904

38883905
Ok(proto)
38893906
}
@@ -3936,24 +3953,12 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
39363953
if let Some(expr_id) = proto.expr_id
39373954
&& let Some(cached) = self.cache.borrow().get(&expr_id)
39383955
{
3939-
if dedup_debug_enabled() && proto.dynamic_filter_inner_id.is_some() {
3940-
eprintln!(
3941-
"[df-proto][de] cache_hit expr_id={expr_id} dynamic_filter_inner_id={:?}",
3942-
proto.dynamic_filter_inner_id
3943-
);
3944-
}
39453956
return Ok(Arc::clone(cached));
39463957
}
39473958

39483959
// Cache miss, we must deserialize the expr.
39493960
let mut expr =
39503961
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?;
3951-
if dedup_debug_enabled() && proto.dynamic_filter_inner_id.is_some() {
3952-
eprintln!(
3953-
"[df-proto][de] cache_miss expr_id={:?} dynamic_filter_inner_id={:?}",
3954-
proto.expr_id, proto.dynamic_filter_inner_id
3955-
);
3956-
}
39573962

39583963
// Check if we need to share inner state with a cached dynamic filter
39593964
if let Some(dynamic_filter_id) = proto.dynamic_filter_inner_id {
@@ -3975,21 +3980,11 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
39753980
.map_err(|_| internal_datafusion_err!("dynamic_filter_id present in proto, but the expression was not a DynamicFilterPhysicalExpr"))?;
39763981
expr = Arc::new(dynamic_filter_expr.new_from_source(cached_df)?)
39773982
as Arc<dyn PhysicalExpr>;
3978-
if dedup_debug_enabled() {
3979-
eprintln!(
3980-
"[df-proto][de] relinked dynamic_filter_inner_id={dynamic_filter_id} from cached source"
3981-
);
3982-
}
39833983
} else {
39843984
// Cache it
39853985
self.dynamic_filter_cache
39863986
.borrow_mut()
39873987
.insert(dynamic_filter_id, Arc::clone(&expr));
3988-
if dedup_debug_enabled() {
3989-
eprintln!(
3990-
"[df-proto][de] seeded dynamic_filter_inner_id={dynamic_filter_id} into cache"
3991-
);
3992-
}
39933988
}
39943989
};
39953990

@@ -4035,15 +4030,9 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter {
40354030
codec: &dyn PhysicalExtensionCodec,
40364031
proto: &protobuf::PhysicalPlanNode,
40374032
) -> Result<Arc<dyn ExecutionPlan>> {
4038-
if dedup_debug_enabled() {
4039-
eprintln!("[df-proto][plan-de] start");
4040-
}
40414033
let deserializer = DeduplicatingDeserializer::default();
40424034
let plan =
40434035
proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer)?;
4044-
if dedup_debug_enabled() {
4045-
eprintln!("[df-proto][plan-de] done plan={}", plan.name());
4046-
}
40474036
Ok(plan)
40484037
}
40494038

@@ -4055,18 +4044,12 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter {
40554044
where
40564045
Self: Sized,
40574046
{
4058-
if dedup_debug_enabled() {
4059-
eprintln!("[df-proto][plan-ser] start plan={}", plan.name());
4060-
}
40614047
let serializer = DeduplicatingSerializer::new();
40624048
let proto = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
40634049
Arc::clone(plan),
40644050
codec,
40654051
&serializer,
40664052
)?;
4067-
if dedup_debug_enabled() {
4068-
eprintln!("[df-proto][plan-ser] done plan={}", plan.name());
4069-
}
40704053
Ok(proto)
40714054
}
40724055

0 commit comments

Comments
 (0)