Skip to content

Commit 0a4174a

Browse files
Add metrics collector to SchedulerState and instrument executor and planning metrics
- Add metrics_collector field to SchedulerState struct - Instrument record_planning_duration in submit_job - Instrument record_executor_registered/deregistered and set_active_executor_count - Update all SchedulerState constructors and call sites
1 parent d7b0bcf commit 0a4174a

4 files changed

Lines changed: 51 additions & 6 deletions

File tree

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ use std::ops::Deref;
4848

4949
use crate::cluster::{bind_task_bias, bind_task_round_robin};
5050
use crate::config::TaskDistributionPolicy;
51-
use crate::scheduler_server::event::QueryStageSchedulerEvent;
5251
use crate::scheduler_server::SchedulerServer;
53-
use ballista_core::remote_catalog::remote_function_serialize_ext::RemoteFunctionSerializeExt;
52+
use crate::scheduler_server::event::QueryStageSchedulerEvent;
5453
use ballista_core::remote_catalog::catalog_serialize_ext::CatalogSerializeExt;
54+
use ballista_core::remote_catalog::remote_function_serialize_ext::RemoteFunctionSerializeExt;
5555
use std::time::{SystemTime, UNIX_EPOCH};
5656
use tonic::{Request, Response, Status};
5757

@@ -490,6 +490,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
490490
);
491491

492492
let executor_manager = self.state.executor_manager.clone();
493+
let metrics_collector = self.state.metrics_collector.clone();
493494
let event_sender = self.query_stage_event_loop.get_sender().map_err(|e| {
494495
let msg = format!("Get query stage event loop error due to {e:?}");
495496
error!("{msg}");
@@ -499,6 +500,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
499500
Self::remove_executor(
500501
executor_manager,
501502
event_sender,
503+
metrics_collector,
502504
&executor_id,
503505
Some(reason),
504506
self.config.executor_termination_grace_period,
@@ -665,6 +667,7 @@ mod test {
665667
SchedulerState::new_with_default_scheduler_name(
666668
cluster.clone(),
667669
BallistaCodec::default(),
670+
default_metrics_collector().unwrap(),
668671
);
669672
state.init().await?;
670673

@@ -697,6 +700,7 @@ mod test {
697700
SchedulerState::new_with_default_scheduler_name(
698701
cluster.clone(),
699702
BallistaCodec::default(),
703+
default_metrics_collector().unwrap(),
700704
);
701705
state.init().await?;
702706

ballista/scheduler/src/scheduler_server/mod.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
9999
codec,
100100
scheduler_name.clone(),
101101
config.clone(),
102+
metrics_collector.clone(),
102103
));
103104
let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
104105
state.clone(),
@@ -137,6 +138,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
137138
codec,
138139
scheduler_name.clone(),
139140
config.clone(),
141+
metrics_collector.clone(),
140142
task_launcher,
141143
));
142144
let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
@@ -312,6 +314,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
312314
Self::remove_executor(
313315
state.executor_manager.clone(),
314316
sender_clone,
317+
state.metrics_collector.clone(),
315318
&executor_id,
316319
Some(stop_reason.clone()),
317320
0,
@@ -338,6 +341,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
338341
pub(crate) fn remove_executor(
339342
executor_manager: ExecutorManager,
340343
event_sender: EventSender<QueryStageSchedulerEvent>,
344+
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
341345
executor_id: &str,
342346
reason: Option<String>,
343347
wait_secs: u64,
@@ -356,6 +360,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
356360
error!("error removing executor {executor_id}: {e:?}");
357361
}
358362

363+
// Record executor deregistration metric
364+
metrics_collector.record_executor_deregistered(&executor_id);
365+
366+
// Update active executor count
367+
let count = executor_manager.get_alive_executors().len();
368+
metrics_collector.set_active_executor_count(count);
369+
359370
if let Err(e) = event_sender
360371
.post_event(QueryStageSchedulerEvent::ExecutorLost(executor_id, reason))
361372
.await
@@ -366,8 +377,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
366377
}
367378

368379
async fn do_register_executor(&self, metadata: ExecutorMetadata) -> Result<()> {
380+
let executor_id = metadata.id.clone();
369381
let executor_data = ExecutorData {
370-
executor_id: metadata.id.clone(),
382+
executor_id: executor_id.clone(),
371383
total_task_slots: metadata.specification.task_slots,
372384
available_task_slots: metadata.specification.task_slots,
373385
};
@@ -378,6 +390,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
378390
.register_executor(metadata, executor_data)
379391
.await?;
380392

393+
// Record executor registration metric
394+
self.state
395+
.metrics_collector
396+
.record_executor_registered(&executor_id);
397+
398+
// Update active executor count
399+
let count = self.state.executor_manager.get_alive_executors().len();
400+
self.state
401+
.metrics_collector
402+
.set_active_executor_count(count);
403+
381404
// If we are using push-based scheduling then reserve this executors slots and send
382405
// them for scheduling tasks.
383406
if self.state.config.is_push_staged_scheduling() {

ballista/scheduler/src/state/executor_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ use ballista_core::serde::protobuf::{
3636
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
3737

3838
use ballista_core::utils::{
39-
GrpcClientConfig, create_grpc_client_connection, create_grpc_client_endpoint, get_time_before,
39+
GrpcClientConfig, create_grpc_client_connection, create_grpc_client_endpoint,
40+
get_time_before,
4041
};
4142

42-
4343
use dashmap::DashMap;
4444
use log::{debug, error, info, warn};
4545
use std::collections::{HashMap, HashSet};

ballista/scheduler/src/state/mod.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::state::task_manager::{TaskLauncher, TaskManager};
3737

3838
use crate::cluster::{BallistaCluster, BoundTask, ExecutorSlot};
3939
use crate::config::SchedulerConfig;
40+
use crate::metrics::SchedulerMetricsCollector;
4041
use crate::state::execution_graph::TaskDescription;
4142
use ballista_core::error::{BallistaError, Result};
4243
use ballista_core::event_loop::EventSender;
@@ -117,6 +118,8 @@ pub struct SchedulerState<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPl
117118
pub codec: BallistaCodec<T, U>,
118119
/// Scheduler configuration.
119120
pub config: Arc<SchedulerConfig>,
121+
/// Metrics collector for recording scheduler metrics.
122+
pub metrics_collector: Arc<dyn SchedulerMetricsCollector>,
120123
}
121124

122125
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T, U> {
@@ -126,6 +129,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
126129
codec: BallistaCodec<T, U>,
127130
scheduler_name: String,
128131
config: Arc<SchedulerConfig>,
132+
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
129133
) -> Self {
130134
Self {
131135
executor_manager: ExecutorManager::new(
@@ -140,6 +144,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
140144
session_manager: SessionManager::new(cluster.job_state()),
141145
codec,
142146
config,
147+
metrics_collector,
143148
}
144149
}
145150

@@ -148,9 +153,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
148153
pub fn new_with_default_scheduler_name(
149154
cluster: BallistaCluster,
150155
codec: BallistaCodec<T, U>,
156+
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
151157
) -> Self {
152158
let config = Arc::new(SchedulerConfig::default());
153-
SchedulerState::new(cluster, codec, "localhost:50050".to_owned(), config)
159+
SchedulerState::new(
160+
cluster,
161+
codec,
162+
"localhost:50050".to_owned(),
163+
config,
164+
metrics_collector,
165+
)
154166
}
155167

156168
#[allow(dead_code)]
@@ -159,6 +171,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
159171
codec: BallistaCodec<T, U>,
160172
scheduler_name: String,
161173
config: Arc<SchedulerConfig>,
174+
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
162175
dispatcher: Arc<dyn TaskLauncher>,
163176
) -> Self {
164177
Self {
@@ -175,6 +188,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
175188
session_manager: SessionManager::new(cluster.job_state()),
176189
codec,
177190
config,
191+
metrics_collector,
178192
}
179193
}
180194

@@ -491,6 +505,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
491505

492506
let elapsed = start.elapsed();
493507

508+
// Record planning duration metric
509+
self.metrics_collector
510+
.record_planning_duration(job_id, elapsed.as_millis() as u64);
511+
494512
info!("Planned job {job_id} in {elapsed:?}");
495513

496514
Ok(())

0 commit comments

Comments
 (0)