Skip to content

Commit 59da102

Browse files
wip
1 parent 459a4e7 commit 59da102

2 files changed

Lines changed: 25 additions & 23 deletions

File tree

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ pub fn serialize_physical_aggr_expr(
7272
codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
7373
Ok(protobuf::PhysicalExprNode {
7474
expr_id: None,
75-
7675
expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
7776
protobuf::PhysicalAggregateExprNode {
7877
aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
@@ -258,6 +257,7 @@ pub fn serialize_physical_expr_with_converter(
258257
codec: &dyn PhysicalExtensionCodec,
259258
proto_converter: &dyn PhysicalProtoConverterExtension,
260259
) -> Result<protobuf::PhysicalExprNode> {
260+
let expr = value.as_any();
261261
// HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
262262
// It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
263263
// cannot be serialized - the hash table is a runtime structure built during
@@ -271,11 +271,7 @@ pub fn serialize_physical_expr_with_converter(
271271
//
272272
// In distributed execution, the remote worker won't have access to the hash
273273
// table anyway, so the best we can do is skip this optimization.
274-
if value
275-
.as_any()
276-
.downcast_ref::<HashTableLookupExpr>()
277-
.is_some()
278-
{
274+
if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
279275
let value = datafusion_proto_common::ScalarValue {
280276
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
281277
true,
@@ -287,7 +283,7 @@ pub fn serialize_physical_expr_with_converter(
287283
});
288284
}
289285

290-
if let Some(expr) = value.as_any().downcast_ref::<Column>() {
286+
if let Some(expr) = expr.downcast_ref::<Column>() {
291287
Ok(protobuf::PhysicalExprNode {
292288
expr_id: None,
293289
expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
@@ -297,7 +293,7 @@ pub fn serialize_physical_expr_with_converter(
297293
},
298294
)),
299295
})
300-
} else if let Some(expr) = value.as_any().downcast_ref::<UnKnownColumn>() {
296+
} else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
301297
Ok(protobuf::PhysicalExprNode {
302298
expr_id: None,
303299
expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
@@ -306,7 +302,7 @@ pub fn serialize_physical_expr_with_converter(
306302
},
307303
)),
308304
})
309-
} else if let Some(expr) = value.as_any().downcast_ref::<BinaryExpr>() {
305+
} else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
310306
let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
311307
l: Some(Box::new(
312308
proto_converter.physical_expr_to_proto(expr.left(), codec)?,
@@ -323,7 +319,7 @@ pub fn serialize_physical_expr_with_converter(
323319
binary_expr,
324320
)),
325321
})
326-
} else if let Some(expr) = value.as_any().downcast_ref::<CaseExpr>() {
322+
} else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
327323
Ok(protobuf::PhysicalExprNode {
328324
expr_id: None,
329325
expr_type: Some(
@@ -366,7 +362,7 @@ pub fn serialize_physical_expr_with_converter(
366362
),
367363
),
368364
})
369-
} else if let Some(expr) = value.as_any().downcast_ref::<NotExpr>() {
365+
} else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
370366
Ok(protobuf::PhysicalExprNode {
371367
expr_id: None,
372368
expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
@@ -377,7 +373,7 @@ pub fn serialize_physical_expr_with_converter(
377373
},
378374
))),
379375
})
380-
} else if let Some(expr) = value.as_any().downcast_ref::<IsNullExpr>() {
376+
} else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
381377
Ok(protobuf::PhysicalExprNode {
382378
expr_id: None,
383379
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
@@ -388,7 +384,7 @@ pub fn serialize_physical_expr_with_converter(
388384
}),
389385
)),
390386
})
391-
} else if let Some(expr) = value.as_any().downcast_ref::<IsNotNullExpr>() {
387+
} else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
392388
Ok(protobuf::PhysicalExprNode {
393389
expr_id: None,
394390
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
@@ -399,7 +395,7 @@ pub fn serialize_physical_expr_with_converter(
399395
}),
400396
)),
401397
})
402-
} else if let Some(expr) = value.as_any().downcast_ref::<InListExpr>() {
398+
} else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
403399
Ok(protobuf::PhysicalExprNode {
404400
expr_id: None,
405401
expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
@@ -412,7 +408,7 @@ pub fn serialize_physical_expr_with_converter(
412408
},
413409
))),
414410
})
415-
} else if let Some(expr) = value.as_any().downcast_ref::<NegativeExpr>() {
411+
} else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
416412
Ok(protobuf::PhysicalExprNode {
417413
expr_id: None,
418414
expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
@@ -423,14 +419,14 @@ pub fn serialize_physical_expr_with_converter(
423419
},
424420
))),
425421
})
426-
} else if let Some(lit) = value.as_any().downcast_ref::<Literal>() {
422+
} else if let Some(lit) = expr.downcast_ref::<Literal>() {
427423
Ok(protobuf::PhysicalExprNode {
428424
expr_id: None,
429425
expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
430426
lit.value().try_into()?,
431427
)),
432428
})
433-
} else if let Some(cast) = value.as_any().downcast_ref::<CastExpr>() {
429+
} else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
434430
Ok(protobuf::PhysicalExprNode {
435431
expr_id: None,
436432
expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
@@ -442,7 +438,7 @@ pub fn serialize_physical_expr_with_converter(
442438
},
443439
))),
444440
})
445-
} else if let Some(cast) = value.as_any().downcast_ref::<TryCastExpr>() {
441+
} else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
446442
Ok(protobuf::PhysicalExprNode {
447443
expr_id: None,
448444
expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
@@ -454,7 +450,7 @@ pub fn serialize_physical_expr_with_converter(
454450
},
455451
))),
456452
})
457-
} else if let Some(expr) = value.as_any().downcast_ref::<ScalarFunctionExpr>() {
453+
} else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
458454
let mut buf = Vec::new();
459455
codec.try_encode_udf(expr.fun(), &mut buf)?;
460456
Ok(protobuf::PhysicalExprNode {
@@ -473,7 +469,7 @@ pub fn serialize_physical_expr_with_converter(
473469
},
474470
)),
475471
})
476-
} else if let Some(expr) = value.as_any().downcast_ref::<LikeExpr>() {
472+
} else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
477473
Ok(protobuf::PhysicalExprNode {
478474
expr_id: None,
479475
expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
@@ -489,7 +485,7 @@ pub fn serialize_physical_expr_with_converter(
489485
},
490486
))),
491487
})
492-
} else if let Some(expr) = value.as_any().downcast_ref::<HashExpr>() {
488+
} else if let Some(expr) = expr.downcast_ref::<HashExpr>() {
493489
Ok(protobuf::PhysicalExprNode {
494490
expr_id: None,
495491
expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr(
@@ -504,7 +500,7 @@ pub fn serialize_physical_expr_with_converter(
504500
},
505501
)),
506502
})
507-
} else if let Some(df) = value.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
503+
} else if let Some(df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
508504
// Capture all state atomically, including the internal id.
509505
let snapshot = DynamicFilterSnapshot::from(df);
510506

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ use crate::cases::{
130130
CustomUDWF, CustomUDWFNode, MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf,
131131
MyRegexUdfNode,
132132
};
133-
134133
use datafusion_physical_expr::expressions::{
135134
DynamicFilterPhysicalExpr, DynamicFilterSnapshot,
136135
};
@@ -3147,6 +3146,13 @@ fn roundtrip_dynamic_filter_expr_pair(
31473146
/// The pushed-down dynamic filter has been rewritten with
31483147
/// `reassign_expr_columns`, so the source schema is `b@0, a@1` while the
31493148
/// consumer-side filter above the projection still runs against `a@0`.
3149+
///
3150+
/// Returns `(left_before, right_before, left_after, right_after)` where:
3151+
///
3152+
/// - `left_before` is the filter above the projection before serialization
3153+
/// - `right_before` is the pushed-down scan filter before serialization
3154+
/// - `left_after` is the deserialized filter above the projection
3155+
/// - `right_after` is the deserialized pushed-down scan filter
31503156
fn roundtrip_dynamic_filter_plan_pair() -> Result<(
31513157
Arc<dyn PhysicalExpr>,
31523158
Arc<dyn PhysicalExpr>,

0 commit comments

Comments
 (0)