Add comprehensive metrics instrumentation for scheduler and executor #10
Merged
lukekim pushed a commit that referenced this pull request on Jan 23, 2026:
…10)
* Add shuffle read metrics extraction and QueryStageExecutor::plan() method
  - Add public getter methods to PartitionStats (num_rows, num_batches, num_bytes)
  - Extend QueryStageExecutor trait with plan() method to access underlying ExecutionPlan
  - Add extract_shuffle_read_metrics() to walk plan tree and sum ShuffleReaderExec partition stats
  - Record shuffle read metrics (bytes, rows, duration) after successful task execution in executor
* Add shuffle locality metrics to ExecutorMetricsCollector, SchedulerMetricsCollector, and ShuffleReaderExec
  - Add record_shuffle_read_local/remote methods to ExecutorMetricsCollector trait
  - Add record_task_shuffle_affinity_hit/miss methods to SchedulerMetricsCollector trait
  - Add ShuffleReadMetricsCallback trait in ballista-core for tracking local vs remote reads
  - Instrument shuffle_reader.rs to call metrics callback during partition fetches
  - Add SessionConfigExt methods to pass metrics callback via session config
* Add metrics collector to SchedulerState and instrument executor and planning metrics
  - Add metrics_collector field to SchedulerState struct
  - Instrument record_planning_duration in submit_job
  - Instrument record_executor_registered/deregistered and set_active_executor_count
  - Update all SchedulerState constructors and call sites
* Add stage and task lifecycle metrics instrumentation to update_task_status flow
* Add shuffle affinity metrics to scheduler task binding
* Add actual task scheduling latency tracking
  - Add schedulable_time_millis field to TaskDescription to track when a task became schedulable (when its stage transitioned to running state)
  - Update all TaskDescription creation sites to pass RunningStage.stage_running_time
  - Calculate actual scheduling latency in record_task_scheduled calls by computing the difference between current time and schedulable_time_millis
  - This enables accurate scheduler_task_scheduling_latency_ms metrics instead of the previous placeholder value of 0
* fix lint
lukekim added a commit that referenced this pull request on Jan 23, 2026:
* feat: Store shuffles in object store (S3, Azure)
* Add comprehensive metrics instrumentation for scheduler and executor (#10)
* Add shuffle read metrics extraction and QueryStageExecutor::plan() method
* Add shuffle locality metrics to ExecutorMetricsCollector, SchedulerMetricsCollector, and ShuffleReaderExec
* Add metrics collector to SchedulerState and instrument executor and planning metrics
* Add stage and task lifecycle metrics instrumentation to update_task_status flow
* Add shuffle affinity metrics to scheduler task binding
* Add actual task scheduling latency tracking
* fix lint
* feat: add Vortex columnar format support for shuffle operations (#7)
* feat: add Vortex columnar format support for shuffle operations
  - Introduced Vortex dependencies in Cargo.toml for columnar format handling.
  - Updated Ballista configuration to support shuffle format selection between Arrow IPC and Vortex.
  - Implemented Vortex shuffle reader and writer in execution plans.
  - Enhanced shuffle operations to detect and handle Vortex files.
  - Added utility functions for writing streams to disk in both Arrow IPC and Vortex formats.
  - Created a new module for Vortex shuffle operations, including reading and writing logic.
  - Added tests for Vortex write and read roundtrip functionality.
* Fix Clippy and lint
* Fix reading of Vortex files
* Fix lint
* Don't expose final stage
* Remove build-binary
---------
Co-authored-by: Phillip LeBlanc <phillip@leblanc.tech>
Rationale for this change
This PR adds comprehensive metrics instrumentation to track scheduler and executor performance, enabling better observability into task scheduling, shuffle operations, and resource utilization.
What changes are included in this PR?
This PR includes 6 commits that progressively build out the metrics infrastructure:
1. Add shuffle read metrics extraction and QueryStageExecutor::plan() method
- Add public getter methods to PartitionStats (num_rows, num_batches, num_bytes)
- Extend QueryStageExecutor trait with plan() method to access underlying ExecutionPlan
- Add extract_shuffle_read_metrics() to walk plan tree and sum ShuffleReaderExec partition stats
- Record shuffle read metrics (bytes, rows, duration) after successful task execution in executor
2. Add shuffle locality metrics to ExecutorMetricsCollector, SchedulerMetricsCollector, and ShuffleReaderExec
- Add record_shuffle_read_local/remote methods to ExecutorMetricsCollector trait
- Add record_task_shuffle_affinity_hit/miss methods to SchedulerMetricsCollector trait
- Add ShuffleReadMetricsCallback trait in ballista-core for tracking local vs remote reads
- Instrument shuffle_reader.rs to call metrics callback during partition fetches
- Add SessionConfigExt methods to pass metrics callback via session config
3. Add metrics collector to SchedulerState and instrument executor and planning metrics
- Add metrics_collector field to SchedulerState struct
- Instrument record_planning_duration in submit_job
- Instrument record_executor_registered/deregistered and set_active_executor_count
- Update all SchedulerState constructors and call sites
4. Add stage and task lifecycle metrics instrumentation to update_task_status flow
5. Add shuffle affinity metrics to scheduler task binding
6. Add actual task scheduling latency tracking
- Add schedulable_time_millis field to TaskDescription to track when a task became schedulable (when its stage transitioned to running state)
- Update all TaskDescription creation sites to pass RunningStage.stage_running_time
- Calculate actual scheduling latency in record_task_scheduled calls by computing the difference between current time and schedulable_time_millis
- This enables accurate scheduler_task_scheduling_latency_ms metrics instead of the previous placeholder value of 0
Are there any user-facing changes?
Yes, this PR introduces new metrics that can be collected via the ExecutorMetricsCollector and SchedulerMetricsCollector traits:
Executor Metrics:
Scheduler Metrics:
These metrics are exposed through the existing Prometheus metrics endpoint when using the PrometheusMetricsCollector implementation.
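Commit 1 describes extract_shuffle_read_metrics() as walking the plan tree and summing ShuffleReaderExec partition stats. The following is only a rough sketch of that idea, not the code from this PR: PlanNode, ShuffleReadTotals, and extract_shuffle_read_totals are hypothetical stand-ins for the real ExecutionPlan tree, and the Option<u64> fields on PartitionStats are an assumption about how missing statistics are represented.

```rust
/// Simplified stand-in for Ballista's PartitionStats (field types are assumed).
#[derive(Debug, Clone, Copy, Default)]
struct PartitionStats {
    num_rows: Option<u64>,
    num_batches: Option<u64>,
    num_bytes: Option<u64>,
}

impl PartitionStats {
    // Public getters, mirroring the ones the PR says it adds.
    pub fn num_rows(&self) -> Option<u64> { self.num_rows }
    pub fn num_batches(&self) -> Option<u64> { self.num_batches }
    pub fn num_bytes(&self) -> Option<u64> { self.num_bytes }
}

/// Hypothetical plan tree: either a shuffle reader leaf or any other operator.
enum PlanNode {
    ShuffleReader { partitions: Vec<PartitionStats> },
    Other { children: Vec<PlanNode> },
}

/// Hypothetical accumulator for the summed shuffle read statistics.
#[derive(Debug, Default, PartialEq)]
struct ShuffleReadTotals {
    rows: u64,
    batches: u64,
    bytes: u64,
}

/// Walk the whole tree and sum the stats of every shuffle reader found.
fn extract_shuffle_read_totals(plan: &PlanNode) -> ShuffleReadTotals {
    let mut totals = ShuffleReadTotals::default();
    accumulate(plan, &mut totals);
    totals
}

fn accumulate(node: &PlanNode, totals: &mut ShuffleReadTotals) {
    match node {
        PlanNode::ShuffleReader { partitions } => {
            for stats in partitions {
                // Missing statistics are counted as zero in this sketch.
                totals.rows += stats.num_rows().unwrap_or(0);
                totals.batches += stats.num_batches().unwrap_or(0);
                totals.bytes += stats.num_bytes().unwrap_or(0);
            }
        }
        PlanNode::Other { children } => {
            for child in children {
                accumulate(child, totals);
            }
        }
    }
}
```

Treating absent statistics as zero keeps the totals usable when only some partitions report sizes; whether the PR makes the same choice is not stated in the description.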
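Commit 6 computes scheduling latency as the difference between the current time and schedulable_time_millis, then reports it through record_task_scheduled. As a minimal sketch of how a custom collector might consume that value: the SchedulingLatencyCollector trait, its method signature, InMemoryCollector, and the helper functions below are all assumptions made for illustration, since the real SchedulerMetricsCollector signatures are not shown in this description.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical one-method subset of a scheduler metrics collector.
/// The real trait's signature may differ.
trait SchedulingLatencyCollector {
    fn record_task_scheduled(&self, latency_ms: u64);
}

/// Illustrative in-memory collector that tracks a count and a running sum.
#[derive(Default)]
struct InMemoryCollector {
    tasks: AtomicU64,
    total_latency_ms: AtomicU64,
}

impl SchedulingLatencyCollector for InMemoryCollector {
    fn record_task_scheduled(&self, latency_ms: u64) {
        self.tasks.fetch_add(1, Ordering::Relaxed);
        self.total_latency_ms.fetch_add(latency_ms, Ordering::Relaxed);
    }
}

/// Wall-clock time in milliseconds since the Unix epoch (0 if the clock is before it).
fn current_time_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}

/// Latency between a task becoming schedulable and being scheduled.
/// saturating_sub guards against clock skew producing an underflow.
fn scheduling_latency_ms(now_millis: u64, schedulable_time_millis: u64) -> u64 {
    now_millis.saturating_sub(schedulable_time_millis)
}
```

A call site would then look roughly like `collector.record_task_scheduled(scheduling_latency_ms(current_time_millis(), task_schedulable_time))`, where `task_schedulable_time` stands for the value carried in TaskDescription. Clamping at zero is a design choice of this sketch, not something the PR description specifies.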