Skip to content

Commit 94db4f9

Browse files
committed
Cherry-pick spice patches onto DF51 upstream
Patches applied from spiceai-50 branch: - 336bb6e: executor poll_loop oneshot channel for readiness reporting - a84f7d0: client-remote-catalog merge (catalog RPC, stub table provider) - 12957e4: Scheduler UDF sync -> client planning with stubs - 9c17685: Cluster RPC customizations for TLS and API key auth Manual fixes for DF51 compatibility: - Fixed arrow imports to use datafusion::arrow - Added GrpcServerConfig/GrpcClientConfig imports - Added CatalogSerializeExt trait import
1 parent 707a596 commit 94db4f9

8 files changed

Lines changed: 84 additions & 241 deletions

File tree

ballista/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub mod object_store;
3939
pub mod planner;
4040
pub mod registry;
4141
pub mod serde;
42+
pub mod remote_catalog;
4243
pub mod utils;
4344

4445
///

ballista/core/src/remote_catalog/remote_scalar_udf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
//
1818

1919
use crate::serde::protobuf::ScalarUdfInfo;
20-
use arrow::datatypes::DataType;
20+
use datafusion::arrow::datatypes::DataType;
2121
use datafusion::common::Result;
2222
use datafusion::common::{exec_err, plan_err, DataFusionError};
2323
use datafusion::logical_expr::{

ballista/core/src/serde/generated/ballista.rs

Lines changed: 52 additions & 227 deletions
Original file line numberDiff line numberDiff line change
@@ -831,6 +831,7 @@ pub mod execute_query_params {
831831
#[prost(bytes, tag = "1")]
832832
LogicalPlan(::prost::alloc::vec::Vec<u8>),
833833
/// I'd suggest to remove this, if SQL needed use `flight-sql`
834+
#[deprecated]
834835
#[prost(string, tag = "2")]
835836
Sql(::prost::alloc::string::String),
836837
}
@@ -1063,7 +1064,7 @@ pub struct RunningTaskInfo {
10631064
#[prost(uint32, tag = "4")]
10641065
pub partition_id: u32,
10651066
}
1066-
#[derive(Clone, PartialEq, ::prost::Message)]
1067+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
10671068
pub struct GetCatalogParams {
10681069
#[prost(string, tag = "1")]
10691070
pub session_id: ::prost::alloc::string::String,
@@ -1105,7 +1106,7 @@ pub struct RemoteTableProviderNode {
11051106
#[prost(message, optional, tag = "4")]
11061107
pub schema: ::core::option::Option<::datafusion_proto_common::Schema>,
11071108
}
1108-
#[derive(Clone, PartialEq, ::prost::Message)]
1109+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
11091110
pub struct GetRemoteFunctionsParams {
11101111
#[prost(string, tag = "1")]
11111112
pub session_id: ::prost::alloc::string::String,
@@ -1142,7 +1143,7 @@ pub struct ScalarUdfDocumentation {
11421143
#[prost(message, repeated, tag = "4")]
11431144
pub arguments: ::prost::alloc::vec::Vec<ScalarUdfDocumentationArgument>,
11441145
}
1145-
#[derive(Clone, PartialEq, ::prost::Message)]
1146+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
11461147
pub struct ScalarUdfDocumentationArgument {
11471148
#[prost(string, tag = "1")]
11481149
pub argument: ::prost::alloc::string::String,
@@ -1551,7 +1552,7 @@ pub mod scheduler_grpc_client {
15511552
format!("Service was not ready: {}", e.into()),
15521553
)
15531554
})?;
1554-
let codec = tonic::codec::ProstCodec::default();
1555+
let codec = tonic_prost::ProstCodec::default();
15551556
let path = http::uri::PathAndQuery::from_static(
15561557
"/ballista.protobuf.SchedulerGrpc/GetCatalog",
15571558
);
@@ -1578,7 +1579,7 @@ pub mod scheduler_grpc_client {
15781579
format!("Service was not ready: {}", e.into()),
15791580
)
15801581
})?;
1581-
let codec = tonic::codec::ProstCodec::default();
1582+
let codec = tonic_prost::ProstCodec::default();
15821583
let path = http::uri::PathAndQuery::from_static(
15831584
"/ballista.protobuf.SchedulerGrpc/GetRemoteFunctions",
15841585
);
@@ -1594,227 +1595,6 @@ pub mod scheduler_grpc_client {
15941595
}
15951596
}
15961597
}
1597-
/// Generated client implementations.
1598-
pub mod executor_grpc_client {
1599-
#![allow(
1600-
unused_variables,
1601-
dead_code,
1602-
missing_docs,
1603-
clippy::wildcard_imports,
1604-
clippy::let_unit_value,
1605-
)]
1606-
use tonic::codegen::*;
1607-
use tonic::codegen::http::Uri;
1608-
#[derive(Debug, Clone)]
1609-
pub struct ExecutorGrpcClient<T> {
1610-
inner: tonic::client::Grpc<T>,
1611-
}
1612-
impl ExecutorGrpcClient<tonic::transport::Channel> {
1613-
/// Attempt to create a new client by connecting to a given endpoint.
1614-
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
1615-
where
1616-
D: TryInto<tonic::transport::Endpoint>,
1617-
D::Error: Into<StdError>,
1618-
{
1619-
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
1620-
Ok(Self::new(conn))
1621-
}
1622-
}
1623-
impl<T> ExecutorGrpcClient<T>
1624-
where
1625-
T: tonic::client::GrpcService<tonic::body::Body>,
1626-
T::Error: Into<StdError>,
1627-
T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
1628-
<T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
1629-
{
1630-
pub fn new(inner: T) -> Self {
1631-
let inner = tonic::client::Grpc::new(inner);
1632-
Self { inner }
1633-
}
1634-
pub fn with_origin(inner: T, origin: Uri) -> Self {
1635-
let inner = tonic::client::Grpc::with_origin(inner, origin);
1636-
Self { inner }
1637-
}
1638-
pub fn with_interceptor<F>(
1639-
inner: T,
1640-
interceptor: F,
1641-
) -> ExecutorGrpcClient<InterceptedService<T, F>>
1642-
where
1643-
F: tonic::service::Interceptor,
1644-
T::ResponseBody: Default,
1645-
T: tonic::codegen::Service<
1646-
http::Request<tonic::body::Body>,
1647-
Response = http::Response<
1648-
<T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
1649-
>,
1650-
>,
1651-
<T as tonic::codegen::Service<
1652-
http::Request<tonic::body::Body>,
1653-
>>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
1654-
{
1655-
ExecutorGrpcClient::new(InterceptedService::new(inner, interceptor))
1656-
}
1657-
/// Compress requests with the given encoding.
1658-
///
1659-
/// This requires the server to support it otherwise it might respond with an
1660-
/// error.
1661-
#[must_use]
1662-
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1663-
self.inner = self.inner.send_compressed(encoding);
1664-
self
1665-
}
1666-
/// Enable decompressing responses.
1667-
#[must_use]
1668-
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1669-
self.inner = self.inner.accept_compressed(encoding);
1670-
self
1671-
}
1672-
/// Limits the maximum size of a decoded message.
1673-
///
1674-
/// Default: `4MB`
1675-
#[must_use]
1676-
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1677-
self.inner = self.inner.max_decoding_message_size(limit);
1678-
self
1679-
}
1680-
/// Limits the maximum size of an encoded message.
1681-
///
1682-
/// Default: `usize::MAX`
1683-
#[must_use]
1684-
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1685-
self.inner = self.inner.max_encoding_message_size(limit);
1686-
self
1687-
}
1688-
pub async fn launch_task(
1689-
&mut self,
1690-
request: impl tonic::IntoRequest<super::LaunchTaskParams>,
1691-
) -> std::result::Result<
1692-
tonic::Response<super::LaunchTaskResult>,
1693-
tonic::Status,
1694-
> {
1695-
self.inner
1696-
.ready()
1697-
.await
1698-
.map_err(|e| {
1699-
tonic::Status::unknown(
1700-
format!("Service was not ready: {}", e.into()),
1701-
)
1702-
})?;
1703-
let codec = tonic::codec::ProstCodec::default();
1704-
let path = http::uri::PathAndQuery::from_static(
1705-
"/ballista.protobuf.ExecutorGrpc/LaunchTask",
1706-
);
1707-
let mut req = request.into_request();
1708-
req.extensions_mut()
1709-
.insert(GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "LaunchTask"));
1710-
self.inner.unary(req, path, codec).await
1711-
}
1712-
pub async fn launch_multi_task(
1713-
&mut self,
1714-
request: impl tonic::IntoRequest<super::LaunchMultiTaskParams>,
1715-
) -> std::result::Result<
1716-
tonic::Response<super::LaunchMultiTaskResult>,
1717-
tonic::Status,
1718-
> {
1719-
self.inner
1720-
.ready()
1721-
.await
1722-
.map_err(|e| {
1723-
tonic::Status::unknown(
1724-
format!("Service was not ready: {}", e.into()),
1725-
)
1726-
})?;
1727-
let codec = tonic::codec::ProstCodec::default();
1728-
let path = http::uri::PathAndQuery::from_static(
1729-
"/ballista.protobuf.ExecutorGrpc/LaunchMultiTask",
1730-
);
1731-
let mut req = request.into_request();
1732-
req.extensions_mut()
1733-
.insert(
1734-
GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "LaunchMultiTask"),
1735-
);
1736-
self.inner.unary(req, path, codec).await
1737-
}
1738-
pub async fn stop_executor(
1739-
&mut self,
1740-
request: impl tonic::IntoRequest<super::StopExecutorParams>,
1741-
) -> std::result::Result<
1742-
tonic::Response<super::StopExecutorResult>,
1743-
tonic::Status,
1744-
> {
1745-
self.inner
1746-
.ready()
1747-
.await
1748-
.map_err(|e| {
1749-
tonic::Status::unknown(
1750-
format!("Service was not ready: {}", e.into()),
1751-
)
1752-
})?;
1753-
let codec = tonic::codec::ProstCodec::default();
1754-
let path = http::uri::PathAndQuery::from_static(
1755-
"/ballista.protobuf.ExecutorGrpc/StopExecutor",
1756-
);
1757-
let mut req = request.into_request();
1758-
req.extensions_mut()
1759-
.insert(
1760-
GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "StopExecutor"),
1761-
);
1762-
self.inner.unary(req, path, codec).await
1763-
}
1764-
pub async fn cancel_tasks(
1765-
&mut self,
1766-
request: impl tonic::IntoRequest<super::CancelTasksParams>,
1767-
) -> std::result::Result<
1768-
tonic::Response<super::CancelTasksResult>,
1769-
tonic::Status,
1770-
> {
1771-
self.inner
1772-
.ready()
1773-
.await
1774-
.map_err(|e| {
1775-
tonic::Status::unknown(
1776-
format!("Service was not ready: {}", e.into()),
1777-
)
1778-
})?;
1779-
let codec = tonic::codec::ProstCodec::default();
1780-
let path = http::uri::PathAndQuery::from_static(
1781-
"/ballista.protobuf.ExecutorGrpc/CancelTasks",
1782-
);
1783-
let mut req = request.into_request();
1784-
req.extensions_mut()
1785-
.insert(
1786-
GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "CancelTasks"),
1787-
);
1788-
self.inner.unary(req, path, codec).await
1789-
}
1790-
pub async fn remove_job_data(
1791-
&mut self,
1792-
request: impl tonic::IntoRequest<super::RemoveJobDataParams>,
1793-
) -> std::result::Result<
1794-
tonic::Response<super::RemoveJobDataResult>,
1795-
tonic::Status,
1796-
> {
1797-
self.inner
1798-
.ready()
1799-
.await
1800-
.map_err(|e| {
1801-
tonic::Status::unknown(
1802-
format!("Service was not ready: {}", e.into()),
1803-
)
1804-
})?;
1805-
let codec = tonic::codec::ProstCodec::default();
1806-
let path = http::uri::PathAndQuery::from_static(
1807-
"/ballista.protobuf.ExecutorGrpc/RemoveJobData",
1808-
);
1809-
let mut req = request.into_request();
1810-
req.extensions_mut()
1811-
.insert(
1812-
GrpcMethod::new("ballista.protobuf.ExecutorGrpc", "RemoveJobData"),
1813-
);
1814-
self.inner.unary(req, path, codec).await
1815-
}
1816-
}
1817-
}
18181598
/// Generated server implementations.
18191599
pub mod scheduler_grpc_server {
18201600
#![allow(
@@ -2496,6 +2276,51 @@ pub mod scheduler_grpc_server {
24962276
};
24972277
Box::pin(fut)
24982278
}
2279+
"/ballista.protobuf.SchedulerGrpc/GetCatalog" => {
2280+
#[allow(non_camel_case_types)]
2281+
struct GetCatalogSvc<T: SchedulerGrpc>(pub Arc<T>);
2282+
impl<
2283+
T: SchedulerGrpc,
2284+
> tonic::server::UnaryService<super::GetCatalogParams>
2285+
for GetCatalogSvc<T> {
2286+
type Response = super::GetCatalogResult;
2287+
type Future = BoxFuture<
2288+
tonic::Response<Self::Response>,
2289+
tonic::Status,
2290+
>;
2291+
fn call(
2292+
&mut self,
2293+
request: tonic::Request<super::GetCatalogParams>,
2294+
) -> Self::Future {
2295+
let inner = Arc::clone(&self.0);
2296+
let fut = async move {
2297+
<T as SchedulerGrpc>::get_catalog(&inner, request).await
2298+
};
2299+
Box::pin(fut)
2300+
}
2301+
}
2302+
let accept_compression_encodings = self.accept_compression_encodings;
2303+
let send_compression_encodings = self.send_compression_encodings;
2304+
let max_decoding_message_size = self.max_decoding_message_size;
2305+
let max_encoding_message_size = self.max_encoding_message_size;
2306+
let inner = self.inner.clone();
2307+
let fut = async move {
2308+
let method = GetCatalogSvc(inner);
2309+
let codec = tonic_prost::ProstCodec::default();
2310+
let mut grpc = tonic::server::Grpc::new(codec)
2311+
.apply_compression_config(
2312+
accept_compression_encodings,
2313+
send_compression_encodings,
2314+
)
2315+
.apply_max_message_size_config(
2316+
max_decoding_message_size,
2317+
max_encoding_message_size,
2318+
);
2319+
let res = grpc.unary(method, req).await;
2320+
Ok(res)
2321+
};
2322+
Box::pin(fut)
2323+
}
24992324
"/ballista.protobuf.SchedulerGrpc/GetRemoteFunctions" => {
25002325
#[allow(non_camel_case_types)]
25012326
struct GetRemoteFunctionsSvc<T: SchedulerGrpc>(pub Arc<T>);
@@ -2527,7 +2352,7 @@ pub mod scheduler_grpc_server {
25272352
let inner = self.inner.clone();
25282353
let fut = async move {
25292354
let method = GetRemoteFunctionsSvc(inner);
2530-
let codec = tonic::codec::ProstCodec::default();
2355+
let codec = tonic_prost::ProstCodec::default();
25312356
let mut grpc = tonic::server::Grpc::new(codec)
25322357
.apply_compression_config(
25332358
accept_compression_encodings,

ballista/core/src/serde/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,6 @@ use datafusion_proto::{
4141
physical_plan::{AsExecutionPlan, PhysicalExtensionCodec},
4242
};
4343

44-
use crate::execution_plans::{
45-
BallistaExplainExec, BallistaPlanType, BallistaStringifiedPlan, ShuffleReaderExec,
46-
ShuffleWriterExec, UnresolvedShuffleExec,
47-
};
48-
use crate::remote_catalog::remote_table_provider::RemoteTableProvider;
49-
use crate::serde::protobuf::ballista_physical_plan_node::PhysicalPlanType;
50-
use crate::serde::scheduler::PartitionLocation;
51-
use datafusion::catalog::TableProvider;
52-
pub use generated::ballista as protobuf;
5344
use prost::Message;
5445
use std::fmt::Debug;
5546
use std::marker::PhantomData;

0 commit comments

Comments
 (0)