Skip to content

Commit 7709b0d

Browse files
proto: serialize and dedupe dynamic filters
Informs: datafusion-contrib/datafusion-distributed#180 Closes: #20418 Consider this scenario: 1. You have a plan with a `HashJoinExec` and `DataSourceExec`. 2. You run the physical optimizer and the `DataSourceExec` accepts `DynamicFilterPhysicalExpr` pushdown from the `HashJoinExec`. 3. You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning: 1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr`. 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec`, and the `DataSourceExec` should filter out rows. This does not happen today for a few reasons, a couple of which this PR aims to address: 1. `DynamicFilterPhysicalExpr` does not survive round-tripping. The internal exprs get inlined (e.g. it may be serialized as `Literal`). 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, during pushdown it is often the case that the `DynamicFilterPhysicalExpr` is rewritten. In this case, you have two `DynamicFilterPhysicalExpr`s which are different `Arc`s but share the same `Inner` dynamic filter state. The current `DeduplicatingProtoConverter` does not handle this specific form of deduping. This PR aims to fix those problems by adding serde for `DynamicFilterPhysicalExpr` and deduping logic for the inner state of dynamic filters. It does not yet add a test for the `HashJoinExec` and `DataSourceExec` filter pushdown case, but this is relevant follow-up work. I tried to keep the PR small for reviewers. Yes, via unit tests. `DynamicFilterPhysicalExpr` is now serialized by the default codec.
1 parent c792700 commit 7709b0d

10 files changed

Lines changed: 1165 additions & 91 deletions

File tree

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::any::Any;
1919
use std::fmt;
2020
use std::fmt::{Debug, Display, Formatter};
21-
use std::hash::{Hash, Hasher};
21+
use std::hash::{DefaultHasher, Hash, Hasher};
2222
use std::sync::Arc;
2323

2424
use crate::utils::scatter;
@@ -441,6 +441,50 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
441441
fn placement(&self) -> ExpressionPlacement {
442442
ExpressionPlacement::KeepInPlace
443443
}
444+
445+
/// Returns a composite identifier for a [`PhysicalExpr`]. Note that if the expression
446+
/// is dropped, then the returned id is no longer valid.
447+
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
448+
Some(PhysicalExprId::new(expr_id_from_arc(&self, salt), None))
449+
}
450+
}
451+
/// A composite identifier for [`PhysicalExpr`].
///
/// It pairs an `exact` id, covering the whole expression tree, with an optional
/// `shallow` id that covers only the expression root.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PhysicalExprId {
    /// Identifier for the full expression tree, including children.
    exact: u64,
    /// Identifier for just the expression root, ignoring children, if there is one.
    shallow: Option<u64>,
}

impl PhysicalExprId {
    /// Create a new [`PhysicalExprId`]. Both ids must be globally unique within
    /// a process.
    ///
    /// This is a `const fn`, so ids can also be built in constant contexts.
    pub const fn new(exact: u64, shallow: Option<u64>) -> Self {
        Self { exact, shallow }
    }

    /// Returns the identifier for the full expression tree, including children.
    pub const fn exact(&self) -> u64 {
        self.exact
    }

    /// Returns the identifier for just the expression root, ignoring children.
    ///
    /// `None` means no shallow identifier was supplied to [`PhysicalExprId::new`].
    pub const fn shallow(&self) -> Option<u64> {
        self.shallow
    }
}
476+
/// Derives a process-local identifier for the value held by an [`Arc`].
///
/// The identifier is a hash of the [`Arc`]'s pointer followed by every word of
/// `salt`, in order. It is tied to the allocation and therefore remains valid
/// only while that allocation is still alive.
pub fn expr_id_from_arc<T: ?Sized>(expr: &Arc<T>, salt: &[u64]) -> u64 {
    // Reduce the (possibly fat) pointer to a thin one so only the address is hashed.
    let address = Arc::as_ptr(expr).cast::<()>() as u64;

    let mut state = DefaultHasher::new();
    address.hash(&mut state);
    // Hash the salt words one at a time, exactly in the order given.
    salt.iter().for_each(|word| word.hash(&mut state));
    state.finish()
}
445489

446490
#[deprecated(

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 236 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use datafusion_common::{
2626
tree_node::{Transformed, TransformedResult, TreeNode},
2727
};
2828
use datafusion_expr::ColumnarValue;
29-
use datafusion_physical_expr_common::physical_expr::DynHash;
29+
use datafusion_physical_expr_common::physical_expr::{
30+
DynHash, PhysicalExprId, expr_id_from_arc,
31+
};
3032

3133
/// State of a dynamic filter, tracking both updates and completion.
3234
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -88,6 +90,116 @@ struct Inner {
8890
is_complete: bool,
8991
}
9092

93+
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
94+
/// serialization / deserialization.
95+
pub struct DynamicFilterSnapshot {
96+
children: Vec<Arc<dyn PhysicalExpr>>,
97+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
98+
// Inner state.
99+
generation: u64,
100+
inner_expr: Arc<dyn PhysicalExpr>,
101+
is_complete: bool,
102+
}
103+
104+
impl DynamicFilterSnapshot {
105+
pub fn new(
106+
children: Vec<Arc<dyn PhysicalExpr>>,
107+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
108+
generation: u64,
109+
inner_expr: Arc<dyn PhysicalExpr>,
110+
is_complete: bool,
111+
) -> Self {
112+
Self {
113+
children,
114+
remapped_children,
115+
generation,
116+
inner_expr,
117+
is_complete,
118+
}
119+
}
120+
121+
pub fn children(&self) -> &[Arc<dyn PhysicalExpr>] {
122+
&self.children
123+
}
124+
125+
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
126+
self.remapped_children.as_deref()
127+
}
128+
129+
pub fn generation(&self) -> u64 {
130+
self.generation
131+
}
132+
133+
pub fn inner_expr(&self) -> &Arc<dyn PhysicalExpr> {
134+
&self.inner_expr
135+
}
136+
137+
pub fn is_complete(&self) -> bool {
138+
self.is_complete
139+
}
140+
}
141+
impl Display for DynamicFilterSnapshot {
142+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143+
write!(
144+
f,
145+
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
146+
self.children,
147+
self.remapped_children,
148+
self.generation,
149+
self.inner_expr,
150+
self.is_complete
151+
)
152+
}
153+
}
154+
155+
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
156+
fn from(snapshot: DynamicFilterSnapshot) -> Self {
157+
let DynamicFilterSnapshot {
158+
children,
159+
remapped_children,
160+
generation,
161+
inner_expr,
162+
is_complete,
163+
} = snapshot;
164+
165+
let state = if is_complete {
166+
FilterState::Complete { generation }
167+
} else {
168+
FilterState::InProgress { generation }
169+
};
170+
let (state_watch, _) = watch::channel(state);
171+
172+
Self {
173+
children,
174+
remapped_children,
175+
inner: Arc::new(RwLock::new(Inner {
176+
generation,
177+
expr: inner_expr,
178+
is_complete,
179+
})),
180+
state_watch,
181+
data_type: Arc::new(RwLock::new(None)),
182+
nullable: Arc::new(RwLock::new(None)),
183+
}
184+
}
185+
}
186+
187+
impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
188+
fn from(expr: &DynamicFilterPhysicalExpr) -> Self {
189+
let (generation, inner_expr, is_complete) = {
190+
let inner = expr.inner.read();
191+
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
192+
};
193+
DynamicFilterSnapshot {
194+
children: expr.children.clone(),
195+
remapped_children: expr.remapped_children.clone(),
196+
generation,
197+
inner_expr,
198+
is_complete,
199+
}
200+
}
201+
}
202+
91203
impl Inner {
92204
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
93205
Self {
@@ -448,6 +560,13 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
448560
// Return the current generation of the expression.
449561
self.inner.read().generation
450562
}
563+
564+
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
565+
Some(PhysicalExprId::new(
566+
expr_id_from_arc(&self, salt),
567+
Some(expr_id_from_arc(&self.inner, salt)),
568+
))
569+
}
451570
}
452571

453572
#[cfg(test)]
@@ -867,4 +986,120 @@ mod test {
867986
"Hash should be stable after update (identity-based)"
868987
);
869988
}
989+
/// A filter rebuilt from its snapshot must produce the same snapshot again.
#[test]
fn test_current_snapshot_roundtrip() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let col_a = col("a", &schema).unwrap();

    // Start with a filter `a > 10` that has `a` as its only child.
    let initial_predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(&col_a),
        datafusion_expr::Operator::Gt,
        lit(10) as Arc<dyn PhysicalExpr>,
    ));
    let original =
        DynamicFilterPhysicalExpr::new(vec![Arc::clone(&col_a)], initial_predicate);

    // Replace the inner expression and mark the filter complete before snapshotting.
    original
        .update(lit(42) as Arc<dyn PhysicalExpr>)
        .expect("Update should succeed");
    original.mark_complete();

    // Snapshot -> filter.
    let rebuilt = DynamicFilterPhysicalExpr::from(DynamicFilterSnapshot::from(&original));

    // Compare the two filters through the string form of their snapshots.
    let expected = DynamicFilterSnapshot::from(&original).to_string();
    let actual = DynamicFilterSnapshot::from(&rebuilt).to_string();
    assert_eq!(expected, actual);
}
1022+
1023+
#[tokio::test]
1024+
async fn test_with_new_children_preserves_shared_inner_state() {
// Checks that a filter rebuilt via `with_new_children` has a different exact id
// than its source but the same shallow id, and observes updates made to the source.
1025+
// Create a source filter with no children
1026+
let source = Arc::new(DynamicFilterPhysicalExpr::new(
1027+
vec![],
1028+
lit(42) as Arc<dyn PhysicalExpr>,
1029+
));
1030+
1031+
// Build the column expression that will be used as the new child
1032+
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
1033+
let col_x = col("x", &schema).unwrap();
1034+
// Rebuild a wrapper from the source filter using the new child.
1035+
let combined = Arc::clone(&source)
1036+
.with_new_children(vec![Arc::clone(&col_x)])
1037+
.unwrap();
1038+
1039+
let combined_expr_id = Arc::clone(&combined)
1040+
.expr_id(&[])
1041+
.expect("combined filter should have an expr_id");
1042+
let source_expr_id = Arc::clone(&source)
1043+
.expr_id(&[])
1044+
.expect("source filter should have an expr_id");
1045+
assert_ne!(
1046+
combined_expr_id.exact(),
1047+
source_expr_id.exact(),
1048+
"with_new_children should produce a distinct outer Arc identity",
1049+
);
1050+
assert_eq!(
1051+
combined_expr_id.shallow(),
1052+
source_expr_id.shallow(),
1053+
"dynamic filters with shared inner state should have the same shallow identity",
1054+
);
1055+
1056+
let combined_dyn_filter = combined
1057+
.as_any()
1058+
.downcast_ref::<DynamicFilterPhysicalExpr>()
1059+
.unwrap();
1060+
1061+
// Verify the rebuilt filter reports exactly the children passed to `with_new_children`.
1062+
assert_eq!(
1063+
format!("{:?}", combined.children()),
1064+
format!("{:?}", vec![col_x]),
1065+
"Combined filter's children should be unchanged"
1066+
);
1067+
1068+
// Verify the rebuilt filter currently evaluates to the source's inner expression.
1069+
assert_eq!(
1070+
format!("{:?}", combined_dyn_filter.current().unwrap()),
1071+
format!("{:?}", lit(42)),
1072+
"Combined filter should have inner expression from source filter"
1073+
);
1074+
1075+
// Verify that completing one filter also completes the other.
// A task waiting on the rebuilt filter is spawned first; the source is updated
// and marked complete afterwards.
1076+
let combined_binding = Arc::clone(&combined) as Arc<dyn PhysicalExpr>;
1077+
#[expect(clippy::disallowed_methods)]
1078+
let wait_handle = tokio::task::spawn({
1079+
async move {
1080+
let df = combined_binding
1081+
.as_any()
1082+
.downcast_ref::<DynamicFilterPhysicalExpr>()
1083+
.unwrap();
1084+
df.wait_complete().await;
1085+
format!("{:?}", df.current().unwrap())
1086+
}
1087+
});
1088+
source.update(lit(999) as Arc<dyn PhysicalExpr>).unwrap();
1089+
source.mark_complete();
1090+
1091+
// The linked filter should be notified via the shared watch; the timeout keeps
// the test from hanging if it is not.
1092+
let result = tokio::time::timeout(std::time::Duration::from_secs(1), wait_handle)
1093+
.await
1094+
.expect(
1095+
"linked filter should have been notified of completion within timeout",
1096+
)
1097+
.expect("task should not panic");
1098+
1099+
assert_eq!(
1100+
result,
1101+
format!("{:?}", lit(999)),
1102+
"linked filter should see source's updated inner expression"
1103+
);
1104+
}
8701105
}

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast};
4545
pub use cast_column::CastColumnExpr;
4646
pub use column::{Column, col, with_new_schema};
4747
pub use datafusion_expr::utils::format_state_name;
48-
pub use dynamic_filters::DynamicFilterPhysicalExpr;
48+
pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot};
4949
pub use in_list::{InListExpr, in_list};
5050
pub use is_not_null::{IsNotNullExpr, is_not_null};
5151
pub use is_null::{IsNullExpr, is_null};

datafusion/proto/proto/datafusion.proto

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -871,17 +871,18 @@ message PhysicalExtensionNode {
871871
}
872872

873873
// physical expressions
874+
// Composite identifier attached to a serialized physical expression.
// NOTE(review): presumably mirrors the Rust `PhysicalExprId` (`exact` = whole
// expression tree, `shallow` = expression root only) — confirm against the
// conversion code.
message PhysicalExprId {
875+
uint64 exact = 1;
876+
// Optional: may be absent.
optional uint64 shallow = 2;
877+
}
878+
874879
message PhysicalExprNode {
875880
// Was date_time_interval_expr
876881
reserved 17;
877882

878-
// Unique identifier for this expression to do deduplication during deserialization.
879-
// When serializing, this is set to a unique identifier for each combination of
880-
// expression, process and serialization run.
881-
// When deserializing, if this ID has been seen before, the cached Arc is returned
882-
// instead of creating a new one, enabling reconstruction of referential integrity
883-
// across serde roundtrips.
884-
optional uint64 expr_id = 30;
883+
// Unique identifier for this expression used during deserialization to restore
884+
// referential integrity across serde roundtrips.
// NOTE(review): this replaces `optional uint64 expr_id = 30`. Field number 30
// does not appear in a `reserved` statement in the visible part of this
// message — confirm it cannot be reused later with a different type.
885+
PhysicalExprId expr_id = 31;
885886

886887
oneof ExprType {
887888
// column references
@@ -920,9 +921,19 @@ message PhysicalExprNode {
920921
UnknownColumn unknown_column = 20;
921922

922923
PhysicalHashExprNode hash_expr = 21;
924+
925+
PhysicalDynamicFilterNode dynamic_filter = 22;
923926
}
924927
}
925928

929+
// Serialized form of a dynamic filter physical expression.
// NOTE(review): the fields presumably map one-to-one onto the Rust
// `DynamicFilterSnapshot` — confirm against the conversion code.
message PhysicalDynamicFilterNode {
930+
repeated PhysicalExprNode children = 1;
931+
// NOTE(review): a `repeated` field cannot by itself distinguish "absent" from
// "present but empty"; if this maps to an `Option<Vec<_>>`, confirm how `None`
// and `Some(vec![])` are told apart on deserialization.
repeated PhysicalExprNode remapped_children = 2;
932+
uint64 generation = 3;
933+
PhysicalExprNode inner_expr = 4;
934+
bool is_complete = 5;
935+
}
936+
926937
message PhysicalScalarUdfNode {
927938
string name = 1;
928939
repeated PhysicalExprNode args = 2;
@@ -1470,4 +1481,4 @@ message AsyncFuncExecNode {
14701481
message BufferExecNode {
14711482
PhysicalPlanNode input = 1;
14721483
uint64 capacity = 2;
1473-
}
1484+
}

0 commit comments

Comments
 (0)