Skip to content

Commit 1a98c29

Browse files
phillipleblanclukekim
authored andcommitted
Add comprehensive metrics instrumentation for scheduler and executor (#10)
* Add shuffle read metrics extraction and QueryStageExecutor::plan() method - Add public getter methods to PartitionStats (num_rows, num_batches, num_bytes) - Extend QueryStageExecutor trait with plan() method to access underlying ExecutionPlan - Add extract_shuffle_read_metrics() to walk plan tree and sum ShuffleReaderExec partition stats - Record shuffle read metrics (bytes, rows, duration) after successful task execution in executor * Add shuffle locality metrics to ExecutorMetricsCollector, SchedulerMetricsCollector, and ShuffleReaderExec - Add record_shuffle_read_local/remote methods to ExecutorMetricsCollector trait - Add record_task_shuffle_affinity_hit/miss methods to SchedulerMetricsCollector trait - Add ShuffleReadMetricsCallback trait in ballista-core for tracking local vs remote reads - Instrument shuffle_reader.rs to call metrics callback during partition fetches - Add SessionConfigExt methods to pass metrics callback via session config * Add metrics collector to SchedulerState and instrument executor and planning metrics - Add metrics_collector field to SchedulerState struct - Instrument record_planning_duration in submit_job - Instrument record_executor_registered/deregistered and set_active_executor_count - Update all SchedulerState constructors and call sites * Add stage and task lifecycle metrics instrumentation to update_task_status flow * Add shuffle affinity metrics to scheduler task binding * Add actual task scheduling latency tracking - Add schedulable_time_millis field to TaskDescription to track when a task became schedulable (when its stage transitioned to running state) - Update all TaskDescription creation sites to pass RunningStage.stage_running_time - Calculate actual scheduling latency in record_task_scheduled calls by computing the difference between current time and schedulable_time_millis - This enables accurate scheduler_task_scheduling_latency_ms metrics instead of the previous placeholder value of 0 * fix lint
1 parent 9fcff62 commit 1a98c29

33 files changed

Lines changed: 1729 additions & 238 deletions

ballista/core/src/client.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ use datafusion::error::Result;
4747
use crate::extension::BallistaConfigGrpcEndpoint;
4848
use crate::serde::protobuf;
4949

50-
use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
51-
5250
use crate::utils::create_grpc_client_endpoint;
5351

5452
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
use crate::client::BallistaClient;
1919
use crate::config::BallistaConfig;
2020
use crate::extension::{
21-
BallistaConfigGrpcEndpoint, BallistaGrpcMetadataInterceptor, SessionConfigExt,
21+
BallistaConfigGrpcEndpoint, BallistaGrpcMetadataInterceptor,
22+
ResultFetchMetricsCallback, SessionConfigExt,
2223
};
2324
use crate::serde::protobuf::SuccessfulJob;
2425
use crate::serde::protobuf::{
@@ -27,8 +28,6 @@ use crate::serde::protobuf::{
2728
scheduler_grpc_client::SchedulerGrpcClient,
2829
};
2930

30-
use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
31-
3231
use crate::utils::create_grpc_client_endpoint;
3332

3433
use datafusion::arrow::datatypes::SchemaRef;
@@ -248,8 +247,6 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
248247
let metric_total_bytes =
249248
MetricBuilder::new(&self.metrics).counter("transferred_bytes", partition);
250249

251-
252-
253250
let interceptor = context.session_config().ballista_grpc_interceptor();
254251

255252
let customize_endpoint = context
@@ -258,23 +255,22 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
258255

259256
let use_tls = context.session_config().ballista_use_tls();
260257

258+
let result_fetch_callback = context
259+
.session_config()
260+
.ballista_result_fetch_metrics_callback();
261261

262262
let stream = futures::stream::once(
263263
execute_query(
264264
self.scheduler_url.clone(),
265265
self.session_id.clone(),
266266
query,
267-
268-
self.config.default_grpc_client_max_message_size(),
269-
GrpcClientConfig::from(&self.config),
270267
Arc::new(self.metrics.clone()),
271268
partition,
272-
273269
self.config.clone(),
274270
interceptor,
275271
customize_endpoint,
276272
use_tls,
277-
273+
result_fetch_callback,
278274
)
279275
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
280276
)
@@ -306,21 +302,18 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
306302
}
307303
}
308304

305+
#[allow(clippy::too_many_arguments)]
309306
async fn execute_query(
310307
scheduler_url: String,
311308
session_id: String,
312309
query: ExecuteQueryParams,
313-
314-
max_message_size: usize,
315-
grpc_config: GrpcClientConfig,
316310
metrics: Arc<ExecutionPlanMetricsSet>,
317311
partition: usize,
318-
319312
config: BallistaConfig,
320313
grpc_interceptor: Arc<BallistaGrpcMetadataInterceptor>,
321314
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
322315
use_tls: bool,
323-
316+
result_fetch_callback: Option<Arc<dyn ResultFetchMetricsCallback>>,
324317
) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
325318
// Capture query submission time for total_query_time_ms
326319
let query_start_time = std::time::Instant::now();
@@ -450,12 +443,14 @@ async fn execute_query(
450443
// This could be added in a future enhancement by wrapping the stream.
451444

452445
let streams = partition_location.into_iter().map(move |partition| {
446+
let callback = result_fetch_callback.clone();
453447
let f = fetch_partition(
454448
partition,
455449
max_message_size,
456450
true,
457451
customize_endpoint.clone(),
458452
use_tls,
453+
callback,
459454
)
460455
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
461456

@@ -474,13 +469,29 @@ async fn fetch_partition(
474469
flight_transport: bool,
475470
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
476471
use_tls: bool,
472+
metrics_callback: Option<Arc<dyn ResultFetchMetricsCallback>>,
477473
) -> Result<SendableRecordBatchStream> {
474+
let start_time = std::time::Instant::now();
475+
478476
let metadata = location.executor_meta.ok_or_else(|| {
479477
DataFusionError::Internal("Received empty executor metadata".to_owned())
480478
})?;
481479
let partition_id = location.partition_id.ok_or_else(|| {
482480
DataFusionError::Internal("Received empty partition id".to_owned())
483481
})?;
482+
483+
// Extract stats before consuming location
484+
let stats = location.partition_stats.as_ref();
485+
#[expect(clippy::cast_sign_loss)]
486+
let expected_bytes = stats.map(|s| s.num_bytes as u64).unwrap_or(0);
487+
#[expect(clippy::cast_sign_loss)]
488+
let expected_rows = stats.map(|s| s.num_rows as u64).unwrap_or(0);
489+
490+
let job_id = partition_id.job_id.clone();
491+
let stage_id = partition_id.stage_id as usize;
492+
let partition = partition_id.partition_id as usize;
493+
let executor_id = metadata.id.clone();
494+
484495
let host = metadata.host.as_str();
485496
let port = metadata.port as u16;
486497
let mut ballista_client = BallistaClient::try_new(
@@ -492,7 +503,8 @@ async fn fetch_partition(
492503
)
493504
.await
494505
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
495-
ballista_client
506+
507+
let stream = ballista_client
496508
.fetch_partition(
497509
&metadata.id,
498510
&partition_id.into(),
@@ -502,5 +514,21 @@ async fn fetch_partition(
502514
flight_transport,
503515
)
504516
.await
505-
.map_err(|e| DataFusionError::External(Box::new(e)))
517+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
518+
519+
// Record metrics after successful fetch
520+
if let Some(callback) = metrics_callback {
521+
let duration_ms = start_time.elapsed().as_millis() as u64;
522+
callback.record_result_fetch(
523+
&job_id,
524+
stage_id,
525+
partition,
526+
&executor_id,
527+
expected_bytes,
528+
expected_rows,
529+
duration_ms,
530+
);
531+
}
532+
533+
Ok(stream)
506534
}

0 commit comments

Comments
 (0)