-
Notifications
You must be signed in to change notification settings - Fork 2.1k
proto: serialize and dedupe dynamic filters v2 #21807
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2e7c261
0766648
8c96885
0b75476
64253e5
9c4b437
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would it make sense to remove
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's still used in Parquet pruning. But I agree it never served as many purposes as we thought it would, it's probably best to remove it and add a downcast for
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can do this as a quick follow up. Will post a PR momentarily.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I opened this PR #21975. The |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| // under the License. | ||
|
|
||
| use parking_lot::RwLock; | ||
| use std::sync::atomic::{AtomicU64, Ordering}; | ||
| use std::{fmt::Display, hash::Hash, sync::Arc}; | ||
| use tokio::sync::watch; | ||
|
|
||
|
|
@@ -76,21 +77,51 @@ pub struct DynamicFilterPhysicalExpr { | |
| nullable: Arc<RwLock<Option<bool>>>, | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| struct Inner { | ||
| /// Atomic internal state of a [`DynamicFilterPhysicalExpr`]. | ||
| /// | ||
| /// `expression_id` lives here because it identifies the actual filter expression `expr`. | ||
| /// Derived `DynamicFilterPhysicalExpr`s (e.g. via [`PhysicalExpr::with_new_children`]) are | ||
| /// the same logical filter and must report the same `expression_id`. | ||
| /// | ||
| /// **Warning:** exposed publicly solely so that proto (de)serialization in | ||
| /// `datafusion-proto` can read and rebuild this state. Do not treat this type | ||
| /// or its layout as a stable API. | ||
| #[derive(Clone)] | ||
| pub struct Inner { | ||
|
Comment on lines
+86
to
+90
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we hide this somehow instead of using a warning comment? Exposing this leaves the door open to callers messing with the inner dynamic filter's
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| /// A unique identifier for the expression. | ||
| pub expression_id: u64, | ||
| /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply. | ||
| /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. | ||
| generation: u64, | ||
| expr: Arc<dyn PhysicalExpr>, | ||
| pub generation: u64, | ||
| pub expr: Arc<dyn PhysicalExpr>, | ||
| /// Flag for quick synchronous check if filter is complete. | ||
| /// This is redundant with the watch channel state, but allows us to return immediately | ||
| /// from `wait_complete()` without subscribing if already complete. | ||
| is_complete: bool, | ||
| pub is_complete: bool, | ||
| } | ||
|
|
||
| // TODO: Include expression_id in Debug output. | ||
| // | ||
| // See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes | ||
| // like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their | ||
| // dynamic filter. They auto-create one on decode with a fresh `expression_id`, | ||
| // so a round-trip Debug comparison would diverge purely on the id even when | ||
| // the rest of the state is preserved. Excluding it from Debug keeps those | ||
| // roundtrip equality assertions meaningful until that work lands. | ||
| impl std::fmt::Debug for Inner { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| f.debug_struct("Inner") | ||
| .field("generation", &self.generation) | ||
| .field("expr", &self.expr) | ||
| .field("is_complete", &self.is_complete) | ||
| .finish() | ||
| } | ||
| } | ||
|
|
||
| impl Inner { | ||
| fn new(expr: Arc<dyn PhysicalExpr>) -> Self { | ||
| Self { | ||
| expression_id: EXPR_ID_SOURCE.next(), | ||
| // Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0. | ||
| // This is not currently used anywhere but it seems useful to have this simple distinction. | ||
| generation: 1, | ||
|
|
@@ -243,6 +274,8 @@ impl DynamicFilterPhysicalExpr { | |
| let mut current = self.inner.write(); | ||
| let new_generation = current.generation + 1; | ||
| *current = Inner { | ||
| // Preserve the expression id across updates. | ||
| expression_id: current.expression_id, | ||
| generation: new_generation, | ||
| expr: new_expr, | ||
| is_complete: current.is_complete, | ||
|
|
@@ -346,6 +379,62 @@ impl DynamicFilterPhysicalExpr { | |
|
|
||
| write!(f, " ]") | ||
| } | ||
|
|
||
| /// Return the filter's original children (before any remapping). | ||
| /// | ||
| /// **Warning:** intended only for `datafusion-proto` (de)serialization. | ||
| /// Not a stable API. | ||
| pub fn original_children(&self) -> &[Arc<dyn PhysicalExpr>] { | ||
| &self.children | ||
| } | ||
|
|
||
| /// Return the filter's remapped children, if any have been set via | ||
| /// [`PhysicalExpr::with_new_children`]. | ||
| /// | ||
| /// **Warning:** intended only for `datafusion-proto` (de)serialization. | ||
| /// Not a stable API. | ||
| pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> { | ||
| self.remapped_children.as_deref() | ||
| } | ||
|
jayshrivastava marked this conversation as resolved.
|
||
|
|
||
| /// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by | ||
| /// proto deserialization. | ||
| /// | ||
| /// **Warning:** intended only for `datafusion-proto` (de)serialization. | ||
| /// Not a stable API. | ||
|
Comment on lines
+400
to
+404
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this be generic via the shared state proposal? Not sure what is up with that. Ditto
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The shared state discussion seems promising! I'm hoping it will be an easy migration. I imagine that we would end up storing all
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep that would make sense to me |
||
| pub fn from_parts( | ||
| children: Vec<Arc<dyn PhysicalExpr>>, | ||
| remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>, | ||
| inner: Inner, | ||
| ) -> Self { | ||
| let state = if inner.is_complete { | ||
| FilterState::Complete { | ||
| generation: inner.generation, | ||
| } | ||
| } else { | ||
| FilterState::InProgress { | ||
| generation: inner.generation, | ||
| } | ||
| }; | ||
| let (state_watch, _) = watch::channel(state); | ||
|
|
||
| Self { | ||
| children, | ||
| remapped_children, | ||
| inner: Arc::new(RwLock::new(inner)), | ||
| state_watch, | ||
| data_type: Arc::new(RwLock::new(None)), | ||
| nullable: Arc::new(RwLock::new(None)), | ||
| } | ||
| } | ||
|
jayshrivastava marked this conversation as resolved.
|
||
|
|
||
| /// Return a clone of the atomically-captured `Inner` state. | ||
| /// | ||
| /// **Warning:** intended only for `datafusion-proto` (de)serialization. | ||
| /// Not a stable API. | ||
| pub fn inner(&self) -> Inner { | ||
| self.inner.read().clone() | ||
| } | ||
| } | ||
|
|
||
| impl PhysicalExpr for DynamicFilterPhysicalExpr { | ||
|
|
@@ -364,6 +453,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { | |
| Ok(Arc::new(Self { | ||
| children: self.children.clone(), | ||
| remapped_children: Some(children), | ||
| // Note: expression_id is preserved | ||
| inner: Arc::clone(&self.inner), | ||
| state_watch: self.state_watch.clone(), | ||
| data_type: Arc::clone(&self.data_type), | ||
|
|
@@ -444,8 +534,36 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { | |
| // Return the current generation of the expression. | ||
| self.inner.read().generation | ||
| } | ||
|
|
||
| fn expression_id(&self) -> Option<u64> { | ||
| Some(self.inner.read().expression_id) | ||
| } | ||
| } | ||
|
|
||
| /// An atomic counter used to generate monotonic u64 ids. | ||
| struct ExpressionIdAtomicCounter { | ||
| inner: AtomicU64, | ||
| } | ||
|
|
||
| impl ExpressionIdAtomicCounter { | ||
| const fn new() -> Self { | ||
| Self { | ||
| inner: AtomicU64::new(0), | ||
| } | ||
| } | ||
|
|
||
| /// Returns a fresh `expression_id` by incrementing the internal counter. | ||
| fn next(&self) -> u64 { | ||
| self.inner.fetch_add(1, Ordering::Relaxed) | ||
| } | ||
| } | ||
|
|
||
| /// Process-wide source of deterministic `expression_id`s for [`DynamicFilterPhysicalExpr`]. | ||
| /// | ||
| /// Currently, no other [`PhysicalExpr`]s use this source. If needed, it can be moved out of this | ||
| /// file and be made public for other expressions to use. | ||
| static EXPR_ID_SOURCE: ExpressionIdAtomicCounter = ExpressionIdAtomicCounter::new(); | ||
|
|
||
| #[cfg(test)] | ||
| mod test { | ||
| use crate::{ | ||
|
|
@@ -861,4 +979,130 @@ mod test { | |
| "Hash should be stable after update (identity-based)" | ||
| ); | ||
| } | ||
|
|
||
| /// Verifies that `from_parts` rebuilds a `DynamicFilterPhysicalExpr` | ||
| /// whose observable state (original children, remapped children, | ||
| /// expression id, inner generation/expr/is_complete) matches the source | ||
| /// filter. | ||
| #[test] | ||
| fn test_from_parts_preserves_state() { | ||
| let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); | ||
| let col_a = col("a", &schema).unwrap(); | ||
|
|
||
| // Create a dynamic filter with children | ||
| let expr = Arc::new(BinaryExpr::new( | ||
| Arc::clone(&col_a), | ||
| datafusion_expr::Operator::Gt, | ||
| lit(10) as Arc<dyn PhysicalExpr>, | ||
| )); | ||
| let filter = DynamicFilterPhysicalExpr::new( | ||
| vec![Arc::clone(&col_a)], | ||
| expr as Arc<dyn PhysicalExpr>, | ||
| ); | ||
|
|
||
| // Add remapped children. | ||
| let reassigned_schema = Arc::new(Schema::new(vec![ | ||
| Field::new("b", DataType::Int32, false), | ||
| Field::new("a", DataType::Int32, false), | ||
| ])); | ||
| let reassigned = reassign_expr_columns( | ||
| Arc::new(filter) as Arc<dyn PhysicalExpr>, | ||
| &reassigned_schema, | ||
| ) | ||
| .expect("reassign_expr_columns should succeed"); | ||
| let reassigned = reassigned | ||
| .downcast_ref::<DynamicFilterPhysicalExpr>() | ||
| .expect("Expected dynamic filter after reassignment"); | ||
|
|
||
| reassigned | ||
| .update(lit(42) as Arc<dyn PhysicalExpr>) | ||
| .expect("Update should succeed"); | ||
| reassigned.mark_complete(); | ||
|
|
||
| // Capture the parts and reconstruct. `expression_id` rides in `inner`. | ||
| let reconstructed = DynamicFilterPhysicalExpr::from_parts( | ||
| reassigned.original_children().to_vec(), | ||
| reassigned.remapped_children().map(|r| r.to_vec()), | ||
| reassigned.inner(), | ||
| ); | ||
|
|
||
| assert_eq!( | ||
| reassigned.original_children(), | ||
| reconstructed.original_children(), | ||
| ); | ||
| assert_eq!( | ||
| reassigned.remapped_children(), | ||
| reconstructed.remapped_children(), | ||
| ); | ||
| assert_eq!(reassigned.expression_id(), reconstructed.expression_id()); | ||
| let r = reassigned.inner(); | ||
| let c = reconstructed.inner(); | ||
| assert_eq!(r.generation, c.generation); | ||
| assert_eq!(r.is_complete, c.is_complete); | ||
| assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr)); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_expression_id() { | ||
| let source_schema = | ||
| Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); | ||
| let col_a = col("a", &source_schema).unwrap(); | ||
|
|
||
| // Create a source filter | ||
| let source = Arc::new(DynamicFilterPhysicalExpr::new( | ||
| vec![Arc::clone(&col_a)], | ||
| lit(true) as Arc<dyn PhysicalExpr>, | ||
| )); | ||
| let source_clone = Arc::clone(&source); | ||
|
|
||
| // Create a derived filter by reassigning the source filter to a different schema. | ||
| let derived_schema = Arc::new(Schema::new(vec![ | ||
| Field::new("x", DataType::Int32, false), | ||
| Field::new("a", DataType::Int32, false), | ||
| ])); | ||
| let derived = reassign_expr_columns( | ||
| Arc::clone(&source) as Arc<dyn PhysicalExpr>, | ||
| &derived_schema, | ||
| ) | ||
| .expect("reassign_expr_columns should succeed"); | ||
|
|
||
| let derived_expression_id = derived | ||
| .expression_id() | ||
| .expect("derived filter should have an expression id"); | ||
| let source_expression_id = source | ||
| .expression_id() | ||
| .expect("source filter should have an expression id"); | ||
| let source_clone_expression_id = source_clone | ||
| .expression_id() | ||
| .expect("source clone should have an expression id"); | ||
|
|
||
| assert_eq!( | ||
| source_clone_expression_id, source_expression_id, | ||
| "cloned filter should preserve its expression id", | ||
| ); | ||
|
|
||
| assert_eq!( | ||
| derived_expression_id, source_expression_id, | ||
| "derived filters should carry forward the source expression id", | ||
| ); | ||
|
|
||
| // `update()` rewrites the entire `Inner` struct in place; pin down | ||
| // that the rewrite preserves `expression_id`. | ||
| source | ||
| .update(lit(99) as Arc<dyn PhysicalExpr>) | ||
| .expect("update should succeed"); | ||
| assert_eq!( | ||
| source.expression_id().unwrap(), | ||
| source_expression_id, | ||
| "update() must not change expression_id", | ||
| ); | ||
|
|
||
| // `mark_complete()` also touches `Inner`; same invariant. | ||
| source.mark_complete(); | ||
| assert_eq!( | ||
| source.expression_id().unwrap(), | ||
| source_expression_id, | ||
| "mark_complete() must not change expression_id", | ||
| ); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.