Skip to content

Commit 82ebc10

Browse files
Make TLS a cluster configuration option, not configurable per executor
1 parent 123697f commit 82ebc10

16 files changed

Lines changed: 42 additions & 39 deletions

File tree

ballista/core/proto/ballista.proto

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,7 @@ message ExecutorMetadata {
305305
string host = 2;
306306
uint32 port = 3;
307307
uint32 grpc_port = 4;
308-
bool use_tls = 5;
309-
ExecutorSpecification specification = 6;
308+
ExecutorSpecification specification = 5;
310309
}
311310

312311

@@ -317,8 +316,7 @@ message ExecutorRegistration {
317316
optional string host = 2;
318317
uint32 port = 3;
319318
uint32 grpc_port = 4;
320-
bool use_tls = 5;
321-
ExecutorSpecification specification = 6;
319+
ExecutorSpecification specification = 5;
322320
}
323321

324322
message ExecutorHeartbeat {

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,8 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
242242
.session_config()
243243
.ballista_override_create_grpc_client_endpoint();
244244

245+
let use_tls = context.session_config().ballista_use_tls();
246+
245247
let stream = futures::stream::once(
246248
execute_query(
247249
self.scheduler_url.clone(),
@@ -250,6 +252,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
250252
self.config.clone(),
251253
interceptor,
252254
customize_endpoint,
255+
use_tls,
253256
)
254257
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
255258
)
@@ -288,6 +291,7 @@ async fn execute_query(
288291
config: BallistaConfig,
289292
grpc_interceptor: Arc<BallistaGrpcMetadataInterceptor>,
290293
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
294+
use_tls: bool,
291295
) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
292296
info!("Connecting to Ballista scheduler at {scheduler_url}");
293297
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
@@ -391,6 +395,7 @@ async fn execute_query(
391395
max_message_size,
392396
true,
393397
customize_endpoint.clone(),
398+
use_tls,
394399
)
395400
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
396401

@@ -408,6 +413,7 @@ async fn fetch_partition(
408413
max_message_size: usize,
409414
flight_transport: bool,
410415
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
416+
use_tls: bool,
411417
) -> Result<SendableRecordBatchStream> {
412418
let metadata = location.executor_meta.ok_or_else(|| {
413419
DataFusionError::Internal("Received empty executor metadata".to_owned())
@@ -421,7 +427,7 @@ async fn fetch_partition(
421427
host,
422428
port,
423429
max_message_size,
424-
metadata.use_tls,
430+
use_tls,
425431
customize_endpoint,
426432
)
427433
.await

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ impl ExecutionPlan for ShuffleReaderExec {
163163
let force_remote_read = config.ballista_shuffle_reader_force_remote_read();
164164
let prefer_flight = config.ballista_shuffle_reader_remote_prefer_flight();
165165
let customize_endpoint = config.ballista_override_create_grpc_client_endpoint();
166+
let use_tls = config.ballista_use_tls();
166167

167168
if force_remote_read {
168169
debug!(
@@ -197,6 +198,7 @@ impl ExecutionPlan for ShuffleReaderExec {
197198
force_remote_read,
198199
prefer_flight,
199200
customize_endpoint,
201+
use_tls,
200202
);
201203

202204
let result = RecordBatchStreamAdapter::new(
@@ -392,6 +394,7 @@ fn send_fetch_partitions(
392394
force_remote_read: bool,
393395
flight_transport: bool,
394396
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
397+
use_tls: bool,
395398
) -> AbortableReceiverStream {
396399
let (response_sender, response_receiver) = mpsc::channel(max_request_num);
397400
let semaphore = Arc::new(Semaphore::new(max_request_num));
@@ -417,6 +420,7 @@ fn send_fetch_partitions(
417420
max_message_size,
418421
flight_transport,
419422
customize_endpoint_c.clone(),
423+
use_tls,
420424
)
421425
.await;
422426
if let Err(e) = response_sender_c.send(r).await {
@@ -438,6 +442,7 @@ fn send_fetch_partitions(
438442
max_message_size,
439443
flight_transport,
440444
customize_endpoint_c,
445+
use_tls,
441446
)
442447
.await;
443448
// Block if the channel buffer is full.
@@ -466,6 +471,7 @@ trait PartitionReader: Send + Sync + Clone {
466471
max_message_size: usize,
467472
flight_transport: bool,
468473
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
474+
use_tls: bool,
469475
) -> result::Result<SendableRecordBatchStream, BallistaError>;
470476
}
471477

@@ -486,6 +492,7 @@ impl PartitionReader for PartitionReaderEnum {
486492
max_message_size: usize,
487493
flight_transport: bool,
488494
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
495+
use_tls: bool,
489496
) -> result::Result<SendableRecordBatchStream, BallistaError> {
490497
match self {
491498
PartitionReaderEnum::FlightRemote => {
@@ -494,6 +501,7 @@ impl PartitionReader for PartitionReaderEnum {
494501
max_message_size,
495502
flight_transport,
496503
customize_endpoint,
504+
use_tls,
497505
)
498506
.await
499507
}
@@ -510,6 +518,7 @@ async fn fetch_partition_remote(
510518
max_message_size: usize,
511519
flight_transport: bool,
512520
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
521+
use_tls: bool,
513522
) -> result::Result<SendableRecordBatchStream, BallistaError> {
514523
let metadata = &location.executor_meta;
515524
let partition_id = &location.partition_id;
@@ -521,7 +530,7 @@ async fn fetch_partition_remote(
521530
host,
522531
port,
523532
max_message_size,
524-
metadata.use_tls,
533+
use_tls,
525534
customize_endpoint,
526535
)
527536
.await
@@ -696,7 +705,6 @@ mod tests {
696705
port: 7070,
697706
grpc_port: 8080,
698707
specification: ExecutorSpecification { task_slots: 1 },
699-
use_tls: false,
700708
},
701709
partition_stats: PartitionStats {
702710
num_rows: Some(1),
@@ -746,7 +754,6 @@ mod tests {
746754
port: 7070,
747755
grpc_port: 8080,
748756
specification: ExecutorSpecification { task_slots: 1 },
749-
use_tls: false,
750757
},
751758
partition_stats: PartitionStats {
752759
num_rows: Some(1),
@@ -797,7 +804,6 @@ mod tests {
797804
port: 7070,
798805
grpc_port: 8080,
799806
specification: ExecutorSpecification { task_slots: 1 },
800-
use_tls: false,
801807
},
802808
partition_stats: PartitionStats {
803809
num_rows: Some(1),
@@ -848,7 +854,6 @@ mod tests {
848854
port: 7070,
849855
grpc_port: 8080,
850856
specification: ExecutorSpecification { task_slots: 1 },
851-
use_tls: false,
852857
},
853858
partition_stats: Default::default(),
854859
path: "test_path".to_string(),
@@ -985,6 +990,7 @@ mod tests {
985990
false,
986991
true,
987992
None,
993+
false,
988994
);
989995

990996
let stream = RecordBatchStreamAdapter::new(
@@ -1011,7 +1017,6 @@ mod tests {
10111017
port: 50051,
10121018
grpc_port: 50052,
10131019
specification: ExecutorSpecification { task_slots: 12 },
1014-
use_tls: false,
10151020
},
10161021
partition_stats: Default::default(),
10171022
path: path.clone(),

ballista/core/src/extension.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ pub trait SessionConfigExt {
166166
fn ballista_override_create_grpc_client_endpoint(
167167
&self,
168168
) -> Option<Arc<BallistaConfigGrpcEndpoint>>;
169+
170+
/// Set whether to use TLS for executor connections (cluster-wide setting)
171+
fn with_ballista_use_tls(self, use_tls: bool) -> Self;
172+
173+
/// Get whether to use TLS for executor connections
174+
fn ballista_use_tls(&self) -> bool;
169175
}
170176

171177
/// [SessionConfigHelperExt] is set of [SessionConfig] extension methods
@@ -440,6 +446,16 @@ impl SessionConfigExt for SessionConfig {
440446
) -> Option<Arc<BallistaConfigGrpcEndpoint>> {
441447
self.get_extension::<BallistaConfigGrpcEndpoint>()
442448
}
449+
450+
fn with_ballista_use_tls(self, use_tls: bool) -> Self {
451+
self.with_extension(Arc::new(BallistaUseTls(use_tls)))
452+
}
453+
454+
fn ballista_use_tls(&self) -> bool {
455+
self.get_extension::<BallistaUseTls>()
456+
.map(|ext| ext.0)
457+
.unwrap_or(false)
458+
}
443459
}
444460

445461
impl SessionConfigHelperExt for SessionConfig {
@@ -643,6 +659,10 @@ impl BallistaConfigGrpcEndpoint {
643659
}
644660
}
645661

662+
/// Wrapper for cluster-wide TLS configuration
663+
#[derive(Clone, Copy)]
664+
pub struct BallistaUseTls(pub bool);
665+
646666
#[cfg(test)]
647667
mod test {
648668
use datafusion::{

ballista/core/src/serde/generated/ballista.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -470,9 +470,7 @@ pub struct ExecutorMetadata {
470470
pub port: u32,
471471
#[prost(uint32, tag = "4")]
472472
pub grpc_port: u32,
473-
#[prost(bool, tag = "5")]
474-
pub use_tls: bool,
475-
#[prost(message, optional, tag = "6")]
473+
#[prost(message, optional, tag = "5")]
476474
pub specification: ::core::option::Option<ExecutorSpecification>,
477475
}
478476
/// Used for scheduler-executor
@@ -487,9 +485,7 @@ pub struct ExecutorRegistration {
487485
pub port: u32,
488486
#[prost(uint32, tag = "4")]
489487
pub grpc_port: u32,
490-
#[prost(bool, tag = "5")]
491-
pub use_tls: bool,
492-
#[prost(message, optional, tag = "6")]
488+
#[prost(message, optional, tag = "5")]
493489
pub specification: ::core::option::Option<ExecutorSpecification>,
494490
}
495491
#[derive(Clone, PartialEq, ::prost::Message)]

ballista/core/src/serde/scheduler/from_proto.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,6 @@ impl Into<ExecutorMetadata> for protobuf::ExecutorMetadata {
236236
port: self.port as u16,
237237
grpc_port: self.grpc_port as u16,
238238
specification: self.specification.unwrap().into(),
239-
use_tls: self.use_tls,
240239
}
241240
}
242241
}

ballista/core/src/serde/scheduler/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ pub struct ExecutorMetadata {
8080
pub port: u16,
8181
pub grpc_port: u16,
8282
pub specification: ExecutorSpecification,
83-
pub use_tls: bool,
8483
}
8584

8685
/// Specification of an executor, indicting executor resources, like total task slots

ballista/core/src/serde/scheduler/to_proto.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,6 @@ impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
207207
port: self.port as u32,
208208
grpc_port: self.grpc_port as u32,
209209
specification: Some(self.specification.into()),
210-
use_tls: self.use_tls,
211210
}
212211
}
213212
}

ballista/executor/src/executor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,6 @@ mod test {
357357
grpc_port: 0,
358358
specification: None,
359359
host: None,
360-
use_tls: false,
361360
};
362361
let config_producer = Arc::new(default_config_producer);
363362
let ctx = SessionContext::new();

ballista/executor/src/executor_process.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,6 @@ pub async fn start_executor_process(
204204
resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
205205
}],
206206
}),
207-
// TODO: tls is only supported when manually starting a scheduler and its flight service
208-
use_tls: false,
209207
};
210208

211209
// put them to session config
@@ -471,8 +469,6 @@ pub async fn start_executor_process(
471469
resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
472470
}],
473471
}),
474-
// TODO: tls is only supported when manually starting an executor poll loop and its flight service
475-
use_tls: false,
476472
}),
477473
})
478474
.await

0 commit comments

Comments
 (0)