Skip to content

Commit 6a74fa1

Browse files
Anurag Tryambak RautAnuragRaut08
authored andcommitted
refactor: add try_to_proto / try_from_proto to DynamicFilterPhysicalExpr
Part of apache#22434. Adds try_to_proto / try_from_proto to DynamicFilterPhysicalExpr so it participates in the expression-local serialization pattern introduced in apache#21929. The centralized arms in to_proto.rs / from_proto.rs remain as fallbacks for now. Cleanup of the pub-for-proto scaffolding (from_parts, inner, original_children, remapped_children) can follow once decode reads state directly via try_from_proto.
1 parent 107713f commit 6a74fa1

1 file changed

Lines changed: 108 additions & 0 deletions

File tree

  • datafusion/physical-expr/src/expressions/dynamic_filters

datafusion/physical-expr/src/expressions/dynamic_filters/mod.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,114 @@ impl DynamicFilterPhysicalExpr {
447447
}
448448
}
449449

450+
#[cfg(feature = "proto")]
451+
impl DynamicFilterPhysicalExpr {
452+
/// Serialize this expression to protobuf.
453+
///
454+
/// Encodes `children`, `remapped_children`, and the atomically-captured
455+
/// `Inner` state (expression id, generation, current expr, is_complete).
456+
pub fn try_to_proto(
457+
&self,
458+
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
459+
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>>
460+
{
461+
use datafusion_proto_models::protobuf;
462+
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;
463+
464+
let children = self
465+
.children
466+
.iter()
467+
.map(|c| ctx.encode_child(c))
468+
.collect::<Result<Vec<_>>>()?;
469+
470+
let remapped_children = match &self.remapped_children {
471+
Some(remapped) => remapped
472+
.iter()
473+
.map(|c| ctx.encode_child(c))
474+
.collect::<Result<Vec<_>>>()?,
475+
None => vec![],
476+
};
477+
478+
let inner = self.inner.read().clone();
479+
let inner_expr = Box::new(ctx.encode_child(&inner.expr)?);
480+
481+
Ok(Some(protobuf::PhysicalExprNode {
482+
expr_id: Some(inner.expression_id),
483+
expr_type: Some(ExprType::DynamicFilter(Box::new(
484+
protobuf::PhysicalDynamicFilterNode {
485+
children,
486+
remapped_children,
487+
generation: inner.generation,
488+
inner_expr: Some(inner_expr),
489+
is_complete: inner.is_complete,
490+
},
491+
))),
492+
}))
493+
}
494+
495+
/// Reconstruct a [`DynamicFilterPhysicalExpr`] from its protobuf representation.
496+
pub fn try_from_proto(
497+
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
498+
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
499+
) -> Result<Arc<dyn PhysicalExpr>>
500+
{
501+
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;
502+
503+
let dynamic_filter = match &node.expr_type {
504+
Some(ExprType::DynamicFilter(df)) => df.as_ref(),
505+
_ => return datafusion_common::internal_err!(
506+
"PhysicalExprNode is not a DynamicFilter"
507+
),
508+
};
509+
510+
let expression_id = node.expr_id.ok_or_else(|| {
511+
datafusion_common::DataFusionError::Internal(
512+
"DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \
513+
to be set by the serializer"
514+
.to_string(),
515+
)
516+
})?;
517+
518+
let children = dynamic_filter
519+
.children
520+
.iter()
521+
.map(|c| ctx.decode(c))
522+
.collect::<Result<Vec<_>>>()?;
523+
524+
let remapped_children = if !dynamic_filter.remapped_children.is_empty() {
525+
Some(
526+
dynamic_filter
527+
.remapped_children
528+
.iter()
529+
.map(|c| ctx.decode(c))
530+
.collect::<Result<Vec<_>>>()?,
531+
)
532+
} else {
533+
None
534+
};
535+
536+
let inner_expr = ctx.decode(
537+
dynamic_filter
538+
.inner_expr
539+
.as_deref()
540+
.ok_or_else(|| datafusion_common::DataFusionError::Internal(
541+
"DynamicFilterPhysicalExpr missing inner_expr".to_string(),
542+
))?,
543+
)?;
544+
545+
Ok(Arc::new(DynamicFilterPhysicalExpr::from_parts(
546+
children,
547+
remapped_children,
548+
Inner {
549+
expression_id,
550+
generation: dynamic_filter.generation,
551+
expr: inner_expr,
552+
is_complete: dynamic_filter.is_complete,
553+
},
554+
)))
555+
}
556+
}
557+
450558
impl PhysicalExpr for DynamicFilterPhysicalExpr {
451559
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
452560
self.remapped_children

0 commit comments

Comments
 (0)