Skip to content

Commit d20914b

Browse files
committed
[3/3] allow customization of executor grpc endpoint
1 parent 1a07876 commit d20914b

3 files changed

Lines changed: 57 additions & 15 deletions

File tree

ballista/executor/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
147147
override_logical_codec: None,
148148
override_physical_codec: None,
149149
override_arrow_flight_service: None,
150+
override_create_grpc_client_endpoint: None,
150151
})
151152
}
152153
}

ballista/executor/src/executor_process.rs

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use ballista_core::utils::{
5656
get_time_before,
5757
};
5858
use ballista_core::{ConfigProducer, RuntimeProducer, BALLISTA_VERSION};
59+
use tonic::transport::{Endpoint, Error as TonicTransportError};
5960

6061
use crate::execution_engine::ExecutionEngine;
6162
use crate::executor::{Executor, TasksDrainedFuture};
@@ -104,6 +105,9 @@ pub struct ExecutorProcessConfig {
104105
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
105106
/// [ArrowFlightServerProvider] implementation override option
106107
pub override_arrow_flight_service: Option<Arc<ArrowFlightServerProvider>>,
108+
/// Optional hook applied to each scheduler gRPC client [Endpoint] after it is created and before it connects; returning an error fails that connection attempt
109+
pub override_create_grpc_client_endpoint:
110+
Option<Arc<dyn Fn(Endpoint) -> Result<Endpoint, TonicTransportError> + Send + Sync>>,
107111
}
108112

109113
impl ExecutorProcessConfig {
@@ -147,6 +151,7 @@ impl Default for ExecutorProcessConfig {
147151
override_logical_codec: None,
148152
override_physical_codec: None,
149153
override_arrow_flight_service: None,
154+
override_create_grpc_client_endpoint: None,
150155
}
151156
}
152157
}
@@ -246,12 +251,22 @@ pub async fn start_executor_process(
246251

247252
let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
248253
let connection = if connect_timeout == 0 {
249-
create_grpc_client_endpoint(scheduler_url)
254+
let mut endpoint = create_grpc_client_endpoint(scheduler_url)
250255
.map_err(|_| {
251256
BallistaError::GrpcConnectionError(
252257
"Could not create endpoint to scheduler".to_string(),
253258
)
254-
})?
259+
})?;
260+
261+
if let Some(ref override_fn) = opt.override_create_grpc_client_endpoint {
262+
endpoint = override_fn(endpoint).map_err(|_| {
263+
BallistaError::GrpcConnectionError(
264+
"Failed to apply endpoint override".to_string(),
265+
)
266+
})?;
267+
}
268+
269+
endpoint
255270
.connect()
256271
.await
257272
.map_err(|_| {
@@ -269,16 +284,31 @@ pub async fn start_executor_process(
269284
&& Instant::now().elapsed().as_secs() - start_time < connect_timeout
270285
{
271286
match create_grpc_client_endpoint(scheduler_url.clone()) {
272-
Ok(endpoint) => match endpoint.connect().await {
273-
Ok(connection) => {
274-
info!("Connected to scheduler at {scheduler_url}");
275-
x = Some(connection);
287+
Ok(mut endpoint) => {
288+
if let Some(ref override_fn) = opt.override_create_grpc_client_endpoint {
289+
match override_fn(endpoint) {
290+
Ok(overridden_endpoint) => endpoint = overridden_endpoint,
291+
Err(e) => {
292+
warn!(
293+
"Failed to apply endpoint override to scheduler at {scheduler_url} ({e}); retrying ..."
294+
);
295+
tokio::time::sleep(time::Duration::from_millis(500)).await;
296+
continue;
297+
}
298+
}
276299
}
277-
Err(e) => {
278-
warn!(
279-
"Failed to connect to scheduler at {scheduler_url} ({e}); retrying ..."
280-
);
281-
tokio::time::sleep(time::Duration::from_millis(500)).await;
300+
301+
match endpoint.connect().await {
302+
Ok(connection) => {
303+
info!("Connected to scheduler at {scheduler_url}");
304+
x = Some(connection);
305+
}
306+
Err(e) => {
307+
warn!(
308+
"Failed to connect to scheduler at {scheduler_url} ({e}); retrying ..."
309+
);
310+
tokio::time::sleep(time::Duration::from_millis(500)).await;
311+
}
282312
}
283313
},
284314
Err(e) => {

ballista/executor/src/executor_server.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
2424
use tokio::sync::mpsc;
2525

2626
use log::{debug, error, info, warn};
27-
use tonic::transport::Channel;
27+
use tonic::transport::{Channel, Endpoint, Error as TonicTransportError};
2828
use tonic::{Request, Response, Status};
2929

3030
use ballista_core::error::BallistaError;
@@ -97,6 +97,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
9797
codec,
9898
config.grpc_max_encoding_message_size as usize,
9999
config.grpc_max_decoding_message_size as usize,
100+
config.override_create_grpc_client_endpoint.clone(),
100101
);
101102

102103
// 1. Start executor grpc service
@@ -183,6 +184,8 @@ pub struct ExecutorServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPl
183184
schedulers: SchedulerClients,
184185
grpc_max_encoding_message_size: usize,
185186
grpc_max_decoding_message_size: usize,
187+
override_create_grpc_client_endpoint:
188+
Option<Arc<dyn Fn(Endpoint) -> Result<Endpoint, TonicTransportError> + Send + Sync>>,
186189
}
187190

188191
#[derive(Clone)]
@@ -209,6 +212,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
209212
codec: BallistaCodec<T, U>,
210213
grpc_max_encoding_message_size: usize,
211214
grpc_max_decoding_message_size: usize,
215+
override_create_grpc_client_endpoint: Option<
216+
Arc<dyn Fn(Endpoint) -> Result<Endpoint, TonicTransportError> + Send + Sync>,
217+
>,
212218
) -> Self {
213219
Self {
214220
_start_time: SystemTime::now()
@@ -222,6 +228,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
222228
schedulers: Default::default(),
223229
grpc_max_encoding_message_size,
224230
grpc_max_decoding_message_size,
231+
override_create_grpc_client_endpoint,
225232
}
226233
}
227234

@@ -235,9 +242,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
235242
Ok(scheduler)
236243
} else {
237244
let scheduler_url = format!("http://{scheduler_id}");
238-
let connection = create_grpc_client_endpoint(scheduler_url)?
239-
.connect()
240-
.await?;
245+
let mut endpoint = create_grpc_client_endpoint(scheduler_url)?;
246+
247+
if let Some(ref override_fn) = self.override_create_grpc_client_endpoint {
248+
endpoint = override_fn(endpoint)?;
249+
}
250+
251+
let connection = endpoint.connect().await?;
241252
let scheduler = SchedulerGrpcClient::new(connection)
242253
.max_encoding_message_size(self.grpc_max_encoding_message_size)
243254
.max_decoding_message_size(self.grpc_max_decoding_message_size);

0 commit comments

Comments
 (0)