Skip to content

Commit a459936

Browse files
committed
fix error handling, serialize fn arguments for docs, add note about return field
1 parent 34f022c commit a459936

8 files changed

Lines changed: 67 additions & 61 deletions

File tree

ballista/client/src/extension.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ impl SessionContextExt for SessionContext {
300300
continue;
301301
}
302302

303-
self.register_udf(ScalarUDF::new_from_impl(RemoteScalarUDF::new(udf)))
303+
self.register_udf(ScalarUDF::new_from_impl(RemoteScalarUDF::new(udf)?))
304304
}
305305

306306
Ok(())

ballista/core/proto/ballista.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,12 @@ message ScalarUDFDocumentation {
772772
string description = 1;
773773
string syntax_example = 2;
774774
optional string sql_example = 3;
775+
repeated ScalarUDFDocumentationArgument arguments = 4;
776+
}
777+
778+
message ScalarUDFDocumentationArgument {
779+
string argument = 1;
780+
string description = 2;
775781
}
776782

777783
service SchedulerGrpc {

ballista/core/src/remote_catalog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@
1717
//
1818

1919
pub mod catalog_serialize_ext;
20-
pub mod function_serialize_ext;
20+
pub mod remote_function_serialize_ext;
2121
pub mod remote_scalar_udf;
2222
pub mod remote_table_provider;

ballista/core/src/remote_catalog/function_serialize_ext.rs renamed to ballista/core/src/remote_catalog/remote_function_serialize_ext.rs

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,42 +16,21 @@
1616
// under the License.
1717

1818
use crate::serde::protobuf::{
19-
ScalarUdfDocumentation, ScalarUdfInfo, ScalarUdfTypeSignature,
19+
ScalarUdfDocumentation, ScalarUdfDocumentationArgument, ScalarUdfInfo,
20+
ScalarUdfTypeSignature,
2021
};
21-
use arrow::datatypes::DataType;
2222
use datafusion::execution::FunctionRegistry;
2323
use datafusion::functions::all_default_functions;
24-
use datafusion::logical_expr::{ScalarUDF, UserDefinedLogicalNode};
2524
use datafusion::prelude::SessionContext;
2625
use datafusion_proto_common::ArrowType;
2726
use std::collections::HashSet;
2827

2928
/// Used to serialize function shapes to ship to Ballista clients
30-
pub trait FunctionSerializeExt {
29+
pub trait RemoteFunctionSerializeExt {
3130
fn serialize_udfs(&self) -> Vec<ScalarUdfInfo>;
3231
}
3332

34-
fn try_derive_return_type(udf: &ScalarUDF, types: &[DataType]) -> Option<DataType> {
35-
if let Ok(return_type) = udf.return_type(&types) {
36-
return Some(return_type);
37-
}
38-
39-
// TODO: try to serialize these with dummy values
40-
// let fields = types.iter().enumerate().map(|(i, t)| Arc::new(Field::new(
41-
// format!("Field{}", i),
42-
// t.clone(),
43-
// true
44-
// ))).collect::<Vec<_>>();
45-
//
46-
// let return_type = f.return_field_from_args(ReturnFieldArgs {
47-
// arg_fields: &fields,
48-
// scalar_arguments: &vec![None; fields.len()],
49-
// }).expect("Must have return type");
50-
51-
None
52-
}
53-
54-
impl FunctionSerializeExt for SessionContext {
33+
impl RemoteFunctionSerializeExt for SessionContext {
5534
fn serialize_udfs(&self) -> Vec<ScalarUdfInfo> {
5635
let mut udfs = vec![];
5736

@@ -78,26 +57,34 @@ impl FunctionSerializeExt for SessionContext {
7857
.collect::<Result<Vec<ArrowType>, _>>()
7958
.expect("Must serialize data types");
8059

81-
if let Some(ref return_type) = try_derive_return_type(f.as_ref(), &t)
82-
{
83-
Some(ScalarUdfTypeSignature {
60+
// TODO: some functions use `ScalarUDF::return_field_from_args`, which this does not support
61+
f.return_type(&t)
62+
.ok()
63+
.and_then(|ref return_type| return_type.try_into().ok())
64+
.map(|arrow_return_type| ScalarUdfTypeSignature {
8465
arity,
85-
return_type: Some(
86-
return_type
87-
.try_into()
88-
.expect("Must serialize return type"),
89-
),
66+
return_type: Some(arrow_return_type),
9067
})
91-
} else {
92-
None
93-
}
9468
})
9569
.collect::<Vec<_>>();
9670

97-
let docs = f.documentation().map(|d| ScalarUdfDocumentation {
98-
description: d.description.clone(),
99-
syntax_example: d.syntax_example.clone(),
100-
sql_example: d.sql_example.clone(),
71+
let docs = f.documentation().map(|d| {
72+
let arguments = d
73+
.arguments
74+
.iter()
75+
.flatten()
76+
.map(|(arg, desc)| ScalarUdfDocumentationArgument {
77+
argument: arg.clone(),
78+
description: desc.clone(),
79+
})
80+
.collect::<Vec<_>>();
81+
82+
ScalarUdfDocumentation {
83+
description: d.description.clone(),
84+
syntax_example: d.syntax_example.clone(),
85+
sql_example: d.sql_example.clone(),
86+
arguments,
87+
}
10188
});
10289

10390
udfs.push(ScalarUdfInfo {

ballista/core/src/remote_catalog/remote_scalar_udf.rs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use crate::serde::protobuf::ScalarUdfInfo;
2020
use arrow::datatypes::DataType;
21+
use datafusion::common::Result;
2122
use datafusion::common::{exec_err, plan_err, DataFusionError};
2223
use datafusion::logical_expr::{
2324
ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
@@ -55,36 +56,40 @@ impl PartialEq for RemoteScalarUDF {
5556
impl Eq for RemoteScalarUDF {}
5657

5758
impl RemoteScalarUDF {
58-
pub fn new(meta: ScalarUdfInfo) -> Self {
59-
let arities = meta
60-
.signatures
61-
.iter()
62-
.map(|signature| {
63-
signature
64-
.arity
65-
.iter()
66-
.map(|t| t.try_into())
67-
.collect::<Result<Vec<DataType>, _>>()
68-
.expect("Must deserialize arities")
69-
})
70-
.collect();
59+
pub fn new(meta: ScalarUdfInfo) -> Result<Self> {
60+
let mut arities = vec![];
61+
62+
for signature in &meta.signatures {
63+
let signature_types = signature
64+
.arity
65+
.iter()
66+
.map(|t| t.try_into())
67+
.collect::<Result<Vec<DataType>, _>>()?;
68+
69+
arities.push(signature_types);
70+
}
7171

7272
let documentation = meta.documentation.clone().map(|d| Documentation {
7373
doc_section: Default::default(),
7474
description: d.description.clone(),
7575
syntax_example: d.syntax_example.clone(),
7676
sql_example: d.sql_example.clone(),
77-
arguments: None,
77+
arguments: Some(
78+
d.arguments
79+
.iter()
80+
.map(|a| (a.argument.clone(), a.description.clone()))
81+
.collect(),
82+
),
7883
alternative_syntax: None,
7984
related_udfs: None,
8085
});
8186

82-
Self {
87+
Ok(Self {
8388
arities,
8489
documentation,
8590
meta,
8691
signature: Signature::new(TypeSignature::VariadicAny, Volatility::Volatile),
87-
}
92+
})
8893
}
8994
}
9095

@@ -126,7 +131,7 @@ impl ScalarUDFImpl for RemoteScalarUDF {
126131

127132
fn invoke_with_args(
128133
&self,
129-
args: ScalarFunctionArgs,
134+
_args: ScalarFunctionArgs,
130135
) -> datafusion::common::Result<ColumnarValue> {
131136
exec_err!("This is a stub function and should never be called on the client")
132137
}

ballista/core/src/serde/generated/ballista.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,6 +1151,15 @@ pub struct ScalarUdfDocumentation {
11511151
pub syntax_example: ::prost::alloc::string::String,
11521152
#[prost(string, optional, tag = "3")]
11531153
pub sql_example: ::core::option::Option<::prost::alloc::string::String>,
1154+
#[prost(message, repeated, tag = "4")]
1155+
pub arguments: ::prost::alloc::vec::Vec<ScalarUdfDocumentationArgument>,
1156+
}
1157+
#[derive(Clone, PartialEq, ::prost::Message)]
1158+
pub struct ScalarUdfDocumentationArgument {
1159+
#[prost(string, tag = "1")]
1160+
pub argument: ::prost::alloc::string::String,
1161+
#[prost(string, tag = "2")]
1162+
pub description: ::prost::alloc::string::String,
11541163
}
11551164
/// Generated client implementations.
11561165
pub mod scheduler_grpc_client {

ballista/core/src/serde/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ use crate::remote_catalog::remote_table_provider::RemoteTableProvider;
5050
use crate::serde::protobuf::ballista_physical_plan_node::PhysicalPlanType;
5151
use crate::serde::scheduler::PartitionLocation;
5252
use datafusion::catalog::TableProvider;
53-
use datafusion::logical_expr::UserDefinedLogicalNode;
5453
pub use generated::ballista as protobuf;
5554
use prost::Message;
5655
use std::fmt::Debug;

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::cluster::{bind_task_bias, bind_task_round_robin};
4444
use crate::config::TaskDistributionPolicy;
4545
use crate::scheduler_server::event::QueryStageSchedulerEvent;
4646
use crate::scheduler_server::SchedulerServer;
47-
use ballista_core::remote_catalog::function_serialize_ext::FunctionSerializeExt;
47+
use ballista_core::remote_catalog::remote_function_serialize_ext::RemoteFunctionSerializeExt;
4848
use std::time::{SystemTime, UNIX_EPOCH};
4949
use tonic::{Request, Response, Status};
5050

0 commit comments

Comments
 (0)