Skip to content

Commit 158f2cf

Browse files
wip
1 parent 468b690 commit 158f2cf

7 files changed

Lines changed: 163 additions & 10 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,14 @@ impl DynamicFilterPhysicalExpr {
327327
Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
328328
}
329329

330+
/// Returns a unique identifier for the inner shared state.
331+
///
332+
/// Useful for checking if two [Arc<PhysicalExpr>] with the same
333+
/// underlying [DynamicFilterPhysicalExpr] are the same.
334+
pub fn inner_id(&self) -> u64 {
335+
Arc::as_ptr(&self.inner) as *const () as u64
336+
}
337+
330338
fn render(
331339
&self,
332340
f: &mut std::fmt::Formatter<'_>,

datafusion/proto/proto/datafusion.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,12 @@ message PhysicalExprNode {
860860
// across serde roundtrips.
861861
optional uint64 expr_id = 30;
862862

863+
// For DynamicFilterPhysicalExpr, this identifies the shared inner state.
864+
// Multiple expressions may have different expr_id values (different outer Arc wrappers)
865+
// but the same dynamic_filter_expr_id (shared inner state).
866+
// Used to reconstruct shared inner state during deserialization.
867+
optional uint64 dynamic_filter_expr_id = 31;
868+
863869
oneof ExprType {
864870
// column references
865871
PhysicalColumn column = 1;

datafusion/proto/src/generated/pbjson.rs

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
8484
use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
8585
use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
8686
use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};
87+
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
8788
use prost::Message;
8889
use prost::bytes::BufMut;
8990

@@ -3064,6 +3065,7 @@ impl protobuf::PhysicalPlanNode {
30643065
});
30653066
Ok(protobuf::PhysicalExprNode {
30663067
expr_id: None,
3068+
dynamic_filter_expr_id: None,
30673069
expr_type: Some(ExprType::Sort(sort_expr)),
30683070
})
30693071
})
@@ -3150,6 +3152,7 @@ impl protobuf::PhysicalPlanNode {
31503152
});
31513153
Ok(protobuf::PhysicalExprNode {
31523154
expr_id: None,
3155+
dynamic_filter_expr_id: None,
31533156
expr_type: Some(ExprType::Sort(sort_expr)),
31543157
})
31553158
})
@@ -3818,6 +3821,18 @@ impl DeduplicatingSerializer {
38183821
session_id: rand::random(),
38193822
}
38203823
}
3824+
3825+
fn hash(&self, ptr: u64) -> u64 {
3826+
// Hash session_id, pointer address, and process ID together to create expr_id.
3827+
// - session_id: random per serializer, prevents collisions when merging serializations
3828+
// - ptr: unique address per Arc within a process
3829+
// - pid: prevents collisions if serializer is shared across processes
3830+
let mut hasher = DefaultHasher::new();
3831+
self.session_id.hash(&mut hasher);
3832+
ptr.hash(&mut hasher);
3833+
std::process::id().hash(&mut hasher);
3834+
hasher.finish()
3835+
}
38213836
}
38223837

38233838
impl PhysicalProtoConverterExtension for DeduplicatingSerializer {
@@ -3864,16 +3879,12 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer {
38643879
codec: &dyn PhysicalExtensionCodec,
38653880
) -> Result<protobuf::PhysicalExprNode> {
38663881
let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?;
3867-
3868-
// Hash session_id, pointer address, and process ID together to create expr_id.
3869-
// - session_id: random per serializer, prevents collisions when merging serializations
3870-
// - ptr: unique address per Arc within a process
3871-
// - pid: prevents collisions if serializer is shared across processes
3872-
let mut hasher = DefaultHasher::new();
3873-
self.session_id.hash(&mut hasher);
3874-
(Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher);
3875-
std::process::id().hash(&mut hasher);
3876-
proto.expr_id = Some(hasher.finish());
3882+
// Special case for dynamic filters. Two expressions may live in separate Arcs but
3883+
// point to the same inner dynamic filter state. This inner state must be deduplicated.
3884+
if let Some(dynamic_filter) = expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
3885+
proto.dynamic_filter_expr_id = Some(self.hash(dynamic_filter.inner_id()))
3886+
}
3887+
proto.expr_id = Some(self.hash(Arc::as_ptr(expr) as *const () as u64));
38773888

38783889
Ok(proto)
38793890
}

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub fn serialize_physical_aggr_expr(
7272
codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
7373
Ok(protobuf::PhysicalExprNode {
7474
expr_id: None,
75+
dynamic_filter_expr_id: None,
7576
expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
7677
protobuf::PhysicalAggregateExprNode {
7778
aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
@@ -282,13 +283,15 @@ pub fn serialize_physical_expr_with_converter(
282283
};
283284
return Ok(protobuf::PhysicalExprNode {
284285
expr_id: None,
286+
dynamic_filter_expr_id: None,
285287
expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
286288
});
287289
}
288290

289291
if let Some(expr) = expr.downcast_ref::<Column>() {
290292
Ok(protobuf::PhysicalExprNode {
291293
expr_id: None,
294+
dynamic_filter_expr_id: None,
292295
expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
293296
protobuf::PhysicalColumn {
294297
name: expr.name().to_string(),
@@ -299,6 +302,7 @@ pub fn serialize_physical_expr_with_converter(
299302
} else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
300303
Ok(protobuf::PhysicalExprNode {
301304
expr_id: None,
305+
dynamic_filter_expr_id: None,
302306
expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
303307
protobuf::UnknownColumn {
304308
name: expr.name().to_string(),
@@ -318,13 +322,15 @@ pub fn serialize_physical_expr_with_converter(
318322

319323
Ok(protobuf::PhysicalExprNode {
320324
expr_id: None,
325+
dynamic_filter_expr_id: None,
321326
expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
322327
binary_expr,
323328
)),
324329
})
325330
} else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
326331
Ok(protobuf::PhysicalExprNode {
327332
expr_id: None,
333+
dynamic_filter_expr_id: None,
328334
expr_type: Some(
329335
protobuf::physical_expr_node::ExprType::Case(
330336
Box::new(
@@ -368,6 +374,7 @@ pub fn serialize_physical_expr_with_converter(
368374
} else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
369375
Ok(protobuf::PhysicalExprNode {
370376
expr_id: None,
377+
dynamic_filter_expr_id: None,
371378
expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
372379
protobuf::PhysicalNot {
373380
expr: Some(Box::new(
@@ -379,6 +386,7 @@ pub fn serialize_physical_expr_with_converter(
379386
} else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
380387
Ok(protobuf::PhysicalExprNode {
381388
expr_id: None,
389+
dynamic_filter_expr_id: None,
382390
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
383391
Box::new(protobuf::PhysicalIsNull {
384392
expr: Some(Box::new(
@@ -390,6 +398,7 @@ pub fn serialize_physical_expr_with_converter(
390398
} else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
391399
Ok(protobuf::PhysicalExprNode {
392400
expr_id: None,
401+
dynamic_filter_expr_id: None,
393402
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
394403
Box::new(protobuf::PhysicalIsNotNull {
395404
expr: Some(Box::new(
@@ -401,6 +410,7 @@ pub fn serialize_physical_expr_with_converter(
401410
} else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
402411
Ok(protobuf::PhysicalExprNode {
403412
expr_id: None,
413+
dynamic_filter_expr_id: None,
404414
expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
405415
protobuf::PhysicalInListNode {
406416
expr: Some(Box::new(
@@ -414,6 +424,7 @@ pub fn serialize_physical_expr_with_converter(
414424
} else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
415425
Ok(protobuf::PhysicalExprNode {
416426
expr_id: None,
427+
dynamic_filter_expr_id: None,
417428
expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
418429
protobuf::PhysicalNegativeNode {
419430
expr: Some(Box::new(
@@ -425,13 +436,15 @@ pub fn serialize_physical_expr_with_converter(
425436
} else if let Some(lit) = expr.downcast_ref::<Literal>() {
426437
Ok(protobuf::PhysicalExprNode {
427438
expr_id: None,
439+
dynamic_filter_expr_id: None,
428440
expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
429441
lit.value().try_into()?,
430442
)),
431443
})
432444
} else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
433445
Ok(protobuf::PhysicalExprNode {
434446
expr_id: None,
447+
dynamic_filter_expr_id: None,
435448
expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
436449
protobuf::PhysicalCastNode {
437450
expr: Some(Box::new(
@@ -444,6 +457,7 @@ pub fn serialize_physical_expr_with_converter(
444457
} else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
445458
Ok(protobuf::PhysicalExprNode {
446459
expr_id: None,
460+
dynamic_filter_expr_id: None,
447461
expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
448462
protobuf::PhysicalTryCastNode {
449463
expr: Some(Box::new(
@@ -458,6 +472,7 @@ pub fn serialize_physical_expr_with_converter(
458472
codec.try_encode_udf(expr.fun(), &mut buf)?;
459473
Ok(protobuf::PhysicalExprNode {
460474
expr_id: None,
475+
dynamic_filter_expr_id: None,
461476
expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
462477
protobuf::PhysicalScalarUdfNode {
463478
name: expr.name().to_string(),
@@ -475,6 +490,7 @@ pub fn serialize_physical_expr_with_converter(
475490
} else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
476491
Ok(protobuf::PhysicalExprNode {
477492
expr_id: None,
493+
dynamic_filter_expr_id: None,
478494
expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
479495
protobuf::PhysicalLikeExprNode {
480496
negated: expr.negated(),
@@ -492,6 +508,7 @@ pub fn serialize_physical_expr_with_converter(
492508
let (s0, s1, s2, s3) = expr.seeds();
493509
Ok(protobuf::PhysicalExprNode {
494510
expr_id: None,
511+
dynamic_filter_expr_id: None,
495512
expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
496513
protobuf::PhysicalHashExprNode {
497514
on_columns: serialize_physical_exprs(
@@ -518,6 +535,7 @@ pub fn serialize_physical_expr_with_converter(
518535
.collect::<Result<_>>()?;
519536
Ok(protobuf::PhysicalExprNode {
520537
expr_id: None,
538+
dynamic_filter_expr_id: None,
521539
expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
522540
protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
523541
)),

0 commit comments

Comments
 (0)