Skip to content

Commit 459a4e7

Browse files
wip
1 parent 7709b0d commit 459a4e7

3 files changed

Lines changed: 175 additions & 290 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 22 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,85 +1021,53 @@ mod test {
10211021
}
10221022

10231023
#[tokio::test]
1024-
async fn test_with_new_children_preserves_shared_inner_state() {
1024+
async fn test_expr_id() {
10251025
// Create a source filter
10261026
let source = Arc::new(DynamicFilterPhysicalExpr::new(
10271027
vec![],
10281028
lit(42) as Arc<dyn PhysicalExpr>,
10291029
));
1030+
let source_clone = Arc::clone(&source);
10301031

1031-
// Create a target filter with different children
1032+
// Create a derived filter with different children
10321033
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
10331034
let col_x = col("x", &schema).unwrap();
1034-
// Rebuild a wrapper from the source filter using the target's children.
1035-
let combined = Arc::clone(&source)
1035+
let derived = Arc::clone(&source)
10361036
.with_new_children(vec![Arc::clone(&col_x)])
10371037
.unwrap();
10381038

1039-
let combined_expr_id = Arc::clone(&combined)
1039+
let derived_expr_id = Arc::clone(&derived)
10401040
.expr_id(&[])
10411041
.expect("combined filter should have an expr_id");
10421042
let source_expr_id = Arc::clone(&source)
10431043
.expr_id(&[])
10441044
.expect("source filter should have an expr_id");
1045-
assert_ne!(
1046-
combined_expr_id.exact(),
1047-
source_expr_id.exact(),
1048-
"with_new_children should produce a distinct outer Arc identity",
1049-
);
1050-
assert_eq!(
1051-
combined_expr_id.shallow(),
1052-
source_expr_id.shallow(),
1053-
"dynamic filters with shared inner state should have the same shallow identity",
1054-
);
1055-
1056-
let combined_dyn_filter = combined
1057-
.as_any()
1058-
.downcast_ref::<DynamicFilterPhysicalExpr>()
1059-
.unwrap();
1045+
let source_clone_expr_id = Arc::clone(&source_clone)
1046+
.expr_id(&[])
1047+
.expect("source clone should have an expr_id");
10601048

1061-
// Verify the children are unchanged.
10621049
assert_eq!(
1063-
format!("{:?}", combined.children()),
1064-
format!("{:?}", vec![col_x]),
1065-
"Combined filter's children should be unchanged"
1050+
source_clone_expr_id.exact(),
1051+
source_expr_id.exact(),
1052+
"cloned filter should have the same exact id because the outer Arc is the same",
10661053
);
10671054

1068-
// Verify inner expression changed to the one from source.
10691055
assert_eq!(
1070-
format!("{:?}", combined_dyn_filter.current().unwrap()),
1071-
format!("{:?}", lit(42)),
1072-
"Combined filter should have inner expression from source filter"
1056+
source_clone_expr_id.shallow(),
1057+
source_expr_id.shallow(),
1058+
"cloned filter should have the same shallow id because the inner state is the same",
10731059
);
10741060

1075-
// Verify that completing one filter also completes the other.
1076-
let combined_binding = Arc::clone(&combined) as Arc<dyn PhysicalExpr>;
1077-
#[expect(clippy::disallowed_methods)]
1078-
let wait_handle = tokio::task::spawn({
1079-
async move {
1080-
let df = combined_binding
1081-
.as_any()
1082-
.downcast_ref::<DynamicFilterPhysicalExpr>()
1083-
.unwrap();
1084-
df.wait_complete().await;
1085-
format!("{:?}", df.current().unwrap())
1086-
}
1087-
});
1088-
source.update(lit(999) as Arc<dyn PhysicalExpr>).unwrap();
1089-
source.mark_complete();
1090-
1091-
// The linked filter should be notified via the shared watch
1092-
let result = tokio::time::timeout(std::time::Duration::from_secs(1), wait_handle)
1093-
.await
1094-
.expect(
1095-
"linked filter should have been notified of completion within timeout",
1096-
)
1097-
.expect("task should not panic");
1061+
assert_ne!(
1062+
derived_expr_id.exact(),
1063+
source_expr_id.exact(),
1064+
"filters should have different exact ids because the children are different",
1065+
);
10981066

10991067
assert_eq!(
1100-
result,
1101-
format!("{:?}", lit(999)),
1102-
"linked filter should see source's updated inner expression"
1068+
derived_expr_id.shallow(),
1069+
source_expr_id.shallow(),
1070+
"filters should have the same shallow id because they are the same expression",
11031071
);
11041072
}
11051073
}

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use datafusion_datasource_parquet::file_format::ParquetSink;
3333
use datafusion_expr::WindowFrame;
3434
use datafusion_physical_expr::ScalarFunctionExpr;
3535
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
36-
use datafusion_physical_expr_common::physical_expr::PhysicalExprId;
3736
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3837
use datafusion_physical_plan::expressions::{
3938
BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr,
@@ -54,13 +53,6 @@ use crate::protobuf::{
5453
physical_aggregate_expr_node, physical_window_expr_node,
5554
};
5655

57-
fn to_proto_expr_id(expr_id: PhysicalExprId) -> protobuf::PhysicalExprId {
58-
protobuf::PhysicalExprId {
59-
exact: expr_id.exact(),
60-
shallow: expr_id.shallow(),
61-
}
62-
}
63-
6456
#[expect(clippy::needless_pass_by_value)]
6557
pub fn serialize_physical_aggr_expr(
6658
aggr_expr: Arc<AggregateFunctionExpr>,

0 commit comments

Comments
 (0)