Skip to content

Commit 2cf8009

Browse files
adriangb and claude committed
Add PhysicalExprDecodeCtx and migrate Column decode
Adds the decode-side counterpart to `PhysicalExprEncodeCtx`. The public surface is a single `decode(node)` method on the context — the central match (and, in future, third-party registry lookups) lives behind it.

Per-expression contract:

```rust
impl Column {
    pub fn try_from_proto(
        node: &PhysicalExprNode,
        ctx: &PhysicalExprDecodeCtx,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}
```

`try_from_proto` takes the whole `PhysicalExprNode` — the exact inverse of what `try_to_proto` returns — and unwraps its own `ExprType` variant. That keeps one signature for every expression (trait-ready) and gives decoders access to outer-node fields such as `expr_id`. The central match in `parse_physical_expr_with_converter` only routes to the right constructor.

`Column` is the first expression to migrate. Other variants stay inline; they migrate in follow-ups, same shape, same trick (one expression per commit, central match keeps shrinking).

Removes the temporary `FromProto<&PhysicalColumn> for Column` impl introduced in the proto-models extraction — `Column` now owns both directions of its own serialization.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent bc6b620 commit 2cf8009

3 files changed

Lines changed: 154 additions & 11 deletions

File tree

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,89 @@ pub mod proto_encode {
566566
}
567567
}
568568

569+
/// Decode-side counterpart to [`proto_encode`].
570+
///
571+
/// Expression authors implement an associated `try_from_proto` on their
572+
/// concrete type, with the signature
573+
///
574+
/// ```ignore
575+
/// fn try_from_proto(
576+
/// node: &PhysicalExprNode,
577+
/// ctx: &PhysicalExprDecodeCtx<'_>,
578+
/// ) -> Result<Arc<dyn PhysicalExpr>>
579+
/// ```
580+
///
581+
/// It takes the whole [`PhysicalExprNode`] — the exact inverse of what
582+
/// [`PhysicalExpr::try_to_proto`] returns — so the constructor can also see
583+
/// outer-node fields such as `expr_id`. The central match in
584+
/// `datafusion-proto` dispatches `ExprType` variants to these constructors.
585+
///
586+
/// As with the encode side, the public surface is a struct (not a `&dyn`
587+
/// trait) so future fields/helpers (registries for third-party expressions,
588+
/// schema-resolution caches, etc.) can be added without changing the
589+
/// signature every expression depends on.
590+
#[cfg(feature = "proto")]
591+
pub mod proto_decode {
592+
use std::sync::Arc;
593+
594+
use arrow::datatypes::Schema;
595+
use datafusion_common::Result;
596+
use datafusion_proto_models::protobuf::PhysicalExprNode;
597+
598+
use super::PhysicalExpr;
599+
600+
/// Decoder context handed to per-expression `try_from_proto` constructors.
601+
///
602+
/// Wraps an internal [`PhysicalExprDecode`] trait object plus a borrowed
603+
/// schema. The trait stays an implementation detail of `datafusion-proto`;
604+
/// expression authors only see this struct.
605+
pub struct PhysicalExprDecodeCtx<'a> {
606+
schema: &'a Schema,
607+
decoder: &'a dyn PhysicalExprDecode,
608+
}
609+
610+
impl<'a> PhysicalExprDecodeCtx<'a> {
611+
/// Construct a new decode context. Typically called by
612+
/// `datafusion-proto`; expression authors receive
613+
/// `&PhysicalExprDecodeCtx`.
614+
pub fn new(schema: &'a Schema, decoder: &'a dyn PhysicalExprDecode) -> Self {
615+
Self { schema, decoder }
616+
}
617+
618+
/// The schema bound to this decode context. Use it for column lookups,
619+
/// data-type resolution, etc.
620+
pub fn schema(&self) -> &Schema {
621+
self.schema
622+
}
623+
624+
/// Decode an expression node, recursing into child sub-expressions.
625+
///
626+
/// Routes built-in `ExprType` variants through `datafusion-proto`'s
627+
/// central match and forwards extension nodes to the registered codec
628+
/// (today via [`PhysicalExtensionCodec::try_decode_expr`]; later via
629+
/// a per-type registry — see #21835).
630+
///
631+
/// [`PhysicalExtensionCodec::try_decode_expr`]: https://docs.rs/datafusion-proto/latest/datafusion_proto/physical_plan/trait.PhysicalExtensionCodec.html#method.try_decode_expr
632+
pub fn decode(&self, node: &PhysicalExprNode) -> Result<Arc<dyn PhysicalExpr>> {
633+
self.decoder.decode(node, self.schema)
634+
}
635+
}
636+
637+
/// Internal dispatch trait. Implementors live in `datafusion-proto`.
638+
/// Expression authors should use [`PhysicalExprDecodeCtx`] instead of
639+
/// calling this directly.
640+
pub trait PhysicalExprDecode {
641+
/// Decode a proto node into a concrete `PhysicalExpr`. The schema is
642+
/// passed alongside so implementations can support recursive children
643+
/// and rebind the context per call (e.g. for nested plans).
644+
fn decode(
645+
&self,
646+
node: &PhysicalExprNode,
647+
schema: &Schema,
648+
) -> Result<Arc<dyn PhysicalExpr>>;
649+
}
650+
}
651+
569652
#[deprecated(
570653
since = "50.0.0",
571654
note = "Use `datafusion_expr_common::dyn_eq` instead"

datafusion/physical-expr/src/expressions/column.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,32 @@ impl PhysicalExpr for Column {
165165
}
166166
}
167167

168+
#[cfg(feature = "proto")]
169+
impl Column {
170+
/// Reconstruct a [`Column`] from its protobuf representation.
171+
///
172+
/// Takes the whole [`PhysicalExprNode`] — the exact inverse of what
173+
/// [`PhysicalExpr::try_to_proto`] produces — so every expression's
174+
/// `try_from_proto` shares one signature. The decode context is currently
175+
/// unused, but is threaded through so that future expressions with child
176+
/// sub-expressions can recurse via [`PhysicalExprDecodeCtx::decode`].
177+
///
178+
/// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
179+
/// [`PhysicalExpr::try_to_proto`]: datafusion_physical_expr_common::physical_expr::PhysicalExpr::try_to_proto
180+
/// [`PhysicalExprDecodeCtx::decode`]: datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx::decode
181+
pub fn try_from_proto(
182+
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
183+
_ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
184+
) -> Result<Arc<dyn PhysicalExpr>> {
185+
use datafusion_proto_models::protobuf;
186+
let protobuf::PhysicalColumn { name, index } = match &node.expr_type {
187+
Some(protobuf::physical_expr_node::ExprType::Column(c)) => c,
188+
_ => return internal_err!("PhysicalExprNode is not a Column"),
189+
};
190+
Ok(Arc::new(Column::new(name, *index as usize)))
191+
}
192+
}
193+
168194
impl Column {
169195
fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
170196
if self.index < input_schema.fields.len() {

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,14 @@ use super::{
5757
DefaultPhysicalProtoConverter, PhysicalExtensionCodec, PhysicalPlanDecodeContext,
5858
PhysicalProtoConverterExtension,
5959
};
60-
use crate::convert::{FromProto, TryFromProto};
60+
use crate::convert::TryFromProto;
6161
use crate::logical_plan::{self};
6262
use crate::protobuf::physical_expr_node::ExprType;
6363
use crate::{convert_required, convert_required_proto, protobuf};
6464
use datafusion_physical_expr::expressions::{
6565
DynamicFilterInner, DynamicFilterPhysicalExpr,
6666
};
6767

68-
impl FromProto<&protobuf::PhysicalColumn> for Column {
69-
fn from_proto(c: &protobuf::PhysicalColumn) -> Column {
70-
Column::new(&c.name, c.index as usize)
71-
}
72-
}
73-
7468
/// Parses a physical sort expression from a protobuf.
7569
///
7670
/// # Arguments
@@ -267,11 +261,24 @@ pub fn parse_physical_expr_with_converter(
267261
.as_ref()
268262
.ok_or_else(|| proto_error("Unexpected empty physical expression"))?;
269263

264+
// Decoder context handed to per-expression `try_from_proto` constructors.
265+
// This is the new shape the codebase is migrating toward (see #21835);
266+
// for now only `Column` is migrated and the rest of the variants are still
267+
// matched inline.
268+
let decoder = ConverterDecoder {
269+
ctx,
270+
proto_converter,
271+
};
272+
let decode_ctx =
273+
datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx::new(
274+
input_schema,
275+
&decoder,
276+
);
277+
270278
let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
271-
ExprType::Column(c) => {
272-
let pcol = Column::from_proto(c);
273-
Arc::new(pcol)
274-
}
279+
// Migrated expressions take the whole `PhysicalExprNode` and unwrap
280+
// their own `ExprType` variant (see #21835); this match only routes.
281+
ExprType::Column(_) => Column::try_from_proto(proto, &decode_ctx)?,
275282
ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
276283
ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
277284
ExprType::BinaryExpr(binary_expr) => {
@@ -905,6 +912,33 @@ impl TryFromProto<&protobuf::FileSinkConfig> for FileSinkConfig {
905912
}
906913
}
907914

915+
/// Concrete [`PhysicalExprDecode`] driver that backs
916+
/// [`PhysicalExprDecodeCtx`] inside `parse_physical_expr_with_converter`.
917+
///
918+
/// Today this is a thin wrapper that re-enters the central match through
919+
/// `proto_to_physical_expr`; once more expressions migrate, the central match
920+
/// shrinks and a future builder-style decoder can take over.
921+
///
922+
/// [`PhysicalExprDecode`]: datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecode
923+
/// [`PhysicalExprDecodeCtx`]: datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx
924+
struct ConverterDecoder<'a, 'b> {
925+
ctx: &'a PhysicalPlanDecodeContext<'b>,
926+
proto_converter: &'a dyn PhysicalProtoConverterExtension,
927+
}
928+
929+
impl datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecode
930+
for ConverterDecoder<'_, '_>
931+
{
932+
fn decode(
933+
&self,
934+
node: &protobuf::PhysicalExprNode,
935+
schema: &Schema,
936+
) -> Result<Arc<dyn PhysicalExpr>> {
937+
self.proto_converter
938+
.proto_to_physical_expr(node, schema, self.ctx)
939+
}
940+
}
941+
908942
#[cfg(test)]
909943
mod tests {
910944

0 commit comments

Comments
 (0)