diff --git a/Cargo.lock b/Cargo.lock index c18fd2012891c..817a62ac9d25e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,7 +112,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -123,7 +123,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2051,6 +2051,7 @@ dependencies = [ "datafusion-proto", "datafusion-sql", "env_logger", + "erased-serde", "futures", "insta", "log", @@ -2346,6 +2347,7 @@ dependencies = [ "datafusion-functions", "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", + "erased-serde", "half", "hashbrown 0.17.0", "indexmap 2.14.0", @@ -2356,6 +2358,8 @@ dependencies = [ "rand 0.9.4", "recursive", "rstest", + "serde", + "serde_json", "tokio", ] @@ -2381,12 +2385,15 @@ dependencies = [ "criterion", "datafusion-common", "datafusion-expr-common", + "erased-serde", "hashbrown 0.17.0", "indexmap 2.14.0", "itertools 0.14.0", "parking_lot", "pin-project", "rand 0.9.4", + "serde", + "serde_json", ] [[package]] @@ -2713,7 +2720,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2850,6 +2857,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "erased-serde" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2add8a07dd6a8d93ff627029c51de145e12686fbc36ecb298ac22e74cf02dec" +dependencies = [ + "serde", + "serde_core", + "typeid", +] + [[package]] name = "errno" version = "0.3.14" @@ -2857,7 +2875,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4119,7 +4137,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -4868,7 +4886,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.60.2", ] [[package]] @@ -5249,7 +5267,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5711,7 +5729,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -5812,7 +5830,7 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -5982,7 +6000,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -6428,6 +6446,12 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" +[[package]] +name = "typeid" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" + [[package]] name = "typenum" version = "1.20.0" @@ -6900,7 +6924,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 37734211266ba..612fa6034433c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -160,6 +160,7 @@ datafusion-substrait = { path = "datafusion/substrait", version = "53.1.0" } doc-comment = "0.3" env_logger = "0.11" +erased-serde = "0.4" flate2 = "1.1.9" futures = "0.3" glob = "0.3.0" @@ -190,6 +191,7 @@ rand = "0.9" recursive = "0.1.1" regex = "1.12" rstest = "0.26.1" +serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" sha2 = "^0.11.0" sqlparser = { version = "0.61.0", default-features = false, features = ["std", "visitor"] } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index e56f5ad6b8ca7..a05a0d8c14b4b 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -56,6 +56,7 @@ datafusion-physical-expr-adapter = { workspace = true } datafusion-proto = { workspace = true } datafusion-sql = { workspace = true } env_logger = { workspace = true } +erased-serde = { workspace = true } futures = { workspace = true } insta = { workspace = true } log = { workspace = true } diff --git a/datafusion-examples/examples/physical_expr_serde/main.rs b/datafusion-examples/examples/physical_expr_serde/main.rs new file mode 100644 index 0000000000000..2ea4678f44ee0 --- /dev/null +++ b/datafusion-examples/examples/physical_expr_serde/main.rs @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Round-trip a custom `PhysicalExpr` through JSON. +//! +//! This example shows the four moving parts an extension author touches to +//! make their own `PhysicalExpr` round-trippable: +//! +//! 1. Pick a stable `TAG` and implement [`PhysicalExprDeserialize`] for it. +//! 2. Override `serde_tag` and `erased_serialize` in `PhysicalExpr` so the +//! type opts in to the serialization layer. +//! 3. Add the type to a `PhysicalExprRegistry` (here we layer it on top of +//! DataFusion's default registry). +//! 4. Serialize via `serde_json::to_string`; decode via +//! `registry.deserialize_json`. +//! +//! Run with: +//! ```bash +//! cargo run --example physical_expr_serde +//! ``` + +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use arrow::array::{ArrayRef, BooleanArray}; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion::common::Result; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_expr::serde::{ + DeserializeContext, PhysicalExprDeserialize, PhysicalExprRegistry, default_registry, +}; +use datafusion_expr::ColumnarValue; +use serde::{Deserialize, Serialize}; + +/// A toy `PhysicalExpr` that delegates to a child but flips its boolean +/// output. Self-contained, no children, holds a `String` tag and a child +/// expression — enough to exercise both `ctx.deserialize::()` for +/// owned fields and `ctx.registry().expr_seed()` for trait-object children. +#[derive(Debug, Eq, Serialize)] +struct AnnotatedNot { + label: String, + child: Arc, +} + +impl PartialEq for AnnotatedNot { + fn eq(&self, other: &Self) -> bool { + self.label == other.label && self.child.eq(&other.child) + } +} + +impl Hash for AnnotatedNot { + fn hash(&self, state: &mut H) { + self.label.hash(state); + self.child.hash(state); + } +} + +impl std::fmt::Display for AnnotatedNot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "AnnotatedNot[{}]({})", self.label, self.child) + } +} + +impl PhysicalExpr for AnnotatedNot { + fn data_type(&self, _input_schema: &Schema) -> Result { + Ok(DataType::Boolean) + } + + fn nullable(&self, input_schema: &Schema) -> Result { + self.child.nullable(input_schema) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + let child = self.child.evaluate(batch)?; + match child { + ColumnarValue::Array(arr) => { + let bools = arr + .as_any() + .downcast_ref::() + .expect("AnnotatedNot child must yield Boolean"); + let flipped = arrow::compute::kernels::boolean::not(bools)?; + Ok(ColumnarValue::Array(Arc::new(flipped) as ArrayRef)) + } + ColumnarValue::Scalar(_) => unimplemented!("scalar path omitted"), + } + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(AnnotatedNot { + label: self.label.clone(), + child: Arc::clone(&children[0]), + })) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ANNOTATED_NOT({})", self.label) + } + + // Serialization opt-in: match TAG with the deserialize impl below. + fn serde_tag(&self) -> &'static str { + ::TAG + } + + fn erased_serialize(&self) -> Box { + Box::new(self) + } +} + +impl PhysicalExprDeserialize for AnnotatedNot { + const TAG: &'static str = "example.AnnotatedNot"; + + fn deserialize(ctx: &mut DeserializeContext<'_, '_>) -> Result { + // Hand-rolled because `child: Arc` isn't + // `Deserialize` — it has to recurse through the registry's seed. + use serde::de::{Deserializer as _, Error as DeError, MapAccess, Visitor}; + + struct V<'r> { + registry: &'r PhysicalExprRegistry, + } + impl<'de, 'r> Visitor<'de> for V<'r> { + type Value = AnnotatedNot; + fn expecting(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("AnnotatedNot {label, child}") + } + fn visit_map>( + self, + mut map: A, + ) -> std::result::Result { + let mut label: Option = None; + let mut child: Option> = None; + while let Some(k) = map.next_key::()? { + match k.as_str() { + "label" => label = Some(map.next_value()?), + "child" => { + child = Some(map.next_value_seed(self.registry.expr_seed())?); + } + _ => { + let _: serde::de::IgnoredAny = map.next_value()?; + } + } + } + Ok(AnnotatedNot { + label: label.ok_or_else(|| A::Error::missing_field("label"))?, + child: child.ok_or_else(|| A::Error::missing_field("child"))?, + }) + } + } + let registry = ctx.registry(); + ctx.deserializer() + .deserialize_map(V { registry }) + .map_err(|e| { + datafusion_common::exec_datafusion_err!("AnnotatedNot deserialize: {e}") + }) + } +} + +// I'm using a plain `Deserialize` derive for completeness even though the +// custom `PhysicalExprDeserialize` impl above does the actual work. The +// derive is harmless because the trait method bypasses it entirely. +impl<'de> Deserialize<'de> for AnnotatedNot { + fn deserialize>( + _deserializer: D, + ) -> std::result::Result { + Err(serde::de::Error::custom( + "AnnotatedNot must be deserialized through PhysicalExprRegistry", + )) + } +} + +fn main() -> Result<()> { + // Build an expression: AnnotatedNot { label: "demo", child: a@0 } + let expr: Arc = Arc::new(AnnotatedNot { + label: "demo".to_string(), + child: Arc::new(Column::new("a", 0)), + }); + println!("expr: {expr}"); + + // Serialize as JSON. No registry needed for serialization — the + // serde_tag + erased_serialize hooks live on the type itself. + let json = serde_json::to_string_pretty(&expr).unwrap(); + println!("\njson:\n{json}"); + + // Deserialize. We layer our custom registration on top of the default + // registry (which knows about Column, BinaryExpr, etc.). + let registry = default_registry().with::(); + let back = registry.deserialize_json(&json)?; + println!("\nback: {back}"); + + // Round-tripped expressions should compare equal. + assert!(expr.dyn_eq(back.as_ref())); + println!("\nround-trip OK"); + + // Sanity check: drop our type and the same JSON should be rejected. + let stripped_registry = default_registry(); + match stripped_registry.deserialize_json(&json) { + Ok(_) => panic!("expected error from registry without AnnotatedNot"), + Err(e) => println!("\nexpected error without registration: {e}"), + } + + Ok(()) +} diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index 0e4748b81d3ff..9b27053bfd7bf 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -45,11 +45,14 @@ arrow = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true } datafusion-expr-common = { workspace = true } +erased-serde = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } [dev-dependencies] criterion = { workspace = true } diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs index b6eaacdca2505..f9daf3aa8e1f0 100644 --- a/datafusion/physical-expr-common/src/lib.rs +++ b/datafusion/physical-expr-common/src/lib.rs @@ -35,6 +35,7 @@ pub mod binary_view_map; pub mod datum; pub mod metrics; pub mod physical_expr; +pub mod serde; pub mod sort_expr; pub mod tree_node; pub mod utils; diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index bdaddc324e328..ec303a16cf1a2 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -446,6 +446,40 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::KeepInPlace } + + /// Stable identifier used by the format-agnostic serialization layer to + /// dispatch deserialization back to a concrete type. + /// + /// Returning the empty string (the default) means "this expression is not + /// serializable"; the [`serde::Serialize`] impl for `dyn PhysicalExpr` will + /// produce an error in that case. Implementations that opt in should + /// return a stable, globally unique string — typically by mirroring + /// `::TAG`. + /// + /// See the [`serde`](crate::serde) module for the serialization story. + fn serde_tag(&self) -> &'static str { + "" + } + + /// Returns a type-erased serializable view of this expression's body. + /// + /// The default implementation returns a sentinel that produces a + /// descriptive error when serialized. Implementations that wish to + /// participate in serialization should override this and return a + /// `Box::new(self)` (which requires `Self: serde::Serialize`), or any + /// other value that implements [`serde::Serialize`]. + /// + /// Erasure via [`erased_serde::Serialize`] is what lets this method be + /// object-safe without `PhysicalExpr` having to extend `serde::Serialize` + /// (which would force every existing impl to add a `Serialize` impl). + /// + /// See the [`serde`](crate::serde) module for the full serialization + /// story, including how the result is wrapped in a `{tag, data}` envelope. + fn erased_serialize(&self) -> Box { + Box::new(crate::serde::NotSerializable(format!( + "PhysicalExpr serialization not implemented for {self}" + ))) + } } #[deprecated( diff --git a/datafusion/physical-expr-common/src/serde.rs b/datafusion/physical-expr-common/src/serde.rs new file mode 100644 index 0000000000000..e84d53f1024cd --- /dev/null +++ b/datafusion/physical-expr-common/src/serde.rs @@ -0,0 +1,420 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Format-agnostic serialization for [`PhysicalExpr`]. +//! +//! This module provides the building blocks that let any [`serde`]-compatible +//! format (JSON for debugging, protobuf via a serde adapter, etc.) serialize +//! an `Arc` without being coupled to a specific wire format. +//! +//! # Encoding +//! +//! Each expression is encoded as a `{tag, data}` envelope. The `tag` is the +//! string returned from [`PhysicalExpr::serde_tag`] and is used by the +//! deserialization side to dispatch back to a concrete type. The `data` is +//! the body produced by [`PhysicalExpr::erased_serialize`]. +//! +//! `Arc` is serializable too: serde's `rc` feature provides +//! a blanket `Serialize` impl for `Arc` whenever `T: Serialize`. +//! +//! # Opt-in +//! +//! Both `serde_tag` and `erased_serialize` have default implementations on the +//! `PhysicalExpr` trait. Expressions that don't override them are not +//! serializable — the [`Serialize`] impl below will fail at runtime with a +//! descriptive error. +//! +//! # Deserialization +//! +//! Decoding goes through a [`PhysicalExprRegistry`]: implementers register a +//! constructor for each tag, and the registry's `deserialize_*` methods +//! dispatch on the tag to call the right constructor. Implementers opt in +//! by implementing [`PhysicalExprDeserialize`]. +//! +//! Decoding is format-agnostic: the trait method receives a +//! [`DeserializeContext`] holding `&mut dyn erased_serde::Deserializer<'de>` +//! plus the registry. Convenience entry points like +//! [`PhysicalExprRegistry::deserialize_json`] wrap a concrete serde +//! `Deserializer` for callers; other formats can be added by writing a +//! similar wrapper. +//! +//! Wire stability across DataFusion versions is **not** a goal of this layer. +//! Use the proto crate for stable cross-version wire formats. + +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + +use datafusion_common::{Result, exec_datafusion_err}; +use serde::Serialize; +use serde::de::{Deserialize, DeserializeSeed, Error as DeError, MapAccess, Visitor}; +use serde::ser::{Error as _, SerializeStruct, Serializer}; + +use crate::physical_expr::PhysicalExpr; + +impl Serialize for dyn PhysicalExpr { + fn serialize(&self, serializer: S) -> Result { + let tag = self.serde_tag(); + if tag.is_empty() { + return Err(S::Error::custom(format!( + "PhysicalExpr serialization not implemented for {self}" + ))); + } + + let body = self.erased_serialize(); + let mut state = serializer.serialize_struct("PhysicalExpr", 2)?; + state.serialize_field("tag", tag)?; + // `&dyn erased_serde::Serialize` implements `serde::Serialize` via the + // `serialize_trait_object!(Serialize)` invocation inside erased_serde. + state.serialize_field("data", &*body)?; + state.end() + } +} + +/// Sentinel returned by the default [`PhysicalExpr::erased_serialize`] impl. +/// +/// Serializing this value fails with a descriptive error. Used so that the +/// trait method can have a default implementation without needing +/// specialization or a separate "is serializable" branch outside the tag +/// check. +#[doc(hidden)] +pub struct NotSerializable(pub String); + +impl Serialize for NotSerializable { + fn serialize(&self, _serializer: S) -> Result { + Err(S::Error::custom(&self.0)) + } +} + +/// Trait implemented by expressions that opt in to deserialization. +/// +/// Each implementer pairs a stable string tag (`TAG`) with a constructor +/// (`deserialize`) that rebuilds the expression from a serde +/// [`Deserializer`]. The tag must be globally unique and must agree with +/// what the type returns from [`PhysicalExpr::serde_tag`] — the convention +/// is to use `TAG` for both: +/// +/// ```ignore +/// impl PhysicalExpr for MyExpr { +/// fn serde_tag(&self) -> &'static str { Self::TAG } +/// // ... +/// } +/// ``` +/// +/// # Implementing for leaf expressions (no trait-object children) +/// +/// Derive `serde::Deserialize` on the type and call `ctx.deserialize::()`: +/// +/// ```ignore +/// fn deserialize(ctx: &mut DeserializeContext<'_, '_>) -> Result { +/// ctx.deserialize::() +/// } +/// ``` +/// +/// # Implementing for expressions with trait-object children +/// +/// `Arc` is not directly `Deserialize` — child expressions +/// need to recurse through the registry. Use [`DeserializeContext::expr_seed`] +/// inside a hand-written `Visitor`: +/// +/// ```ignore +/// struct V<'r> { registry: &'r PhysicalExprRegistry } +/// impl<'de, 'r> Visitor<'de> for V<'r> { +/// type Value = MyExpr; +/// fn visit_map>(self, mut map: A) -> Result { +/// let mut child = None; +/// while let Some(k) = map.next_key::()? { +/// match k.as_str() { +/// "child" => child = Some(map.next_value_seed(self.registry.expr_seed())?), +/// _ => { let _: serde::de::IgnoredAny = map.next_value()?; } +/// } +/// } +/// Ok(MyExpr { child: child.ok_or_else(|| Error::missing_field("child"))? }) +/// } +/// } +/// ctx.deserializer().deserialize_map(V { registry: ctx.registry() }) +/// ``` +pub trait PhysicalExprDeserialize: PhysicalExpr + Sized { + /// Stable identifier for this expression in serialized form. + const TAG: &'static str; + + /// Rebuild `Self` from the body of an envelope. + fn deserialize(ctx: &mut DeserializeContext<'_, '_>) -> Result; +} + +/// Context passed to [`PhysicalExprDeserialize::deserialize`]. +/// +/// Holds the registry (so child expressions can recurse via +/// [`PhysicalExprRegistry::expr_seed`]) and a type-erased deserializer +/// pointing at the expression's data body. +pub struct DeserializeContext<'reg, 'de> { + registry: &'reg PhysicalExprRegistry, + de: &'reg mut dyn erased_serde::Deserializer<'de>, +} + +impl<'reg, 'de> DeserializeContext<'reg, 'de> { + /// Returns the registry, primarily to access [`PhysicalExprRegistry::expr_seed`] + /// when deserializing trait-object children. + pub fn registry(&self) -> &'reg PhysicalExprRegistry { + self.registry + } + + /// Direct access to the erased deserializer. Use this when implementing a + /// custom `Visitor` for an expression with trait-object children. + pub fn deserializer(&mut self) -> &mut dyn erased_serde::Deserializer<'de> { + &mut *self.de + } + + /// Deserialize the data body as `T`. Convenience for leaf expressions + /// whose body is `T: serde::Deserialize`. + pub fn deserialize>(&mut self) -> Result { + erased_serde::deserialize(&mut *self.de) + .map_err(|e| exec_datafusion_err!("PhysicalExpr deserialize failed: {e}")) + } + + /// Deserialize the data body as a struct with a single + /// `Arc` field named `field`. Helper for unary wrapper + /// expressions like `NotExpr`, `NegativeExpr`, `IsNullExpr`, etc. + pub fn deserialize_unary( + &mut self, + field: &'static str, + ) -> Result> { + struct V<'r> { + registry: &'r PhysicalExprRegistry, + field: &'static str, + } + impl<'de, 'r> Visitor<'de> for V<'r> { + type Value = Arc; + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "a unary expression body {{{}}}", self.field) + } + fn visit_map>( + self, + mut map: A, + ) -> std::result::Result { + let mut child: Option> = None; + while let Some(k) = map.next_key::()? { + if k == self.field { + child = Some(map.next_value_seed(self.registry.expr_seed())?); + } else { + let _: serde::de::IgnoredAny = map.next_value()?; + } + } + child.ok_or_else(|| A::Error::missing_field(self.field)) + } + } + let registry = self.registry; + serde::Deserializer::deserialize_map(&mut *self.de, V { registry, field }) + .map_err(|e| exec_datafusion_err!("unary expr deserialize failed: {e}")) + } +} + +/// `DeserializeSeed` that reads a `{tag, data}` envelope and dispatches to the +/// registered constructor for `tag`. Use via [`PhysicalExprRegistry::expr_seed`] +/// inside `next_value_seed` / `next_element_seed` calls. +pub struct ExprSeed<'reg> { + registry: &'reg PhysicalExprRegistry, +} + +impl<'de, 'reg> DeserializeSeed<'de> for ExprSeed<'reg> { + type Value = Arc; + + fn deserialize>( + self, + deserializer: D, + ) -> std::result::Result { + deserializer.deserialize_map(EnvelopeVisitor { + registry: self.registry, + }) + } +} + +struct EnvelopeVisitor<'reg> { + registry: &'reg PhysicalExprRegistry, +} + +impl<'de, 'reg> Visitor<'de> for EnvelopeVisitor<'reg> { + type Value = Arc; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("a {tag, data} PhysicalExpr envelope") + } + + fn visit_map>( + self, + mut map: A, + ) -> std::result::Result { + // Tag must come first — that's how we serialize, and reading it first + // lets us dispatch the data field to the correct constructor without + // buffering. + let tag_key: String = map + .next_key()? + .ok_or_else(|| A::Error::missing_field("tag"))?; + if tag_key != "tag" { + return Err(A::Error::custom(format!( + "expected `tag` as first key in PhysicalExpr envelope, got {tag_key:?}" + ))); + } + let tag: String = map.next_value()?; + + let constructor = + *self + .registry + .constructors + .get(tag.as_str()) + .ok_or_else(|| { + A::Error::custom(format!( + "no PhysicalExpr registered under tag {tag:?}" + )) + })?; + + let data_key: String = map + .next_key()? + .ok_or_else(|| A::Error::missing_field("data"))?; + if data_key != "data" { + return Err(A::Error::custom(format!( + "expected `data` after `tag` in PhysicalExpr envelope, got {data_key:?}" + ))); + } + + map.next_value_seed(DataSeed { + registry: self.registry, + constructor, + }) + } +} + +struct DataSeed<'reg> { + registry: &'reg PhysicalExprRegistry, + constructor: Constructor, +} + +impl<'de, 'reg> DeserializeSeed<'de> for DataSeed<'reg> { + type Value = Arc; + + fn deserialize>( + self, + deserializer: D, + ) -> std::result::Result { + let mut erased = >::erase(deserializer); + let mut ctx = DeserializeContext { + registry: self.registry, + de: &mut erased, + }; + (self.constructor)(&mut ctx).map_err(D::Error::custom) + } +} + +type Constructor = for<'reg, 'de> fn( + &mut DeserializeContext<'reg, 'de>, +) -> Result>; + +/// Registry mapping serialization tags to constructors. +/// +/// Built up with [`PhysicalExprRegistry::empty`] and +/// [`PhysicalExprRegistry::with`] in builder style. The `physical-expr` crate +/// provides a `default_registry()` that returns a registry pre-populated with +/// all of DataFusion's built-in expressions. +pub struct PhysicalExprRegistry { + constructors: HashMap<&'static str, Constructor>, +} + +impl PhysicalExprRegistry { + /// Returns an empty registry. Use [`Self::with`] to add constructors. + pub fn empty() -> Self { + Self { + constructors: HashMap::new(), + } + } + + /// Register the constructor for `T`. + /// + /// Panics if `T::TAG` is empty (the "not serializable" sentinel) or if a + /// constructor was already registered under `T::TAG`. Tag collisions + /// almost always indicate a programming error — two types claiming the + /// same identifier — so failing loudly during registry construction is + /// the safer default. Use [`Self::try_with`] if you need a fallible + /// version (e.g. when registering plugins discovered at runtime). + pub fn with(self) -> Self { + match self.try_with::() { + Ok(reg) => reg, + Err(e) => panic!("{e}"), + } + } + + /// Fallible version of [`Self::with`]. Returns the registry unchanged on + /// success, or a [`DataFusionError`] on tag collision / empty tag. + pub fn try_with(mut self) -> Result { + let tag = T::TAG; + if tag.is_empty() { + return Err(exec_datafusion_err!( + "PhysicalExprDeserialize::TAG must not be empty" + )); + } + if self.constructors.contains_key(tag) { + return Err(exec_datafusion_err!( + "PhysicalExprRegistry: duplicate registration for tag {tag:?}" + )); + } + self.constructors.insert(tag, |ctx| { + T::deserialize(ctx).map(|v| Arc::new(v) as Arc) + }); + Ok(self) + } + + /// Returns true if a constructor is registered for `tag`. + pub fn contains_tag(&self, tag: &str) -> bool { + self.constructors.contains_key(tag) + } + + /// Returns a `DeserializeSeed` that reads a `{tag, data}` envelope and + /// produces an `Arc`. Use this inside a custom + /// `Visitor` to deserialize trait-object children of an expression. + pub fn expr_seed(&self) -> ExprSeed<'_> { + ExprSeed { registry: self } + } + + /// Decode an expression from any serde [`Deserializer`]. + /// + /// This is the format-agnostic entry point. Convenience methods like + /// [`Self::deserialize_json`] wrap this for specific formats. + pub fn deserialize<'de, D: serde::Deserializer<'de>>( + &self, + deserializer: D, + ) -> std::result::Result, D::Error> { + self.expr_seed().deserialize(deserializer) + } + + /// Decode a JSON-serialized expression. Convenience wrapper around + /// [`Self::deserialize`] with a `serde_json` deserializer. + pub fn deserialize_json(&self, s: &str) -> Result> { + let mut de = serde_json::Deserializer::from_str(s); + let expr = self + .deserialize(&mut de) + .map_err(|e| exec_datafusion_err!("PhysicalExpr JSON decode failed: {e}"))?; + de.end().map_err(|e| { + exec_datafusion_err!("PhysicalExpr JSON had trailing data: {e}") + })?; + Ok(expr) + } +} + +impl Default for PhysicalExprRegistry { + fn default() -> Self { + Self::empty() + } +} diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index b755353d75658..3597aae409a5b 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -50,12 +50,15 @@ datafusion-expr = { workspace = true } datafusion-expr-common = { workspace = true } datafusion-functions-aggregate-common = { workspace = true } datafusion-physical-expr-common = { workspace = true } +erased-serde = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true, features = ["use_std"] } parking_lot = { workspace = true } petgraph = "0.8.3" recursive = { workspace = true, optional = true } +serde = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } half = { workspace = true } diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index ba8cd5e6360a1..29f944bfe4ff8 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -30,6 +30,10 @@ use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{Result, internal_err, plan_err}; use datafusion_expr::ColumnarValue; use datafusion_expr_common::placement::ExpressionPlacement; +use datafusion_physical_expr_common::serde::{ + DeserializeContext, PhysicalExprDeserialize, +}; +use serde::{Deserialize, Serialize}; /// Represents the column at a given index in a RecordBatch /// @@ -63,7 +67,7 @@ use datafusion_expr_common::placement::ExpressionPlacement; /// assert_eq!(column_c.index(), 2); /// ``` /// [logical `Expr::Column`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html#variant.Column -#[derive(Debug, Hash, PartialEq, Eq, Clone)] +#[derive(Debug, Hash, PartialEq, Eq, Clone, Serialize, Deserialize)] pub struct Column { /// The name of the column (used for debugging and display purposes) name: String, @@ -145,6 +149,22 @@ impl PhysicalExpr for Column { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::Column } + + fn serde_tag(&self) -> &'static str { + ::TAG + } + + fn erased_serialize(&self) -> Box { + Box::new(self) + } +} + +impl PhysicalExprDeserialize for Column { + const TAG: &'static str = "Column"; + + fn deserialize(ctx: &mut DeserializeContext<'_, '_>) -> Result { + ctx.deserialize::() + } } impl Column { diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 917d3a953573b..d2bcef5a5b55b 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -29,9 +29,13 @@ use datafusion_common::{Result, ScalarValue, cast::as_boolean_array, internal_er use datafusion_expr::ColumnarValue; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::statistics::Distribution::{self, Bernoulli}; +use datafusion_physical_expr_common::serde::{ + DeserializeContext, PhysicalExprDeserialize, +}; +use serde::Serialize; /// Not expression -#[derive(Debug, Eq)] +#[derive(Debug, Eq, Serialize)] pub struct NotExpr { /// Input expression arg: Arc, @@ -178,6 +182,22 @@ impl PhysicalExpr for NotExpr { write!(f, "NOT ")?; self.arg.fmt_sql(f) } + + fn serde_tag(&self) -> &'static str { + ::TAG + } + + fn erased_serialize(&self) -> Box { + Box::new(self) + } +} + +impl PhysicalExprDeserialize for NotExpr { + const TAG: &'static str = "NotExpr"; + + fn deserialize(ctx: &mut DeserializeContext<'_, '_>) -> Result { + Ok(NotExpr::new(ctx.deserialize_unary("arg")?)) + } } /// Creates a unary expression NOT diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 848bf81d15979..573ec804048f1 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -42,6 +42,7 @@ pub mod planner; pub mod projection; mod scalar_function; pub mod scalar_subquery; +pub mod serde; pub mod simplifier; pub mod statistics; pub mod utils; diff --git a/datafusion/physical-expr/src/serde.rs b/datafusion/physical-expr/src/serde.rs new file mode 100644 index 0000000000000..da0e4c72a77f8 --- /dev/null +++ b/datafusion/physical-expr/src/serde.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`PhysicalExprRegistry::new`] — a registry pre-populated with all of +//! DataFusion's built-in physical expressions. +//! +//! See [`datafusion_physical_expr_common::serde`] for the underlying +//! serialization/deserialization machinery and the +//! [`PhysicalExprDeserialize`] trait. + +pub use datafusion_physical_expr_common::serde::{ + DeserializeContext, PhysicalExprDeserialize, PhysicalExprRegistry, +}; + +use crate::expressions::{Column, NotExpr}; + +/// Returns a [`PhysicalExprRegistry`] with the subset of DataFusion's +/// built-in physical expressions that have been wired up to the new serde +/// hook so far. +/// +/// Currently registered: +/// +/// - [`Column`] +/// - [`NotExpr`] +/// +/// More built-ins will be added in follow-up PRs as their internal types +/// (notably [`datafusion_common::ScalarValue`] and `arrow::DataType`) gain +/// `serde::{Serialize, Deserialize}` impls. +/// +/// Use [`PhysicalExprRegistry::with`] to layer additional custom expressions +/// on top. +pub fn default_registry() -> PhysicalExprRegistry { + PhysicalExprRegistry::empty() + .with::() + .with::() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + + fn round_trip(expr: Arc) { + let json = serde_json::to_string(&expr).unwrap(); + let registry = default_registry(); + let back = registry.deserialize_json(&json).unwrap_or_else(|e| { + panic!("deserialize failed for {json}: {e}"); + }); + assert!( + expr.dyn_eq(back.as_ref()), + "round-trip mismatch:\n before: {expr}\n after: {back}\n json: {json}" + ); + } + + #[test] + fn column_round_trip() { + round_trip(Arc::new(Column::new("a", 3))); + } + + #[test] + fn not_expr_round_trip() { + round_trip(Arc::new(NotExpr::new(Arc::new(Column::new("a", 0))))); + } + + #[test] + fn nested_not_round_trip() { + // NOT(NOT(a)) + let inner = NotExpr::new(Arc::new(Column::new("a", 0))); + let expr: Arc = Arc::new(NotExpr::new(Arc::new(inner))); + round_trip(expr); + } + + #[test] + fn unknown_tag_errors() { + let registry = PhysicalExprRegistry::empty(); + let json = r#"{"tag":"Column","data":{"name":"a","index":0}}"#; + let err = registry.deserialize_json(json).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("no PhysicalExpr registered under tag"), + "{msg}" + ); + } + + #[test] + fn try_with_duplicate_tag_returns_err() { + let result = PhysicalExprRegistry::empty() + .with::() + .try_with::(); + let err = match result { + Ok(_) => panic!("expected error on duplicate registration"), + Err(e) => e, + }; + assert!(err.to_string().contains("duplicate registration"), "{err}"); + } + + #[test] + #[should_panic(expected = "duplicate registration")] + fn with_duplicate_tag_panics() { + let _ = PhysicalExprRegistry::empty() + .with::() + .with::(); + } +} diff --git a/docs/source/index.rst b/docs/source/index.rst index cc0da3c44473e..97ad8c14a3b6c 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -139,6 +139,7 @@ To get started, see library-user-guide/using-the-sql-api library-user-guide/extending-sql library-user-guide/working-with-exprs + library-user-guide/physical-expr-serde library-user-guide/using-the-dataframe-api library-user-guide/building-logical-plans library-user-guide/catalogs diff --git a/docs/source/library-user-guide/physical-expr-serde.md b/docs/source/library-user-guide/physical-expr-serde.md new file mode 100644 index 0000000000000..c053b2b731395 --- /dev/null +++ b/docs/source/library-user-guide/physical-expr-serde.md @@ -0,0 +1,181 @@ + + +# Serializing `PhysicalExpr` + +DataFusion has a format-agnostic serialization layer for +`Arc`. It is built on `serde`, so any `serde::Serializer` / +`serde::Deserializer` works — JSON for debugging, bincode for binary, a serde +adapter on top of protobuf, etc. + +This page describes how to make a custom `PhysicalExpr` round-trippable. +For the **stable wire format** that the Ballista distributed-execution layer +uses across DataFusion versions, see the +[`datafusion-proto`](https://docs.rs/datafusion-proto/) crate. The +serde-based path documented here is **not** wire-stable across versions — +use it for in-process tools, debugging, and intra-cluster ephemeral +serialization. + +## Overview + +There are four moving parts: + +| Concept | Where it lives | Purpose | +| --- | --- | --- | +| `PhysicalExpr::serde_tag` | trait method, default `""` | stable identifier (`"Column"`, `"BinaryExpr"`, …) used to dispatch deserialization | +| `PhysicalExpr::erased_serialize` | trait method, default errors | returns a type-erased `serde::Serialize` view of `self` | +| `PhysicalExprDeserialize` | separate trait | constructor that rebuilds `Self` from a `DeserializeContext` | +| `PhysicalExprRegistry` | builder-style map | tag → constructor lookup; built up explicitly with `.with::()` | + +The encoded form is a `{tag, data}` envelope. `tag` is the string returned +from `serde_tag`; `data` is whatever `erased_serialize` produces. + +## Implementing for a custom expression + +Walk through `datafusion-examples/examples/physical_expr_serde/main.rs` for +a runnable end-to-end demo. The shape is: + +```rust,ignore +use std::sync::Arc; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_expr::serde::{ + DeserializeContext, PhysicalExprDeserialize, PhysicalExprRegistry, + default_registry, +}; +use serde::Serialize; + +#[derive(Debug, Eq, PartialEq, Hash, Serialize)] +pub struct MyExpr { + name: String, + child: Arc, +} + +impl PhysicalExpr for MyExpr { + // ... usual evaluate, data_type, fmt_sql, etc ... + + fn serde_tag(&self) -> &'static str { + ::TAG + } + + fn erased_serialize(&self) -> Box { + Box::new(self) + } +} + +impl PhysicalExprDeserialize for MyExpr { + const TAG: &'static str = "myproject.MyExpr"; + + fn deserialize( + ctx: &mut DeserializeContext<'_, '_>, + ) -> datafusion::common::Result { + // For a leaf expression with no trait-object children, just delegate + // to the auto-derived `serde::Deserialize` impl: + // + // ctx.deserialize::() + // + // Expressions with `Arc` fields (children) need a + // hand-rolled `Visitor` that uses `ctx.registry().expr_seed()` for + // each child. See the example file for a complete walkthrough. + unimplemented!("see example") + } +} +``` + +Then layer your type onto a registry and round-trip: + +```rust,ignore +let registry = default_registry().with::(); +let json = serde_json::to_string(&expr)?; +let back: Arc = registry.deserialize_json(&json)?; +``` + +## Children that are themselves `PhysicalExpr` + +`Arc` deliberately doesn't implement `serde::Deserialize` +— deserialization needs the registry, and `Deserialize` doesn't carry +context. To recurse on a child, use the registry's `DeserializeSeed`: + +```rust,ignore +fn visit_map>(self, mut map: A) -> Result { + let mut child: Option> = None; + while let Some(k) = map.next_key::()? { + match k.as_str() { + "child" => child = Some(map.next_value_seed(self.registry.expr_seed())?), + _ => { let _: serde::de::IgnoredAny = map.next_value()?; } + } + } + // ... +} +``` + +The unary case (single child of type `Arc`) is common +enough that there's a built-in helper: `ctx.deserialize_unary("arg")`. + +## Tags + +Tags must be globally unique. Pick something specific to your project: +prefix with your crate name (`"myproject.MyExpr"`) rather than just +`"MyExpr"`, since DataFusion's built-ins occupy short names like +`"Column"` and `"BinaryExpr"`. + +`PhysicalExprRegistry::with::()` panics on duplicate tag registration — +this is almost always a programming error and failing loudly during +registry construction is safer than silently shadowing. If you need to +register from runtime-discovered plugins, use `try_with::()` which +returns `Result`. + +A tag of `""` (the default `serde_tag` returns) is treated as "not +serializable"; serialization fails with a descriptive error. This means +adding a new `PhysicalExpr` impl is non-breaking — existing types simply +won't be serializable until they opt in. + +## Choosing a format + +The `PhysicalExprRegistry::deserialize` method accepts any +`serde::Deserializer`, so any format works. Common choices: + +- **JSON** (`serde_json`) — readable, great for tests and debugging. Use + `Registry::deserialize_json(s)` for convenience. +- **Bincode / postcard** — compact binary for over-the-wire ephemeral use. +- **Protobuf via a serde adapter** — if you want to reuse existing `.proto` + schemas; note this is distinct from `datafusion-proto`'s downcast-based + path. + +Wire stability across DataFusion versions is **not** a goal of this layer. +Use `datafusion-proto` for that. + +## What's currently registered + +`default_registry()` currently ships with `Column` and `NotExpr` — the +two expressions wired up in this initial PR to validate the trait surface +end-to-end. The rest of the built-ins will be added in follow-ups, in +batches grouped by what they need from the rest of the codebase +(`Operator`, `ScalarValue`, `arrow::DataType`, etc., all need their own +`serde` impls before the expressions that hold them can be registered). +Expressions that haven't opted in fall back to the default `serde_tag` +(empty string) and serialization fails with a "not implemented" error. + +## See also + +- `datafusion-examples/examples/physical_expr_serde/main.rs` — runnable + custom-expression round-trip. +- `datafusion-physical-expr-common::serde` — module-level docs covering the + trait and registry. +- `datafusion-proto` — wire-stable, version-portable proto serialization + (separate path).