Skip to content

Commit c782b9b

Browse files
jayshrivastavagene-bordegaray
authored andcommitted
proto: serialize and dedupe dynamic filters
Informs: datafusion-contrib/datafusion-distributed#180 Closes: apache#20418 Consider this scenario 1. You have a plan with a `HashJoinExec` and `DataSourceExec` 2. You run the physical optimizer and the `DataSourceExec` accepts `DynamicFilterPhysicalExpr` pushdown from the `HashJoinExec` 3. You serialize the plan, deserialize it, and execute it What should happen is that the dynamic filter should "work", meaning 1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr` 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec` and the `DataSourceExec` should filter out rows This does not happen today for a few reasons, a couple of which this PR aims to address 1. `DynamicFilterPhysicalExpr` is not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, during pushdown, it's often the case that the `DynamicFilterPhysicalExpr` is rewritten. In this case, you have two `DynamicFilterPhysicalExpr` which are different `Arc`s but share the same `Inner` dynamic filter state. The current `DeduplicatingProtoConverter` does not handle this specific form of deduping. This PR aims to fix those problems by adding serde for `DynamicFilterPhysicalExpr` and deduping logic for the inner state of dynamic filters. It does not yet add a test for the `HashJoinExec` and `DataSourceExec` filter pushdown case, but this is relevant follow up work. I tried to keep the PR small for reviewers. Yes, via unit tests. `DynamicFilterPhysicalExpr` are now serialized by the default codec
1 parent a0c10b0 commit c782b9b

9 files changed

Lines changed: 918 additions & 36 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 214 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::PhysicalExpr;
2323
use crate::expressions::lit;
2424
use arrow::datatypes::{DataType, Schema};
2525
use datafusion_common::{
26-
Result,
26+
Result, internal_err,
2727
tree_node::{Transformed, TransformedResult, TreeNode},
2828
};
2929
use datafusion_expr::ColumnarValue;
@@ -136,6 +136,117 @@ struct Inner {
136136
partitioned_exprs: PartitionedFilters,
137137
}
138138

139+
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
140+
/// serialization / deserialization.
141+
pub struct DynamicFilterSnapshot {
142+
children: Vec<Arc<dyn PhysicalExpr>>,
143+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
144+
// Inner state.
145+
generation: u64,
146+
inner_expr: Arc<dyn PhysicalExpr>,
147+
is_complete: bool,
148+
}
149+
150+
impl DynamicFilterSnapshot {
151+
pub fn new(
152+
children: Vec<Arc<dyn PhysicalExpr>>,
153+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
154+
generation: u64,
155+
inner_expr: Arc<dyn PhysicalExpr>,
156+
is_complete: bool,
157+
) -> Self {
158+
Self {
159+
children,
160+
remapped_children,
161+
generation,
162+
inner_expr,
163+
is_complete,
164+
}
165+
}
166+
167+
pub fn children(&self) -> &[Arc<dyn PhysicalExpr>] {
168+
&self.children
169+
}
170+
171+
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
172+
self.remapped_children.as_deref()
173+
}
174+
175+
pub fn generation(&self) -> u64 {
176+
self.generation
177+
}
178+
179+
pub fn inner_expr(&self) -> &Arc<dyn PhysicalExpr> {
180+
&self.inner_expr
181+
}
182+
183+
pub fn is_complete(&self) -> bool {
184+
self.is_complete
185+
}
186+
}
187+
188+
impl Display for DynamicFilterSnapshot {
189+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190+
write!(
191+
f,
192+
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
193+
self.children,
194+
self.remapped_children,
195+
self.generation,
196+
self.inner_expr,
197+
self.is_complete
198+
)
199+
}
200+
}
201+
202+
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
203+
fn from(snapshot: DynamicFilterSnapshot) -> Self {
204+
let DynamicFilterSnapshot {
205+
children,
206+
remapped_children,
207+
generation,
208+
inner_expr,
209+
is_complete,
210+
} = snapshot;
211+
212+
let state = if is_complete {
213+
FilterState::Complete { generation }
214+
} else {
215+
FilterState::InProgress { generation }
216+
};
217+
let (state_watch, _) = watch::channel(state);
218+
219+
Self {
220+
children,
221+
remapped_children,
222+
inner: Arc::new(RwLock::new(Inner {
223+
generation,
224+
expr: inner_expr,
225+
is_complete,
226+
})),
227+
state_watch,
228+
data_type: Arc::new(RwLock::new(None)),
229+
nullable: Arc::new(RwLock::new(None)),
230+
}
231+
}
232+
}
233+
234+
impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
235+
fn from(expr: &DynamicFilterPhysicalExpr) -> Self {
236+
let (generation, inner_expr, is_complete) = {
237+
let inner = expr.inner.read();
238+
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
239+
};
240+
DynamicFilterSnapshot {
241+
children: expr.children.clone(),
242+
remapped_children: expr.remapped_children.clone(),
243+
generation,
244+
inner_expr,
245+
is_complete,
246+
}
247+
}
248+
}
249+
139250
impl Inner {
140251
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
141252
Self {
@@ -232,18 +343,52 @@ impl DynamicFilterPhysicalExpr {
232343
}
233344
}
234345

346+
/// Create a new [`DynamicFilterPhysicalExpr`] from `self`, except it overwrites the
347+
/// internal state with the source filter's state.
348+
///
349+
/// This is a low-level API intended for use by the proto deserialization layer.
350+
///
351+
/// # Safety
352+
///
353+
/// The dynamic filter should not be in use when calling this method, otherwise there
354+
/// may be undefined behavior. Changing the inner state of a filter may do the following:
355+
/// - transition the state to complete without notifying the watch
356+
/// - cause a generation number to be emitted which is out of order
357+
pub fn new_from_source(
358+
self: &Arc<Self>,
359+
source: &DynamicFilterPhysicalExpr,
360+
) -> Result<Self> {
361+
// If there's any references to this filter or any watchers, we should not replace the
362+
// inner state.
363+
if self.is_used() {
364+
return internal_err!(
365+
"Cannot replace the inner state of a DynamicFilterPhysicalExpr that is in use"
366+
);
367+
};
368+
369+
Ok(Self {
370+
children: self.children.clone(),
371+
remapped_children: self.remapped_children.clone(),
372+
inner: Arc::clone(&source.inner),
373+
state_watch: self.state_watch.clone(),
374+
data_type: Arc::clone(&self.data_type),
375+
nullable: Arc::clone(&self.nullable),
376+
})
377+
}
378+
235379
fn remap_children(
236-
children: &[Arc<dyn PhysicalExpr>],
237-
remapped_children: Option<&Vec<Arc<dyn PhysicalExpr>>>,
380+
&self,
238381
expr: Arc<dyn PhysicalExpr>,
239382
) -> Result<Arc<dyn PhysicalExpr>> {
240-
if let Some(remapped_children) = remapped_children {
383+
if let Some(remapped_children) = &self.remapped_children {
241384
// Remap the children to the new children
242385
// of the expression.
243386
expr.transform_up(|child| {
244387
// Check if this is any of our original children
245-
if let Some(pos) =
246-
children.iter().position(|c| c.as_ref() == child.as_ref())
388+
if let Some(pos) = self
389+
.children
390+
.iter()
391+
.position(|c| c.as_ref() == child.as_ref())
247392
{
248393
// If so, remap it to the current children
249394
// of the expression.
@@ -433,6 +578,14 @@ impl DynamicFilterPhysicalExpr {
433578
Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
434579
}
435580

581+
/// Returns a unique identifier for the inner shared state.
582+
///
583+
/// Useful for checking if two [`Arc<PhysicalExpr>`] with the same
584+
/// underlying [`DynamicFilterPhysicalExpr`] are the same.
585+
pub fn inner_id(&self) -> u64 {
586+
Arc::as_ptr(&self.inner) as *const () as u64
587+
}
588+
436589
fn render(
437590
&self,
438591
f: &mut std::fmt::Formatter<'_>,
@@ -1154,7 +1307,6 @@ mod test {
11541307
);
11551308
}
11561309

1157-
#[test]
11581310
fn test_bind_dynamic_filters_for_partition_without_partitioned_data() {
11591311
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
11601312
vec![],
@@ -1283,4 +1435,59 @@ mod test {
12831435
"Bound dynamic filter should route to partition-local payload after update"
12841436
);
12851437
}
1438+
1439+
#[test]
1440+
fn test_new_from_source() {
1441+
// Create a source filter
1442+
let source = Arc::new(DynamicFilterPhysicalExpr::new(
1443+
vec![],
1444+
lit(42) as Arc<dyn PhysicalExpr>,
1445+
));
1446+
1447+
// Update and mark complete
1448+
source
1449+
.update(DynamicFilterUpdate::Global(
1450+
lit(100) as Arc<dyn PhysicalExpr>,
1451+
))
1452+
.unwrap();
1453+
source.mark_complete();
1454+
1455+
// Create a target filter with different children
1456+
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
1457+
let col_x = col("x", &schema).unwrap();
1458+
let target = Arc::new(DynamicFilterPhysicalExpr::new(
1459+
vec![Arc::clone(&col_x)],
1460+
lit(0) as Arc<dyn PhysicalExpr>,
1461+
));
1462+
1463+
// Create new filter from source's inner state
1464+
let combined = target.new_from_source(&source).unwrap();
1465+
1466+
// Verify inner state is shared (same inner_id)
1467+
assert_eq!(
1468+
combined.inner_id(),
1469+
source.inner_id(),
1470+
"new_from_source should share inner state with source"
1471+
);
1472+
1473+
// Verify children are from target, not source
1474+
let combined_snapshot = DynamicFilterSnapshot::from(&combined);
1475+
assert_eq!(
1476+
combined_snapshot.children().len(),
1477+
1,
1478+
"Combined filter should have target's children"
1479+
);
1480+
assert_eq!(
1481+
format!("{:?}", combined_snapshot.children()[0]),
1482+
format!("{:?}", col_x),
1483+
"Combined filter should have target's children"
1484+
);
1485+
1486+
// Verify inner expression comes from source
1487+
assert_eq!(
1488+
format!("{:?}", combined_snapshot.inner_expr()),
1489+
format!("{:?}", lit(100)),
1490+
"Combined filter should have source's inner expression"
1491+
);
1492+
}
12861493
}

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub use column::{Column, col, with_new_schema};
4747
pub use datafusion_expr::utils::format_state_name;
4848
pub use dynamic_filters::DynamicFilterPhysicalExpr;
4949
pub use dynamic_filters::DynamicFilterRuntimeContext;
50+
pub use dynamic_filters::DynamicFilterSnapshot;
5051
pub use dynamic_filters::DynamicFilterUpdate;
5152
pub use in_list::{InListExpr, in_list};
5253
pub use is_not_null::{IsNotNullExpr, is_not_null};

datafusion/proto/proto/datafusion.proto

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,12 @@ message PhysicalExprNode {
845845
// across serde roundtrips.
846846
optional uint64 expr_id = 30;
847847

848+
// For DynamicFilterPhysicalExpr, this identifies the shared inner state.
849+
// Multiple expressions may have different expr_id values (different outer Arc wrappers)
850+
// but the same dynamic_filter_inner_id (shared inner state).
851+
// Used to reconstruct shared inner state during deserialization.
852+
optional uint64 dynamic_filter_inner_id = 31;
853+
848854
oneof ExprType {
849855
// column references
850856
PhysicalColumn column = 1;
@@ -882,9 +888,19 @@ message PhysicalExprNode {
882888
UnknownColumn unknown_column = 20;
883889

884890
PhysicalHashExprNode hash_expr = 21;
891+
892+
PhysicalDynamicFilterNode dynamic_filter = 22;
885893
}
886894
}
887895

896+
message PhysicalDynamicFilterNode {
897+
repeated PhysicalExprNode children = 1;
898+
repeated PhysicalExprNode remapped_children = 2;
899+
uint64 generation = 3;
900+
PhysicalExprNode inner_expr = 4;
901+
bool is_complete = 5;
902+
}
903+
888904
message PhysicalScalarUdfNode {
889905
string name = 1;
890906
repeated PhysicalExprNode args = 2;

0 commit comments

Comments
 (0)