Skip to content

Commit 1a07876

Browse files
committed
[2/3] allow customization of scheduler grpc endpoint
1 parent 2c0505c commit 1a07876

2 files changed

Lines changed: 21 additions & 1 deletion

File tree

ballista/scheduler/src/config.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use datafusion_proto::logical_plan::LogicalExtensionCodec;
3232
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
3333
use std::fmt::Display;
3434
use std::sync::Arc;
35+
use tonic::transport::{Endpoint, Error as TonicTransportError};
3536

3637
/// Configuration of the application
3738
#[cfg(feature = "build-binary")]
@@ -215,6 +216,9 @@ pub struct SchedulerConfig {
215216
pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
216217
/// [PhysicalExtensionCodec] override option
217218
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
219+
/// Override function for customizing gRPC client endpoints before they are used
220+
pub override_create_grpc_client_endpoint:
221+
Option<Arc<dyn Fn(Endpoint) -> Result<Endpoint, TonicTransportError> + Send + Sync>>,
218222
}
219223

220224
impl Default for SchedulerConfig {
@@ -241,6 +245,7 @@ impl Default for SchedulerConfig {
241245
override_session_builder: None,
242246
override_logical_codec: None,
243247
override_physical_codec: None,
248+
override_create_grpc_client_endpoint: None,
244249
}
245250
}
246251
}
@@ -343,6 +348,14 @@ impl SchedulerConfig {
343348
self.override_session_builder = Some(override_session_builder);
344349
self
345350
}
351+
352+
pub fn with_override_create_grpc_client_endpoint(
353+
mut self,
354+
override_fn: Arc<dyn Fn(Endpoint) -> Result<Endpoint, TonicTransportError> + Send + Sync>,
355+
) -> Self {
356+
self.override_create_grpc_client_endpoint = Some(override_fn);
357+
self
358+
}
346359
}
347360

348361
/// Policy of distributing tasks to available executor slots
@@ -450,6 +463,7 @@ impl TryFrom<Config> for SchedulerConfig {
450463
override_logical_codec: None,
451464
override_physical_codec: None,
452465
override_session_builder: None,
466+
override_create_grpc_client_endpoint: None,
453467
};
454468

455469
Ok(config)

ballista/scheduler/src/state/executor_manager.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,13 @@ impl ExecutorManager {
427427
"http://{}:{}",
428428
executor_metadata.host, executor_metadata.grpc_port
429429
);
430-
let connection = create_grpc_client_endpoint(executor_url)?.connect().await?;
430+
let mut endpoint = create_grpc_client_endpoint(executor_url)?;
431+
432+
if let Some(ref override_fn) = self.config.override_create_grpc_client_endpoint {
433+
endpoint = override_fn(endpoint)?;
434+
}
435+
436+
let connection = endpoint.connect().await?;
431437
let client = ExecutorGrpcClient::new(connection);
432438

433439
{

0 commit comments

Comments
 (0)