Skip to content

Commit 1f9e2b9

Browse files
Add run-level 'outcome' dimension to all telemetry metrics (#191)
Introduce a typed RunOutcome enum (Success, QueryFailure, PipelineFailure, ValidationTimeout, Cancelled) and add it as an 'outcome' dimension on every emitted OTel metric. This lets dashboards filter runs by their terminal state. Key changes: - RunOutcome enum in test-framework/src/metrics/mod.rs with as_str() for the OTel attribute value and Display that preserves the failure reason for PipelineFailure(String). - Event loop in load/mod.rs now breaks with Option<RunOutcome> instead of Option<String>, so each exit path carries a typed outcome. - After the loop, outcome is derived (from the loop result + test.succeeded()) and pushed onto metric_attributes before any metrics are recorded. - Telemetry is always emitted before error propagation, so even failed runs produce metrics with the outcome dimension.
1 parent 9a80630 commit 1f9e2b9

2 files changed

Lines changed: 113 additions & 34 deletions

File tree

  • crates/test-framework/src/metrics
  • src/commands/load

crates/test-framework/src/metrics/mod.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,48 @@ use crate::{TestType, git};
3636

3737
const FLOAT_ERROR_MARGIN: f64 = 0.0001;
3838

39+
/// The outcome of a benchmark run, applied as a dimension to all emitted metrics.
40+
///
41+
/// This allows filtering runs by their terminal state in dashboards and queries.
42+
/// For example, filtering to `outcome = "success"` excludes runs that had query
43+
/// failures, pipeline errors, or were interrupted.
44+
#[derive(Debug, Clone, PartialEq, Eq)]
45+
pub enum RunOutcome {
46+
/// All queries passed and the run completed successfully.
47+
Success,
48+
/// One or more queries returned execution errors.
49+
QueryFailure,
50+
/// The data ingestion/ETL pipeline failed.
51+
PipelineFailure(String),
52+
/// Checkpoint validation did not converge within the timeout.
53+
ValidationTimeout,
54+
/// The run was interrupted by the user (e.g., ctrl-c).
55+
Cancelled,
56+
}
57+
58+
impl RunOutcome {
59+
/// Returns the string representation used as the OTel attribute value.
60+
#[must_use]
61+
pub fn as_str(&self) -> &'static str {
62+
match self {
63+
Self::Success => "success",
64+
Self::QueryFailure => "query_failure",
65+
Self::PipelineFailure(_) => "pipeline_failure",
66+
Self::ValidationTimeout => "validation_timeout",
67+
Self::Cancelled => "cancelled",
68+
}
69+
}
70+
}
71+
72+
impl std::fmt::Display for RunOutcome {
73+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74+
match self {
75+
Self::PipelineFailure(reason) => write!(f, "pipeline_failure: {reason}"),
76+
other => f.write_str(other.as_str()),
77+
}
78+
}
79+
}
80+
3981
#[expect(
4082
clippy::must_use_candidate,
4183
clippy::cast_possible_wrap,

src/commands/load/mod.rs

Lines changed: 71 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ use test_framework::{
2828
TestType, anyhow,
2929
arrow::util::pretty::print_batches,
3030
execution::QueryExecutor,
31-
metrics::{MetricCollector, NoExtendedMetrics, QueryMetrics, QueryStatus, StatisticsCollector},
31+
metrics::{
32+
MetricCollector, NoExtendedMetrics, QueryMetrics, QueryStatus, RunOutcome,
33+
StatisticsCollector,
34+
},
3235
opentelemetry::KeyValue,
3336
opentelemetry::metrics::{Counter, Gauge},
3437
opentelemetry_sdk::Resource,
@@ -54,11 +57,14 @@ struct SutInstruments {
5457
ingestion_bytes_total: Gauge<u64>,
5558
}
5659

57-
fn run_metric_attributes(common_args: &CommonArgs) -> Vec<KeyValue> {
58-
vec![KeyValue::new(
59-
"executor_instance_type",
60-
common_args.executor_instance_type.clone(),
61-
)]
60+
fn run_metric_attributes(common_args: &CommonArgs, run_id: uuid::Uuid) -> Vec<KeyValue> {
61+
vec![
62+
KeyValue::new(
63+
"executor_instance_type",
64+
common_args.executor_instance_type.clone(),
65+
),
66+
KeyValue::new("run_id", run_id.to_string()),
67+
]
6268
}
6369

6470
fn log_sut_metrics_snapshot(response: &MetricsResponse) {
@@ -168,7 +174,7 @@ fn spawn_sut_metrics_scraper(
168174
run_id: uuid::Uuid,
169175
token: CancellationToken,
170176
interval: Duration,
171-
attributes: Vec<KeyValue>,
177+
attributes: Arc<std::sync::RwLock<Vec<KeyValue>>>,
172178
instruments: SutInstruments,
173179
) -> tokio::task::JoinHandle<Option<MetricsResponse>> {
174180
tokio::spawn(async move {
@@ -187,11 +193,12 @@ fn spawn_sut_metrics_scraper(
187193
let metrics_result = adapter.lock().await.metrics(run_id, false).await;
188194
match metrics_result {
189195
Ok(resp) => {
196+
let attrs = attributes.read().expect("SUT attributes lock poisoned");
190197
log_sut_metrics_snapshot(&resp);
191198
record_sut_metrics(
192199
&resp,
193200
&instruments,
194-
&attributes,
201+
&attrs,
195202
&mut prev_cpu_usage_seconds,
196203
&mut prev_disk_read_bytes,
197204
&mut prev_disk_write_bytes,
@@ -210,11 +217,12 @@ fn spawn_sut_metrics_scraper(
210217
() = token.cancelled() => {
211218
// Final scrape before exiting
212219
if let Ok(resp) = adapter.lock().await.metrics(run_id, true).await {
220+
let attrs = attributes.read().expect("SUT attributes lock poisoned");
213221
log_sut_metrics_snapshot(&resp);
214222
record_sut_metrics(
215223
&resp,
216224
&instruments,
217-
&attributes,
225+
&attrs,
218226
&mut prev_cpu_usage_seconds,
219227
&mut prev_disk_read_bytes,
220228
&mut prev_disk_write_bytes,
@@ -584,7 +592,7 @@ pub(crate) async fn run(
584592
checkpoint_dir: Option<&Path>,
585593
query_catalog_namespace: Option<String>,
586594
) -> anyhow::Result<()> {
587-
let metric_attributes = run_metric_attributes(common_args);
595+
let metric_attributes = run_metric_attributes(common_args, run_id);
588596

589597
scenario.load_query_set()?;
590598

@@ -625,7 +633,8 @@ pub(crate) async fn run(
625633
// SUT metrics are always periodically exported to the Arrow backend (SPICEAI_BENCHMARK_METRICS_KEY).
626634
// When --otlp-endpoint is configured, they are also exported there.
627635
let sut_scraper_token = CancellationToken::new();
628-
let (sut_scraper_handle, sut_pipeline) = if common_args.scrape_sut_metrics
636+
let (sut_scraper_handle, sut_pipeline, sut_shared_attributes) = if common_args
637+
.scrape_sut_metrics
629638
&& (common_args.system_adapter_stdio_cmd.is_some()
630639
|| common_args.system_adapter_http_url.is_some())
631640
{
@@ -646,29 +655,25 @@ pub(crate) async fn run(
646655
ingestion_rows_total: m.u64_gauge("ingestion_rows_total").build(),
647656
ingestion_bytes_total: m.u64_gauge("ingestion_bytes_total").build(),
648657
};
649-
let mut sut_attributes = metric_attributes.clone();
650-
sut_attributes.push(KeyValue::new("run_id", run_id.to_string()));
658+
let sut_attributes = Arc::new(std::sync::RwLock::new(metric_attributes.clone()));
651659
println!("SUT metrics scraping enabled (run_id={run_id})");
652660
(
653661
Some(spawn_sut_metrics_scraper(
654662
system_adapter_client,
655663
run_id,
656664
sut_scraper_token.clone(),
657665
Duration::from_secs(5),
658-
sut_attributes,
666+
Arc::clone(&sut_attributes),
659667
instruments,
660668
)),
661669
Some(sut_pipeline),
670+
Some(sut_attributes),
662671
)
663672
} else {
664-
(None, None)
673+
(None, None, None)
665674
};
666675

667-
// Record client concurrency as a gauge
668-
crate::metrics::ACTIVE_CONNECTIONS.record(
669-
common_args.concurrency.try_into().unwrap_or(0),
670-
&metric_attributes,
671-
);
676+
// ACTIVE_CONNECTIONS is recorded post-loop so it carries the `outcome` dimension.
672677

673678
let mut test_builder = NotStarted::new()
674679
.with_parallel_count(common_args.concurrency)
@@ -736,7 +741,11 @@ pub(crate) async fn run(
736741
// validation would be triggered before resuming.
737742
//
738743
// If interrupted (ctrl-c), cancel both the test and the ETL pipeline.
739-
let etl_error: Option<String> = loop {
744+
// Collect checkpoint E2E latency samples during the loop, then emit
745+
// them post-loop so they carry the final `outcome` dimension.
746+
let mut checkpoint_e2e_latency_samples: Vec<f64> = Vec::new();
747+
748+
let run_outcome: Option<RunOutcome> = loop {
740749
tokio::select! {
741750
// ETL state changed — check if stopped or paused
742751
_ = etl_state_rx.changed() => {
@@ -777,24 +786,21 @@ pub(crate) async fn run(
777786

778787
match result {
779788
CheckpointValidationResult::Converged { e2e_latency_ms } => {
780-
crate::metrics::E2E_LATENCY_MS
781-
.record(e2e_latency_ms, &metric_attributes);
789+
checkpoint_e2e_latency_samples.push(e2e_latency_ms);
782790
}
783791
CheckpointValidationResult::Interrupted => {
784792
eprintln!("Interrupt received during checkpoint validation, stopping...");
785793
shutdown_token.cancel();
786794
etl_pipeline.cancel();
787-
break Some("Interrupted by user".to_string());
795+
break Some(RunOutcome::Cancelled);
788796
}
789797
CheckpointValidationResult::TimedOut => {
790798
eprintln!(
791799
"Checkpoint {checkpoint_idx} validation timed out after 600s without convergence, aborting run"
792800
);
793801
shutdown_token.cancel();
794802
etl_pipeline.cancel();
795-
break Some(format!(
796-
"Checkpoint {checkpoint_idx} validation timed out after 600s"
797-
));
803+
break Some(RunOutcome::ValidationTimeout);
798804
}
799805
}
800806
}
@@ -817,7 +823,7 @@ pub(crate) async fn run(
817823
if let Err(e) = etl_pipeline.continue_pipeline() {
818824
eprintln!("Failed to continue ETL pipeline after pause: {e}");
819825
shutdown_token.cancel();
820-
break Some(format!("Failed to continue ETL pipeline: {e}"));
826+
break Some(RunOutcome::PipelineFailure(format!("Failed to continue ETL pipeline: {e}")));
821827
}
822828
tracing::info!("ETL pipeline resumed");
823829
}
@@ -853,7 +859,7 @@ pub(crate) async fn run(
853859
PipelineState::Stopped(StopReason::Error(ref e)) => {
854860
eprintln!("ETL pipeline failed: {e}");
855861
shutdown_token.cancel();
856-
break Some(e.clone());
862+
break Some(RunOutcome::PipelineFailure(e.clone()));
857863
}
858864
PipelineState::Stopped(StopReason::Cancelled) => {
859865
println!("ETL pipeline was cancelled, stopping benchmark...");
@@ -868,7 +874,7 @@ pub(crate) async fn run(
868874
println!("Interrupt received, stopping benchmark...");
869875
shutdown_token.cancel();
870876
etl_pipeline.cancel();
871-
break None;
877+
break Some(RunOutcome::Cancelled);
872878
}
873879
}
874880
};
@@ -880,10 +886,28 @@ pub(crate) async fn run(
880886
}
881887
};
882888

883-
// Propagate ETL error after collecting the test result
884-
if let Some(etl_err) = etl_error {
885-
return Err(anyhow::anyhow!("ETL pipeline failed: {etl_err}"));
889+
// Determine the run outcome from the loop exit reason and test results.
890+
// This is resolved before recording any metrics so every emitted metric
891+
// carries the `outcome` dimension.
892+
let outcome = match &run_outcome {
893+
Some(outcome) => outcome.clone(),
894+
None if test.succeeded() => RunOutcome::Success,
895+
None => RunOutcome::QueryFailure,
896+
};
897+
898+
// Add outcome as a dimension on all subsequently recorded metrics.
899+
let mut metric_attributes = metric_attributes;
900+
metric_attributes.push(KeyValue::new("outcome", outcome.as_str()));
901+
902+
// Record deferred metrics now that outcome is available.
903+
crate::metrics::ACTIVE_CONNECTIONS.record(
904+
common_args.concurrency.try_into().unwrap_or(0),
905+
&metric_attributes,
906+
);
907+
for sample in &checkpoint_e2e_latency_samples {
908+
crate::metrics::E2E_LATENCY_MS.record(*sample, &metric_attributes);
886909
}
910+
887911
test.get_query_durations().statistical_set()?;
888912

889913
// Get all query durations for overall statistics before ending the test
@@ -940,6 +964,13 @@ pub(crate) async fn run(
940964
}
941965
}
942966

967+
// Inject outcome into shared SUT attributes so the final scrape carries it.
968+
if let Some(ref sut_attrs) = sut_shared_attributes {
969+
sut_attrs
970+
.write()
971+
.expect("SUT attributes lock poisoned")
972+
.push(KeyValue::new("outcome", outcome.as_str()));
973+
}
943974
// Stop SUT metrics scraper and flush its pipeline
944975
sut_scraper_token.cancel();
945976
if let Some(handle) = sut_scraper_handle
@@ -967,9 +998,15 @@ pub(crate) async fn run(
967998
exporter.shutdown().await;
968999
}
9691000

970-
println!("Benchmark completed");
1001+
println!("Benchmark completed (outcome: {outcome})");
9711002

1003+
// Always emit telemetry — even on failure — so the outcome dimension is recorded.
9721004
telemetry.emit().await?;
9731005

1006+
// Propagate failure after telemetry has been emitted.
1007+
if let Some(failure_outcome) = run_outcome {
1008+
return Err(anyhow::anyhow!("Benchmark run failed: {failure_outcome}"));
1009+
}
1010+
9741011
Ok(())
9751012
}

0 commit comments

Comments
 (0)