Skip to content

Commit c705c2e

Browse files
committed
Reuse shared argument error helpers in remaining UDFs
1 parent a4725d8 commit c705c2e

5 files changed

Lines changed: 52 additions & 26 deletions

File tree

src/cast_to_variant.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::sync::Arc;
33
use arrow::array::{Array, ArrayRef, AsArray, StructArray};
44
use arrow_schema::{DataType, Field};
55
use datafusion::{
6-
common::exec_err,
76
error::Result,
87
logical_expr::{
98
ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
@@ -14,7 +13,9 @@ use datafusion::{
1413
use parquet_variant::Variant;
1514
use parquet_variant_compute::{VariantArray, VariantArrayBuilder, cast_to_variant};
1615

17-
use crate::shared::{try_parse_binary_columnar, try_parse_binary_scalar};
16+
use crate::shared::{
17+
arg_shape_err, args_count_err, try_parse_binary_columnar, try_parse_binary_scalar,
18+
};
1819

1920
#[derive(Debug, Hash, PartialEq, Eq)]
2021
pub struct CastToVariantUdf {
@@ -57,15 +58,19 @@ impl CastToVariantUdf {
5758
}
5859

5960
fn from_metadata_value(
61+
udf_name: &str,
6062
metadata_argument: &ColumnarValue,
6163
variant_argument: &ColumnarValue,
6264
) -> Result<ColumnarValue> {
6365
let out = match (metadata_argument, variant_argument) {
6466
(ColumnarValue::Array(metadata_array), ColumnarValue::Array(value_array)) => {
6567
if metadata_array.len() != value_array.len() {
66-
return exec_err!(
67-
"expected metadata array to be of same length as variant array"
68-
);
68+
return Err(arg_shape_err(
69+
udf_name,
70+
2,
71+
"array with same length as arg #1",
72+
"array with different length",
73+
));
6974
}
7075

7176
let metadata_array = try_parse_binary_columnar(metadata_array)?;
@@ -180,11 +185,11 @@ impl ScalarUDFImpl for CastToVariantUdf {
180185
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
181186
match args.args.as_slice() {
182187
[metadata_value, variant_value] => {
183-
Self::from_metadata_value(metadata_value, variant_value)
188+
Self::from_metadata_value(self.name(), metadata_value, variant_value)
184189
}
185190
[ColumnarValue::Scalar(scalar_value)] => Self::from_scalar_value(scalar_value),
186191
[ColumnarValue::Array(array)] => Self::from_array(array),
187-
_ => exec_err!("unrecognized argument"),
192+
_ => Err(args_count_err(self.name(), "1 or 2", args.args.len())),
188193
}
189194
}
190195
}

src/variant_list_delete.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use datafusion::{
1313
use parquet_variant::{Variant, VariantBuilder};
1414
use parquet_variant_compute::{VariantArray, VariantType};
1515

16-
use crate::shared::{ensure, try_parse_variant_scalar};
16+
use crate::shared::{arg_shape_err, args_count_err, ensure, try_parse_variant_scalar};
1717

1818
#[derive(Debug, Hash, PartialEq, Eq)]
1919
pub struct VariantListDelete {
@@ -109,7 +109,7 @@ impl ScalarUDFImpl for VariantListDelete {
109109
)?;
110110

111111
let [variant_list_to_update, index_to_delete] = argument_values.as_slice() else {
112-
return exec_err!("expected 2 arguments");
112+
return Err(args_count_err(self.name(), "2", argument_values.len()));
113113
};
114114

115115
ensure(
@@ -119,7 +119,12 @@ impl ScalarUDFImpl for VariantListDelete {
119119

120120
let index = {
121121
let ColumnarValue::Scalar(index) = index_to_delete else {
122-
return exec_err!("expected scalar value for index");
122+
return Err(arg_shape_err(
123+
self.name(),
124+
2,
125+
"scalar integer value",
126+
"array value",
127+
));
123128
};
124129

125130
try_parse_index_scalar(index)?

src/variant_normalize.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22

33
use arrow::array::{ArrayRef, StructArray};
44
use arrow_schema::{DataType, Field, Fields};
5-
use datafusion::common::{exec_datafusion_err, exec_err};
65
use datafusion::error::Result;
76
use datafusion::logical_expr::{
87
ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
@@ -11,7 +10,9 @@ use datafusion::logical_expr::{
1110
use datafusion::scalar::ScalarValue;
1211
use parquet_variant_compute::{VariantArray, VariantArrayBuilder, VariantType};
1312

14-
use crate::shared::try_field_as_variant_array;
13+
use crate::shared::{
14+
arg_field_meta_missing_err, arg_type_err, args_count_err, try_field_as_variant_array,
15+
};
1516

1617
/// Normalizes a Variant value into a canonical binary form.
1718
///
@@ -92,18 +93,18 @@ impl ScalarUDFImpl for VariantNormalizeUdf {
9293
let variant_field = args
9394
.arg_fields
9495
.first()
95-
.ok_or_else(|| exec_datafusion_err!("expected 1 argument"))?;
96+
.ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?;
9697

9798
try_field_as_variant_array(variant_field.as_ref())?;
9899

99100
let [variant_arg] = args.args.as_slice() else {
100-
return exec_err!("expected 1 argument");
101+
return Err(args_count_err(self.name(), "1", args.args.len()));
101102
};
102103

103104
let out = match variant_arg {
104105
ColumnarValue::Scalar(scalar_variant) => {
105106
let ScalarValue::Struct(struct_array) = scalar_variant else {
106-
return exec_err!("expected variant struct");
107+
return arg_type_err(self.name(), 1, "Struct", &scalar_variant.data_type());
107108
};
108109

109110
let variant_array = VariantArray::try_new(struct_array.as_ref())?;

src/variant_object_delete.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use datafusion::{
1313
use parquet_variant::{Variant, VariantBuilder};
1414
use parquet_variant_compute::{VariantArray, VariantType};
1515

16-
use crate::shared::{ensure, try_parse_string_scalar, try_parse_variant_scalar};
16+
use crate::shared::{
17+
arg_null_error, arg_shape_err, args_count_err, ensure, try_parse_string_scalar,
18+
try_parse_variant_scalar,
19+
};
1720

1821
#[derive(Debug, Hash, PartialEq, Eq)]
1922
pub struct VariantObjectDelete {
@@ -71,7 +74,7 @@ impl ScalarUDFImpl for VariantObjectDelete {
7174
)?;
7275

7376
let [variant_object_to_update, key_to_delete] = argument_values.as_slice() else {
74-
return exec_err!("expected 2 arguments");
77+
return Err(args_count_err(self.name(), "2", argument_values.len()));
7578
};
7679

7780
ensure(
@@ -81,11 +84,16 @@ impl ScalarUDFImpl for VariantObjectDelete {
8184

8285
let key = {
8386
let ColumnarValue::Scalar(key) = key_to_delete else {
84-
return exec_err!("expected scalar value for key");
87+
return Err(arg_shape_err(
88+
self.name(),
89+
2,
90+
"scalar string value",
91+
"array value",
92+
));
8593
};
8694

8795
try_parse_string_scalar(key)?
88-
.ok_or_else(|| DataFusionError::Execution("expected non null string".into()))?
96+
.ok_or_else(|| arg_null_error(self.name(), 2, "a non-null string literal"))?
8997
};
9098

9199
match variant_object_to_update {

src/variant_object_keys.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::sync::Arc;
22

33
use arrow::array::{ArrayRef, ListBuilder, StringBuilder};
44
use arrow_schema::{DataType, Field};
5-
use datafusion::common::{exec_datafusion_err, exec_err};
65
use datafusion::error::Result;
76
use datafusion::logical_expr::{
87
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
@@ -12,7 +11,10 @@ use parquet_variant::Variant;
1211
use parquet_variant::VariantPath;
1312
use parquet_variant_compute::{GetOptions, VariantArray, variant_get as compute_variant_get};
1413

15-
use crate::shared::{try_field_as_variant_array, try_parse_string_scalar};
14+
use crate::shared::{
15+
arg_field_meta_missing_err, arg_null_error, arg_shape_err, arg_type_err, args_count_err,
16+
try_field_as_variant_array, try_parse_string_scalar,
17+
};
1618

1719
#[derive(Debug, Hash, PartialEq, Eq)]
1820
pub struct VariantObjectKeys {
@@ -70,26 +72,31 @@ impl ScalarUDFImpl for VariantObjectKeys {
7072
[variant_arg] => (variant_arg, None),
7173
[variant_arg, path_arg] => {
7274
let ColumnarValue::Scalar(path_scalar) = path_arg else {
73-
return exec_err!("expected scalar value for path");
75+
return Err(arg_shape_err(
76+
self.name(),
77+
2,
78+
"scalar string value",
79+
"array value",
80+
));
7481
};
7582
let path = try_parse_string_scalar(path_scalar)?
76-
.ok_or_else(|| exec_datafusion_err!("expected non-null string for path"))?;
83+
.ok_or_else(|| arg_null_error(self.name(), 2, "a non-null string literal"))?;
7784
(variant_arg, Some(path))
7885
}
79-
_ => return exec_err!("expected 1 or 2 arguments"),
86+
_ => return Err(args_count_err(self.name(), "1 or 2", args.args.len())),
8087
};
8188

8289
let variant_field = args
8390
.arg_fields
8491
.first()
85-
.ok_or_else(|| exec_datafusion_err!("expected 1 argument field type"))?;
92+
.ok_or_else(|| arg_field_meta_missing_err(self.name(), 1))?;
8693

8794
try_field_as_variant_array(variant_field.as_ref())?;
8895

8996
let out = match variant_arg {
9097
ColumnarValue::Scalar(scalar_variant) => {
9198
let ScalarValue::Struct(struct_arr) = scalar_variant else {
92-
return exec_err!("expected variant scalar value");
99+
return arg_type_err(self.name(), 1, "Struct", &scalar_variant.data_type());
93100
};
94101
let arr: ArrayRef = Arc::clone(struct_arr) as ArrayRef;
95102

0 commit comments

Comments
 (0)