Skip to content

Commit a334066

Browse files
support multiple path modes in variant_get arg
1 parent ebddd5e commit a334066

3 files changed

Lines changed: 135 additions & 68 deletions

File tree

src/impl_variant_get.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,12 @@ macro_rules! impl_variant_get_typed {
1212
#[derive(Debug, Hash, PartialEq, Eq)]
1313
pub struct $struct_name {
1414
signature: Signature,
15-
path_mode: crate::variant_get::PathMode,
1615
}
1716

1817
impl Default for $struct_name {
1918
fn default() -> Self {
2019
Self {
2120
signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
22-
path_mode: crate::variant_get::PathMode::DotNotation,
23-
}
24-
}
25-
}
26-
27-
impl $struct_name {
28-
/// Create a new instance with the specified path mode.
29-
pub fn with_path_mode(path_mode: crate::variant_get::PathMode) -> Self {
30-
Self {
31-
signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable),
32-
path_mode,
3321
}
3422
}
3523
}
@@ -52,7 +40,7 @@ macro_rules! impl_variant_get_typed {
5240
}
5341

5442
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
55-
invoke_variant_get_typed(args, $scalar_from, $array_from, $extract, self.path_mode)
43+
invoke_variant_get_typed(args, $scalar_from, $array_from, $extract)
5644
}
5745
}
5846
};

src/shared.rs

Lines changed: 77 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,12 @@ use arrow_schema::Fields;
88
use arrow_schema::extension::ExtensionType;
99
use arrow_schema::{DataType, Field};
1010
use datafusion::common::exec_datafusion_err;
11-
use datafusion::error::Result;
11+
use datafusion::error::{DataFusionError, Result};
1212
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs};
1313
use datafusion::{common::exec_err, scalar::ScalarValue};
14-
use parquet_variant::Variant;
14+
use parquet_variant::{Variant, VariantPath, VariantPathElement};
1515
use parquet_variant_compute::{VariantArray, VariantType};
1616

17-
use crate::variant_get::PathMode;
18-
1917
#[cfg(test)]
2018
use parquet_variant_compute::VariantArrayBuilder;
2119

@@ -129,16 +127,14 @@ pub fn try_parse_string_columnar(array: &Arc<dyn Array>) -> Result<Vec<Option<&s
129127
pub fn variant_get_single_value<T>(
130128
variant_array: &VariantArray,
131129
index: usize,
132-
path: &str,
133-
path_mode: PathMode,
130+
path: &VariantPath<'_>,
134131
extract: for<'m, 'v> fn(Variant<'m, 'v>) -> Result<Option<T>>,
135132
) -> Result<Option<T>> {
136133
let Some(variant) = variant_array.iter().nth(index).flatten() else {
137134
return Ok(None);
138135
};
139136

140-
let variant_path = path_mode.try_build_path(path)?;
141-
let Some(value) = variant.get_path(&variant_path) else {
137+
let Some(value) = variant.get_path(path) else {
142138
return Ok(None);
143139
};
144140

@@ -147,20 +143,17 @@ pub fn variant_get_single_value<T>(
147143

148144
pub fn variant_get_array_values<T>(
149145
variant_array: &VariantArray,
150-
path: &str,
151-
path_mode: PathMode,
146+
path: &VariantPath<'_>,
152147
extract: for<'m, 'v> fn(Variant<'m, 'v>) -> Result<Option<T>>,
153148
) -> Result<Vec<Option<T>>> {
154-
let variant_path = path_mode.try_build_path(path)?;
155-
156149
variant_array
157150
.iter()
158151
.map(|maybe_variant| {
159152
let Some(variant) = maybe_variant else {
160153
return Ok(None);
161154
};
162155

163-
let Some(value) = variant.get_path(&variant_path) else {
156+
let Some(value) = variant.get_path(path) else {
164157
return Ok(None);
165158
};
166159

@@ -169,12 +162,66 @@ pub fn variant_get_array_values<T>(
169162
.collect()
170163
}
171164

165+
/// Build a [`VariantPath`] from a scalar value.
166+
///
167+
/// - **String** scalars use dot-notation parsing (e.g. `'a.b.c'` → path `[a, b, c]`)
168+
/// - **List** scalars treat each element as a single field name
169+
/// (e.g. `['a.b', 'c']` → path `[a.b, c]`), which is critical for keys that
170+
/// contain dots such as OTEL attribute keys like `http.response.status_code`.
171+
fn path_from_scalar(scalar: &ScalarValue) -> Result<VariantPath<'static>> {
172+
match scalar {
173+
ScalarValue::Utf8(Some(s))
174+
| ScalarValue::Utf8View(Some(s))
175+
| ScalarValue::LargeUtf8(Some(s)) => {
176+
let parsed =
177+
VariantPath::try_from(s.as_str()).map_err(Into::<DataFusionError>::into)?;
178+
Ok(to_owned_path(&parsed))
179+
}
180+
ScalarValue::Utf8(None) | ScalarValue::Utf8View(None) | ScalarValue::LargeUtf8(None) => {
181+
Ok(VariantPath::default())
182+
}
183+
ScalarValue::List(list_arr) => {
184+
if list_arr.is_null(0) {
185+
return Ok(VariantPath::default());
186+
}
187+
188+
path_from_list_values(list_arr.value(0))
189+
}
190+
other => exec_err!(
191+
"path must be a string or list of strings, got {}",
192+
other.data_type()
193+
),
194+
}
195+
}
196+
197+
fn path_from_list_values(values: ArrayRef) -> Result<VariantPath<'static>> {
198+
let strings = try_parse_string_columnar(&values)?;
199+
let elements = strings
200+
.iter()
201+
.map(|s| VariantPathElement::field(s.unwrap_or_default().to_string()))
202+
.collect();
203+
204+
Ok(VariantPath::new(elements))
205+
}
206+
207+
fn to_owned_path(path: &VariantPath<'_>) -> VariantPath<'static> {
208+
let elements = path
209+
.path()
210+
.iter()
211+
.map(|elem| match elem {
212+
VariantPathElement::Field { name } => VariantPathElement::field(name.to_string()),
213+
VariantPathElement::Index { index } => VariantPathElement::index(*index),
214+
})
215+
.collect();
216+
217+
VariantPath::new(elements)
218+
}
219+
172220
pub fn invoke_variant_get_typed<T>(
173221
args: ScalarFunctionArgs,
174222
scalar_from_option: fn(Option<T>) -> ScalarValue,
175223
array_from_values: fn(Vec<Option<T>>) -> ArrayRef,
176224
extract: for<'m, 'v> fn(Variant<'m, 'v>) -> Result<Option<T>>,
177-
path_mode: PathMode,
178225
) -> Result<ColumnarValue> {
179226
let (variant_arg, path_arg) = match args.args.as_slice() {
180227
[variant_arg, path_arg] => (variant_arg, path_arg),
@@ -190,25 +237,19 @@ pub fn invoke_variant_get_typed<T>(
190237

191238
let out = match (variant_arg, path_arg) {
192239
(ColumnarValue::Array(variant_array), ColumnarValue::Scalar(path_scalar)) => {
193-
let path = try_parse_string_scalar(path_scalar)?
194-
.map(|s| s.as_str())
195-
.unwrap_or_default();
196-
240+
let path = path_from_scalar(path_scalar)?;
197241
let variant_array = VariantArray::try_new(variant_array.as_ref())?;
198-
let values = variant_get_array_values(&variant_array, path, path_mode, extract)?;
242+
let values = variant_get_array_values(&variant_array, &path, extract)?;
199243
ColumnarValue::Array(array_from_values(values))
200244
}
201245
(ColumnarValue::Scalar(scalar_variant), ColumnarValue::Scalar(path_scalar)) => {
202246
let ScalarValue::Struct(variant_array) = scalar_variant else {
203247
return exec_err!("expected struct array");
204248
};
205249

206-
let path = try_parse_string_scalar(path_scalar)?
207-
.map(|s| s.as_str())
208-
.unwrap_or_default();
209-
250+
let path = path_from_scalar(path_scalar)?;
210251
let variant_array = VariantArray::try_new(variant_array.as_ref())?;
211-
let value = variant_get_single_value(&variant_array, 0, path, path_mode, extract)?;
252+
let value = variant_get_single_value(&variant_array, 0, &path, extract)?;
212253

213254
ColumnarValue::Scalar(scalar_from_option(value))
214255
}
@@ -220,10 +261,13 @@ pub fn invoke_variant_get_typed<T>(
220261
let paths = try_parse_string_columnar(paths)?;
221262
let variant_array = VariantArray::try_new(variant_array.as_ref())?;
222263

223-
let values: Vec<Option<T>> = (0..variant_array.len())
264+
let values = (0..variant_array.len())
224265
.map(|i| {
225-
let path = paths[i].unwrap_or_default();
226-
variant_get_single_value(&variant_array, i, path, path_mode, extract)
266+
let path_str = paths[i].unwrap_or_default();
267+
let path =
268+
VariantPath::try_from(path_str).map_err(Into::<DataFusionError>::into)?;
269+
270+
variant_get_single_value(&variant_array, i, &path, extract)
227271
})
228272
.collect::<Result<_>>()?;
229273

@@ -237,11 +281,13 @@ pub fn invoke_variant_get_typed<T>(
237281
let variant_array = VariantArray::try_new(variant_array.as_ref())?;
238282
let paths = try_parse_string_columnar(paths)?;
239283

240-
let values: Vec<Option<T>> = paths
284+
let values = paths
241285
.iter()
242-
.map(|path| {
243-
let path = path.unwrap_or_default();
244-
variant_get_single_value(&variant_array, 0, path, path_mode, extract)
286+
.map(|path_str| {
287+
let path_str = path_str.unwrap_or_default();
288+
let path =
289+
VariantPath::try_from(path_str).map_err(Into::<DataFusionError>::into)?;
290+
variant_get_single_value(&variant_array, 0, &path, extract)
245291
})
246292
.collect::<Result<_>>()?;
247293

src/variant_get.rs

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,10 @@ mod tests {
429429
build_variant_get_args, standard_variant_get_arg_fields, variant_array_from_json_rows,
430430
variant_scalar_from_json,
431431
};
432-
use arrow::array::{Array, BinaryViewArray, BooleanArray, Int64Array};
432+
use arrow::{
433+
array::{Array, BinaryViewArray, BooleanArray, Int64Array, ListArray},
434+
buffer::OffsetBuffer,
435+
};
433436
use arrow_schema::Field;
434437
use datafusion::logical_expr::{ReturnFieldArgs, ScalarFunctionArgs};
435438
use parquet_variant::Variant;
@@ -1543,20 +1546,29 @@ mod tests {
15431546
assert!(bool_arr.is_null(3));
15441547
}
15451548

1549+
fn string_list_scalar(values: &[&str]) -> ScalarValue {
1550+
let string_array = Arc::new(StringViewArray::from(values.to_vec())) as ArrayRef;
1551+
1552+
ScalarValue::List(Arc::new(ListArray::new(
1553+
Arc::new(Field::new_list_field(DataType::Utf8View, true)),
1554+
OffsetBuffer::from_lengths([values.len()]),
1555+
string_array,
1556+
None,
1557+
)))
1558+
}
1559+
15461560
#[test]
1547-
fn test_get_str_with_single_field_mode_dotted_key() {
1548-
// VariantGetStrUdf with SingleField mode should treat dotted keys as a single field
1561+
fn test_get_str_list_path_dotted_key() {
1562+
// List path should treat each element as a single field — no dot splitting
15491563
let variant_input = variant_scalar_from_json(serde_json::json!({
15501564
"http.response.status_code": 200,
15511565
"service.name": "my-service"
15521566
}));
15531567

1554-
let udf = VariantGetStrUdf::with_path_mode(PathMode::SingleField);
1568+
let udf = VariantGetStrUdf::default();
15551569
let args = build_variant_get_args(
15561570
ColumnarValue::Scalar(variant_input),
1557-
ColumnarValue::Scalar(ScalarValue::Utf8(Some(
1558-
"http.response.status_code".to_string(),
1559-
))),
1571+
ColumnarValue::Scalar(string_list_scalar(&["http.response.status_code"])),
15601572
DataType::Utf8View,
15611573
standard_variant_get_arg_fields(),
15621574
);
@@ -1566,20 +1578,19 @@ mod tests {
15661578
let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) = result else {
15671579
panic!("expected Utf8View scalar, got {result:?}");
15681580
};
1569-
// Integer 200 should be JSON-serialized to "200"
15701581
assert_eq!(s, "200");
15711582
}
15721583

15731584
#[test]
1574-
fn test_get_str_with_single_field_mode_string_value() {
1585+
fn test_get_str_list_path_string_value() {
15751586
let variant_input = variant_scalar_from_json(serde_json::json!({
15761587
"service.name": "my-service"
15771588
}));
15781589

1579-
let udf = VariantGetStrUdf::with_path_mode(PathMode::SingleField);
1590+
let udf = VariantGetStrUdf::default();
15801591
let args = build_variant_get_args(
15811592
ColumnarValue::Scalar(variant_input),
1582-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("service.name".to_string()))),
1593+
ColumnarValue::Scalar(string_list_scalar(&["service.name"])),
15831594
DataType::Utf8View,
15841595
standard_variant_get_arg_fields(),
15851596
);
@@ -1589,13 +1600,12 @@ mod tests {
15891600
let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) = result else {
15901601
panic!("expected Utf8View scalar, got {result:?}");
15911602
};
1592-
// String values returned as-is (no JSON quotes)
15931603
assert_eq!(s, "my-service");
15941604
}
15951605

15961606
#[test]
1597-
fn test_get_str_with_single_field_mode_array() {
1598-
// Test array input with SingleField mode
1607+
fn test_get_str_list_path_array_variant() {
1608+
// Test array variant input with list path
15991609
let json_rows = vec![
16001610
serde_json::json!({"http.status": 200, "http.method": "GET"}),
16011611
serde_json::json!({"http.status": 404, "http.method": "POST"}),
@@ -1604,10 +1614,10 @@ mod tests {
16041614

16051615
let variant_array = variant_array_from_json_rows(&json_rows);
16061616

1607-
let udf = VariantGetStrUdf::with_path_mode(PathMode::SingleField);
1617+
let udf = VariantGetStrUdf::default();
16081618
let args = build_variant_get_args(
16091619
ColumnarValue::Array(variant_array),
1610-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("http.status".to_string()))),
1620+
ColumnarValue::Scalar(string_list_scalar(&["http.status"])),
16111621
DataType::Utf8View,
16121622
standard_variant_get_arg_fields(),
16131623
);
@@ -1625,15 +1635,15 @@ mod tests {
16251635
}
16261636

16271637
#[test]
1628-
fn test_get_int_with_single_field_mode() {
1638+
fn test_get_int_list_path_dotted_key() {
16291639
let variant_input = variant_scalar_from_json(serde_json::json!({
16301640
"http.status": 200
16311641
}));
16321642

1633-
let udf = VariantGetIntUdf::with_path_mode(PathMode::SingleField);
1643+
let udf = VariantGetIntUdf::default();
16341644
let args = build_variant_get_args(
16351645
ColumnarValue::Scalar(variant_input),
1636-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("http.status".to_string()))),
1646+
ColumnarValue::Scalar(string_list_scalar(&["http.status"])),
16371647
DataType::Int64,
16381648
standard_variant_get_arg_fields(),
16391649
);
@@ -1647,14 +1657,37 @@ mod tests {
16471657
}
16481658

16491659
#[test]
1650-
fn test_get_str_dot_notation_splits_dotted_key() {
1651-
// With default DotNotation mode, dotted keys are split — should return NULL
1652-
// for a key like "http.response.status_code" that's stored as a single field
1660+
fn test_get_str_list_path_nested_traversal() {
1661+
// List path with multiple elements should traverse nested objects
1662+
let variant_input = variant_scalar_from_json(serde_json::json!({
1663+
"a": { "b": { "c": 42 } }
1664+
}));
1665+
1666+
let udf = VariantGetStrUdf::default();
1667+
let args = build_variant_get_args(
1668+
ColumnarValue::Scalar(variant_input),
1669+
ColumnarValue::Scalar(string_list_scalar(&["a", "b", "c"])),
1670+
DataType::Utf8View,
1671+
standard_variant_get_arg_fields(),
1672+
);
1673+
1674+
let result = udf.invoke_with_args(args).unwrap();
1675+
1676+
let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) = result else {
1677+
panic!("expected Utf8View scalar, got {result:?}");
1678+
};
1679+
assert_eq!(s, "42");
1680+
}
1681+
1682+
#[test]
1683+
fn test_get_str_string_path_dot_notation_splits() {
1684+
// String path uses dot notation — should return NULL for a dotted key
1685+
// stored as a single field name
16531686
let variant_input = variant_scalar_from_json(serde_json::json!({
16541687
"http.response.status_code": 200
16551688
}));
16561689

1657-
let udf = VariantGetStrUdf::default(); // DotNotation
1690+
let udf = VariantGetStrUdf::default();
16581691
let args = build_variant_get_args(
16591692
ColumnarValue::Scalar(variant_input),
16601693
ColumnarValue::Scalar(ScalarValue::Utf8(Some(
@@ -1666,7 +1699,7 @@ mod tests {
16661699

16671700
let result = udf.invoke_with_args(args).unwrap();
16681701

1669-
// DotNotation splits on dots, tries to traverse http -> response -> status_code
1702+
// Dot notation splits on dots, tries to traverse http -> response -> status_code
16701703
// which doesn't exist, so returns NULL
16711704
let ColumnarValue::Scalar(ScalarValue::Utf8View(None)) = result else {
16721705
panic!("expected NULL (dot notation splits the key), got {result:?}");

0 commit comments

Comments
 (0)