Skip to content

Commit bd748b0

Browse files
Add shuffle locality metrics to ExecutorMetricsCollector, SchedulerMetricsCollector, and ShuffleReaderExec
- Add record_shuffle_read_local/remote methods to ExecutorMetricsCollector trait - Add record_task_shuffle_affinity_hit/miss methods to SchedulerMetricsCollector trait - Add ShuffleReadMetricsCallback trait in ballista-core for tracking local vs remote reads - Instrument shuffle_reader.rs to call metrics callback during partition fetches - Add SessionConfigExt methods to pass metrics callback via session config
1 parent 379cf36 commit bd748b0

4 files changed

Lines changed: 250 additions & 1 deletion

File tree

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ use std::sync::Arc;
2929
use std::task::{Context, Poll};
3030

3131
use crate::client::BallistaClient;
32-
use crate::extension::{BallistaConfigGrpcEndpoint, SessionConfigExt};
32+
use crate::extension::{
33+
BallistaConfigGrpcEndpoint, SessionConfigExt, ShuffleReadMetricsCallback,
34+
};
3335
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
3436

3537
use datafusion::arrow::datatypes::SchemaRef;
@@ -164,6 +166,7 @@ impl ExecutionPlan for ShuffleReaderExec {
164166
let prefer_flight = config.ballista_shuffle_reader_remote_prefer_flight();
165167
let customize_endpoint = config.ballista_override_create_grpc_client_endpoint();
166168
let use_tls = config.ballista_use_tls();
169+
let metrics_callback = config.ballista_shuffle_read_metrics_callback();
167170

168171
if force_remote_read {
169172
debug!(
@@ -199,6 +202,7 @@ impl ExecutionPlan for ShuffleReaderExec {
199202
prefer_flight,
200203
customize_endpoint,
201204
use_tls,
205+
metrics_callback,
202206
);
203207

204208
let result = RecordBatchStreamAdapter::new(
@@ -396,6 +400,7 @@ fn send_fetch_partitions(
396400
flight_transport: bool,
397401
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
398402
use_tls: bool,
403+
metrics_callback: Option<Arc<dyn ShuffleReadMetricsCallback>>,
399404
) -> AbortableReceiverStream {
400405
let (response_sender, response_receiver) = mpsc::channel(max_request_num);
401406
let semaphore = Arc::new(Semaphore::new(max_request_num));
@@ -413,8 +418,10 @@ fn send_fetch_partitions(
413418
// keep local shuffle files reading in serial order for memory control.
414419
let response_sender_c = response_sender.clone();
415420
let customize_endpoint_c = customize_endpoint.clone();
421+
let metrics_callback_c = metrics_callback.clone();
416422
spawned_tasks.push(SpawnedTask::spawn(async move {
417423
for p in local_locations {
424+
let start_time = std::time::Instant::now();
418425
let r = PartitionReaderEnum::Local
419426
.fetch_partition(
420427
&p,
@@ -424,6 +431,25 @@ fn send_fetch_partitions(
424431
use_tls,
425432
)
426433
.await;
434+
435+
// Record local read metrics if callback is set and read succeeded
436+
if r.is_ok() {
437+
if let Some(ref callback) = metrics_callback_c {
438+
let duration_ms = start_time.elapsed().as_millis() as u64;
439+
let bytes = p.partition_stats.num_bytes().unwrap_or(0);
440+
let rows = p.partition_stats.num_rows().unwrap_or(0);
441+
callback.record_local_read(
442+
&p.partition_id.job_id,
443+
p.partition_id.stage_id,
444+
p.partition_id.partition_id,
445+
&p.executor_meta.id,
446+
bytes,
447+
rows,
448+
duration_ms,
449+
);
450+
}
451+
}
452+
427453
if let Err(e) = response_sender_c.send(r).await {
428454
error!("Fail to send response event to the channel due to {e}");
429455
}
@@ -434,9 +460,11 @@ fn send_fetch_partitions(
434460
let semaphore = semaphore.clone();
435461
let response_sender = response_sender.clone();
436462
let customize_endpoint_c = customize_endpoint.clone();
463+
let metrics_callback_c = metrics_callback.clone();
437464
spawned_tasks.push(SpawnedTask::spawn(async move {
438465
// Block if exceeds max request number.
439466
let permit = semaphore.acquire_owned().await.unwrap();
467+
let start_time = std::time::Instant::now();
440468
let r = PartitionReaderEnum::FlightRemote
441469
.fetch_partition(
442470
&p,
@@ -446,6 +474,25 @@ fn send_fetch_partitions(
446474
use_tls,
447475
)
448476
.await;
477+
478+
// Record remote read metrics if callback is set and read succeeded
479+
if r.is_ok() {
480+
if let Some(ref callback) = metrics_callback_c {
481+
let duration_ms = start_time.elapsed().as_millis() as u64;
482+
let bytes = p.partition_stats.num_bytes().unwrap_or(0);
483+
let rows = p.partition_stats.num_rows().unwrap_or(0);
484+
callback.record_remote_read(
485+
&p.partition_id.job_id,
486+
p.partition_id.stage_id,
487+
p.partition_id.partition_id,
488+
&p.executor_meta.id,
489+
bytes,
490+
rows,
491+
duration_ms,
492+
);
493+
}
494+
}
495+
449496
// Block if the channel buffer is full.
450497
if let Err(e) = response_sender.send(r).await {
451498
error!("Fail to send response event to the channel due to {e}");
@@ -992,6 +1039,7 @@ mod tests {
9921039
true,
9931040
None,
9941041
false,
1042+
None, // No metrics callback in tests
9951043
);
9961044

9971045
let stream = RecordBatchStreamAdapter::new(

ballista/core/src/extension.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,20 @@ pub trait SessionConfigExt {
175175

176176
/// Get whether to use TLS for executor connections
177177
fn ballista_use_tls(&self) -> bool;
178+
179+
/// Set a callback for recording shuffle read metrics (local vs remote).
180+
///
181+
/// This callback will be invoked by the shuffle reader during execution
182+
/// to record detailed metrics about local and remote shuffle reads.
183+
fn with_ballista_shuffle_read_metrics_callback(
184+
self,
185+
callback: Arc<dyn ShuffleReadMetricsCallback>,
186+
) -> Self;
187+
188+
/// Get the shuffle read metrics callback if one has been set.
189+
fn ballista_shuffle_read_metrics_callback(
190+
&self,
191+
) -> Option<Arc<dyn ShuffleReadMetricsCallback>>;
178192
}
179193

180194
/// [SessionConfigHelperExt] is set of [SessionConfig] extension methods
@@ -459,6 +473,21 @@ impl SessionConfigExt for SessionConfig {
459473
.map(|ext| ext.0)
460474
.unwrap_or(false)
461475
}
476+
477+
fn with_ballista_shuffle_read_metrics_callback(
478+
self,
479+
callback: Arc<dyn ShuffleReadMetricsCallback>,
480+
) -> Self {
481+
let extension = ShuffleReadMetricsCallbackExtension::new(callback);
482+
self.with_extension(Arc::new(extension))
483+
}
484+
485+
fn ballista_shuffle_read_metrics_callback(
486+
&self,
487+
) -> Option<Arc<dyn ShuffleReadMetricsCallback>> {
488+
self.get_extension::<ShuffleReadMetricsCallbackExtension>()
489+
.map(|ext| ext.callback())
490+
}
462491
}
463492

464493
impl SessionConfigHelperExt for SessionConfig {
@@ -658,6 +687,79 @@ impl BallistaConfigGrpcEndpoint {
658687
#[derive(Clone, Copy)]
659688
pub struct BallistaUseTls(pub bool);
660689

690+
/// Callback trait for recording shuffle read metrics from the shuffle reader.
691+
///
692+
/// This trait is designed to be passed via session config extension to the
693+
/// shuffle reader, allowing external systems (like Spice) to capture detailed
694+
/// shuffle read locality metrics without creating circular dependencies.
695+
pub trait ShuffleReadMetricsCallback: Send + Sync {
696+
/// Record a local shuffle read operation.
697+
///
698+
/// Called when the shuffle reader successfully reads data from a local file
699+
/// (i.e., the partition was produced by this executor).
700+
///
701+
/// # Arguments
702+
/// * `job_id` - The job identifier
703+
/// * `stage_id` - The stage that is reading the shuffle data
704+
/// * `partition` - The partition being read
705+
/// * `source_executor_id` - The executor that produced the shuffle data (same as current executor for local reads)
706+
/// * `bytes` - Number of bytes read
707+
/// * `rows` - Number of rows read
708+
/// * `duration_ms` - Time taken to read the partition
709+
fn record_local_read(
710+
&self,
711+
job_id: &str,
712+
stage_id: usize,
713+
partition: usize,
714+
source_executor_id: &str,
715+
bytes: u64,
716+
rows: u64,
717+
duration_ms: u64,
718+
);
719+
720+
/// Record a remote shuffle read operation.
721+
///
722+
/// Called when the shuffle reader fetches data from a remote executor
723+
/// via Arrow Flight.
724+
///
725+
/// # Arguments
726+
/// * `job_id` - The job identifier
727+
/// * `stage_id` - The stage that is reading the shuffle data
728+
/// * `partition` - The partition being read
729+
/// * `source_executor_id` - The executor that produced the shuffle data
730+
/// * `bytes` - Number of bytes read
731+
/// * `rows` - Number of rows read
732+
/// * `duration_ms` - Time taken to fetch the partition
733+
fn record_remote_read(
734+
&self,
735+
job_id: &str,
736+
stage_id: usize,
737+
partition: usize,
738+
source_executor_id: &str,
739+
bytes: u64,
740+
rows: u64,
741+
duration_ms: u64,
742+
);
743+
}
744+
745+
/// Session config extension wrapper for the shuffle read metrics callback.
746+
#[derive(Clone)]
747+
pub struct ShuffleReadMetricsCallbackExtension {
748+
callback: Arc<dyn ShuffleReadMetricsCallback>,
749+
}
750+
751+
impl ShuffleReadMetricsCallbackExtension {
752+
/// Create a new extension wrapping the provided callback.
753+
pub fn new(callback: Arc<dyn ShuffleReadMetricsCallback>) -> Self {
754+
Self { callback }
755+
}
756+
757+
/// Get the callback.
758+
pub fn callback(&self) -> Arc<dyn ShuffleReadMetricsCallback> {
759+
Arc::clone(&self.callback)
760+
}
761+
}
762+
661763
#[cfg(test)]
662764
mod test {
663765
use datafusion::{

ballista/executor/src/metrics/mod.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,36 @@ pub trait ExecutorMetricsCollector: Send + Sync {
8686
duration_ms: u64,
8787
);
8888

89+
/// Record local shuffle read metrics (data read from local disk).
90+
///
91+
/// Called when shuffle data is read from a local file. This means the partition
92+
/// was written by this same executor in a previous stage, avoiding network transfer.
93+
fn record_shuffle_read_local(
94+
&self,
95+
job_id: &str,
96+
stage_id: usize,
97+
partition: usize,
98+
bytes: u64,
99+
rows: u64,
100+
duration_ms: u64,
101+
);
102+
103+
/// Record remote shuffle read metrics (data fetched from another executor).
104+
///
105+
/// Called when shuffle data must be fetched over the network from another
106+
/// executor that produced the partition. The `source_executor_id` identifies
107+
/// the executor that holds the shuffle data.
108+
fn record_shuffle_read_remote(
109+
&self,
110+
job_id: &str,
111+
stage_id: usize,
112+
partition: usize,
113+
source_executor_id: &str,
114+
bytes: u64,
115+
rows: u64,
116+
duration_ms: u64,
117+
);
118+
89119
/// Record executor memory availability.
90120
///
91121
/// Called periodically (e.g., during heartbeat) to report the executor's
@@ -154,6 +184,35 @@ impl ExecutorMetricsCollector for LoggingMetricsCollector {
154184
);
155185
}
156186

187+
fn record_shuffle_read_local(
188+
&self,
189+
job_id: &str,
190+
stage_id: usize,
191+
partition: usize,
192+
bytes: u64,
193+
rows: u64,
194+
duration_ms: u64,
195+
) {
196+
info!(
197+
"=== [{job_id}/{stage_id}/{partition}] Local shuffle read: {bytes} bytes, {rows} rows in {duration_ms}ms ==="
198+
);
199+
}
200+
201+
fn record_shuffle_read_remote(
202+
&self,
203+
job_id: &str,
204+
stage_id: usize,
205+
partition: usize,
206+
source_executor_id: &str,
207+
bytes: u64,
208+
rows: u64,
209+
duration_ms: u64,
210+
) {
211+
info!(
212+
"=== [{job_id}/{stage_id}/{partition}] Remote shuffle read from {source_executor_id}: {bytes} bytes, {rows} rows in {duration_ms}ms ==="
213+
);
214+
}
215+
157216
fn record_memory_available(&self, available_bytes: u64) {
158217
info!("=== Executor memory available: {available_bytes} bytes ===");
159218
}

ballista/scheduler/src/metrics/mod.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,32 @@ pub trait SchedulerMetricsCollector: Send + Sync {
134134
/// stage-level retries and tracks individual task retry attempts.
135135
fn record_task_retry(&self, job_id: &str, stage_id: usize);
136136

137+
/// Record a shuffle affinity hit - task was assigned to an executor that has
138+
/// local shuffle data from a parent stage.
139+
///
140+
/// Called when the scheduler assigns a task to an executor that already has
141+
/// the required shuffle partitions from upstream stages stored locally.
142+
/// This indicates the task can read shuffle data without network transfer.
143+
fn record_task_shuffle_affinity_hit(
144+
&self,
145+
job_id: &str,
146+
stage_id: usize,
147+
executor_id: &str,
148+
);
149+
150+
/// Record a shuffle affinity miss - task was assigned to an executor that does
151+
/// NOT have local shuffle data from a parent stage.
152+
///
153+
/// Called when the scheduler assigns a task to an executor that does not have
154+
/// the required shuffle partitions locally. This indicates the task will need
155+
/// to fetch shuffle data over the network from other executors.
156+
fn record_task_shuffle_affinity_miss(
157+
&self,
158+
job_id: &str,
159+
stage_id: usize,
160+
executor_id: &str,
161+
);
162+
137163
// =========================================================================
138164
// Executor management events (new)
139165
// =========================================================================
@@ -207,6 +233,20 @@ impl SchedulerMetricsCollector for NoopMetricsCollector {
207233
) {
208234
}
209235
fn record_task_retry(&self, _job_id: &str, _stage_id: usize) {}
236+
fn record_task_shuffle_affinity_hit(
237+
&self,
238+
_job_id: &str,
239+
_stage_id: usize,
240+
_executor_id: &str,
241+
) {
242+
}
243+
fn record_task_shuffle_affinity_miss(
244+
&self,
245+
_job_id: &str,
246+
_stage_id: usize,
247+
_executor_id: &str,
248+
) {
249+
}
210250

211251
// Executor management
212252
fn set_active_executor_count(&self, _count: usize) {}

0 commit comments

Comments
 (0)