@@ -372,7 +372,9 @@ impl DynamicFilterPhysicalExpr {
372372 children : self . children . clone ( ) ,
373373 remapped_children : self . remapped_children . clone ( ) ,
374374 inner : Arc :: clone ( & source. inner ) ,
375- state_watch : self . state_watch . clone ( ) ,
375+ // Reuse source watch channel so waiters on any relinked clone
376+ // observe update/complete notifications from the producer.
377+ state_watch : source. state_watch . clone ( ) ,
376378 data_type : Arc :: clone ( & self . data_type ) ,
377379 nullable : Arc :: clone ( & self . nullable ) ,
378380 runtime_partition : None ,
@@ -1482,4 +1484,34 @@ mod test {
14821484 "Combined filter should have source's inner expression"
14831485 ) ;
14841486 }
1487+
1488+ #[ tokio:: test]
1489+ async fn test_new_from_source_wait_complete_notifications ( ) {
1490+ // Create an incomplete source and relink a second filter to it.
1491+ let source = Arc :: new ( DynamicFilterPhysicalExpr :: new (
1492+ vec ! [ ] ,
1493+ lit ( 42 ) as Arc < dyn PhysicalExpr > ,
1494+ ) ) ;
1495+ let target = Arc :: new ( DynamicFilterPhysicalExpr :: new (
1496+ vec ! [ ] ,
1497+ lit ( 0 ) as Arc < dyn PhysicalExpr > ,
1498+ ) ) ;
1499+ let combined = Arc :: new ( target. new_from_source ( & source) . unwrap ( ) ) ;
1500+
1501+ let waiter = tokio:: spawn ( {
1502+ let combined = Arc :: clone ( & combined) ;
1503+ async move {
1504+ combined. wait_complete ( ) . await ;
1505+ }
1506+ } ) ;
1507+
1508+ // Ensure waiter has subscribed before completion is signalled.
1509+ tokio:: task:: yield_now ( ) . await ;
1510+ source. mark_complete ( ) ;
1511+
1512+ tokio:: time:: timeout ( std:: time:: Duration :: from_secs ( 1 ) , waiter)
1513+ . await
1514+ . expect ( "wait_complete should be notified by source mark_complete" )
1515+ . expect ( "wait_complete task should not panic" ) ;
1516+ }
14851517}
0 commit comments