diff --git a/lading_payload/proptest-regressions/opentelemetry_metric.txt b/lading_payload/proptest-regressions/opentelemetry_metric.txt index 9bff97216..10d4ab81a 100644 --- a/lading_payload/proptest-regressions/opentelemetry_metric.txt +++ b/lading_payload/proptest-regressions/opentelemetry_metric.txt @@ -17,3 +17,7 @@ cc d32a09383fb7c3f8b6d31d9943046e689fdbd6406aa57267993cd530c5a6eec8 # shrinks to cc 952bb240eb2db0cc80dbae38606f87673f616b6265888ab41d70159d3bb02fc7 # shrinks to seed = 0, total_contexts = 75, attributes_per_resource = 0, scopes_per_resource = 15, attributes_per_scope = 0, metrics_per_scope = 5, attributes_per_metric = 0, steps = 2, budget = 128 cc 6c5fecad6ef2d4a0a58104a348850fdb43be5f0643c8419e9c8f5bc8947ee443 # shrinks to seed = 15466789691079290595, total_contexts = 162, attributes_per_resource = 8, scopes_per_resource = 18, attributes_per_scope = 2, metrics_per_scope = 9, attributes_per_metric = 0, budget = 3400 cc 96fb21ece082cb2a738c23df64ffa4002e9abb97d97c293635ef519949177ad3 # shrinks to seed = 0, total_contexts = 1, attributes_per_resource = 0, scopes_per_resource = 1, attributes_per_scope = 0, metrics_per_scope = 1, attributes_per_metric = 0, budget = 31 +cc 9abe11b8bced96c442d0b6ec33da804a6b90525dc2cbb40bf1df2222c1a3ed30 # shrinks to seed = 16609138957746546345, steps = 2, budget = 1697 +cc 71f9b3dcd5692e290ad9222f2fec063efc6af9a3a76be27bb37934655af2581c +cc d6fb22247744874736a33213ae5e5115d1427e573b108797ceebf6e1ec4918c5 # shrinks to seed = 13411699595081898175, total_contexts = 2, attributes_per_resource = 3, scopes_per_resource = 2, attributes_per_scope = 1, metrics_per_scope = 3, attributes_per_metric = 2, steps = 1, budget = 467 +cc 03f848350451550bb39ecd64ac20132ecbb4aa2d794d9586d6b509872d8018ef # shrinks to seed = 4558513675196720091, total_contexts = 2, attributes_per_resource = 7, scopes_per_resource = 1, attributes_per_scope = 5, metrics_per_scope = 1, attributes_per_metric = 1, steps = 1, budget = 371 diff --git a/lading_payload/src/opentelemetry_metric.rs b/lading_payload/src/opentelemetry_metric.rs index d8f4036be..c3dab541f 100644 --- a/lading_payload/src/opentelemetry_metric.rs +++ b/lading_payload/src/opentelemetry_metric.rs @@ -61,6 +61,9 @@ use unit::UnitGenerator; // smallest_protobuf test const SMALLEST_PROTOBUF: usize = 31; +/// Increment timestamps by 100 milliseconds (in nanoseconds) per tick +const TIME_INCREMENT_NANOS: u64 = 1_000_000; + /// Configure the OpenTelemetry metric payload. #[derive(Debug, Deserialize, SerdeSerialize, Clone, PartialEq, Copy)] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] @@ -257,8 +260,16 @@ impl Config { #[derive(Debug, Clone)] /// OTLP metric payload pub struct OpentelemetryMetrics { + /// Template pool for metric generation pool: Pool, + /// Scratch buffer for serialization scratch: RefCell, + /// Current tick count for monotonic timing (starts at 0) + tick: u64, + /// Accumulating sum increment, floating point + incr_f: f64, + /// Accumulating sum increment, integer + incr_i: i64, } impl OpentelemetryMetrics { @@ -279,6 +290,9 @@ impl OpentelemetryMetrics { Ok(Self { pool: Pool::new(context_cap, rt_gen), scratch: RefCell::new(BytesMut::with_capacity(4096)), + tick: 0, + incr_f: 0.0, + incr_i: 0, }) } } @@ -287,12 +301,19 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics { type Output = v1::ResourceMetrics; type Error = Error; + /// Generate OTLP metrics with the following enhancements: + /// + /// * Monotonic sums are truly monotonic, incrementing by a random amount each tick + /// * Timestamps advance monotonically based on internal tick counter starting at epoch + /// * Each call advances the tick counter by a random amount (1-60) fn generate(&'a mut self, rng: &mut R, budget: &mut usize) -> Result where R: rand::Rng + ?Sized, { - // Retrieve a template from the pool, modify its timestamp and data - // points to randomize the data we send out. + self.tick += rng.random_range(1..=60); + self.incr_f += rng.random_range(1.0..=100.0); + self.incr_i += rng.random_range(1_i64..=100_i64); + let mut tpl: v1::ResourceMetrics = match self.pool.fetch(rng, budget) { Ok(t) => t.to_owned(), Err(PoolError::EmptyChoice) => { @@ -302,15 +323,16 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics { Err(e) => Err(e)?, }; - // Randomize the data points in each metric. Metric kinds are preserved - // but timestamps are completely random as are point values. + // Update data points in each metric. For gauges we use random values, + // for accumulating sums we increment by a fixed amount per tick. + // All timestamps are updated based on the current tick. for scope_metrics in &mut tpl.scope_metrics { for metric in &mut scope_metrics.metrics { if let Some(data) = &mut metric.data { match data { Data::Gauge(gauge) => { for point in &mut gauge.data_points { - point.time_unix_nano = rng.random(); + point.time_unix_nano = self.tick * TIME_INCREMENT_NANOS; if let Some(value) = &mut point.value { match value { number_data_point::Value::AsDouble(v) => { @@ -324,15 +346,34 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics { } } Data::Sum(sum) => { + let is_accumulating = sum.is_monotonic; for point in &mut sum.data_points { - point.time_unix_nano = rng.random(); - if let Some(value) = &mut point.value { - match value { - number_data_point::Value::AsDouble(v) => { - *v = rng.random(); + point.time_unix_nano = self.tick * TIME_INCREMENT_NANOS; + if is_accumulating { + // For accumulating sums, monotonically + // increase by some factor of `tick_diff` + if let Some(value) = &mut point.value { + match value { + number_data_point::Value::AsDouble(v) => { + *v += self.incr_f; + } + #[allow(clippy::cast_possible_wrap)] + number_data_point::Value::AsInt(v) => { + *v += self.incr_i; + } } - number_data_point::Value::AsInt(v) => { - *v = rng.random(); + } + } else { + // For non-accumulating sums, use random + // values + if let Some(value) = &mut point.value { + match value { + number_data_point::Value::AsDouble(v) => { + *v = rng.random(); + } + number_data_point::Value::AsInt(v) => { + *v = rng.random(); + } } } } @@ -425,7 +466,7 @@ mod test { use prost::Message; use rand::{SeedableRng, rngs::SmallRng}; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, hash::{DefaultHasher, Hash, Hasher}, }; @@ -823,6 +864,214 @@ mod test { } } + // Property: timestamps are monotonic + proptest! { + #[test] + fn timestamps_increase_monotonically( + seed: u64, + total_contexts in 1..1_000_u32, + attributes_per_resource in 0..20_u8, + scopes_per_resource in 0..20_u8, + attributes_per_scope in 0..20_u8, + metrics_per_scope in 0..20_u8, + attributes_per_metric in 0..10_u8, + steps in 1..u8::MAX, + budget in SMALLEST_PROTOBUF..2048_usize, + ) { + let config = Config { + contexts: Contexts { + total_contexts: ConfRange::Constant(total_contexts), + attributes_per_resource: ConfRange::Constant(attributes_per_resource), + scopes_per_resource: ConfRange::Constant(scopes_per_resource), + attributes_per_scope: ConfRange::Constant(attributes_per_scope), + metrics_per_scope: ConfRange::Constant(metrics_per_scope), + attributes_per_metric: ConfRange::Constant(attributes_per_metric), + }, + ..Default::default() + }; + + let mut budget = budget; + let mut rng = SmallRng::seed_from_u64(seed); + let mut otel_metrics = OpentelemetryMetrics::new(config, &mut rng)?; + + let mut timestamps_by_metric: HashMap> = HashMap::new(); + + for _ in 0..steps { + if let Ok(resource_metric) = otel_metrics.generate(&mut rng, &mut budget) { + for scope_metric in &resource_metric.scope_metrics { + for metric in &scope_metric.metrics { + let id = context_id(&resource_metric); + + if let Some(data) = &metric.data { + match data { + metric::Data::Gauge(gauge) => { + for point in &gauge.data_points { + timestamps_by_metric + .entry(id) + .or_insert_with(Vec::new) + .push(point.time_unix_nano); + } + }, + metric::Data::Sum(sum) => { + for point in &sum.data_points { + timestamps_by_metric + .entry(id) + .or_insert_with(Vec::new) + .push(point.time_unix_nano); + } + }, + _ => {}, + } + } + } + } + } + } + + if !timestamps_by_metric.is_empty() { + // For each metric, verify its timestamps increase monotonically + for (metric_id, timestamps) in ×tamps_by_metric { + if timestamps.len() > 1 { + for i in 1..timestamps.len() { + let current = timestamps[i]; + let previous = timestamps[i-1]; // safety: iterator begins at 1 + prop_assert!( + current >= previous, + "Timestamp for metric {metric_id} did not increase monotonically: current={current}, previous={previous}", + ); + } + } + } + } + } + } + + // Property: tick tally in OpentelemetryMetrics increase with calls to + // `generate`. + proptest! { + #[test] + fn increasing_ticks( + seed: u64, + total_contexts in 1..1_000_u32, + attributes_per_resource in 0..20_u8, + scopes_per_resource in 0..20_u8, + attributes_per_scope in 0..20_u8, + metrics_per_scope in 0..20_u8, + attributes_per_metric in 0..10_u8, + budget in SMALLEST_PROTOBUF..4098_usize, + ) { + let config = Config { + contexts: Contexts { + total_contexts: ConfRange::Constant(total_contexts), + attributes_per_resource: ConfRange::Constant(attributes_per_resource), + scopes_per_resource: ConfRange::Constant(scopes_per_resource), + attributes_per_scope: ConfRange::Constant(attributes_per_scope), + metrics_per_scope: ConfRange::Constant(metrics_per_scope), + attributes_per_metric: ConfRange::Constant(attributes_per_metric), + }, + metric_weights: super::MetricWeights { + gauge: 0, // Only generate sum metrics + sum: 100, + }, + }; + + let mut budget = budget; + let mut rng = SmallRng::seed_from_u64(seed); + let mut otel_metrics = OpentelemetryMetrics::new(config, &mut rng)?; + let prev = otel_metrics.tick; + let _ = otel_metrics.generate(&mut rng, &mut budget); + let cur = otel_metrics.tick; + + prop_assert!(cur > prev, "Ticks did not advance properly: current: {cur}, previous: {prev}"); + } + } + + // Property: accumulated sums are monotonic + proptest! { + #[test] + fn accumulating_sums_only_increase( + seed: u64, + total_contexts in 1..1_000_u32, + attributes_per_resource in 0..20_u8, + scopes_per_resource in 0..20_u8, + attributes_per_scope in 0..20_u8, + metrics_per_scope in 0..20_u8, + attributes_per_metric in 0..10_u8, + budget in SMALLEST_PROTOBUF..512_usize, // see note below about repetition + ) { + let config = Config { + contexts: Contexts { + total_contexts: ConfRange::Constant(total_contexts), + attributes_per_resource: ConfRange::Constant(attributes_per_resource), + scopes_per_resource: ConfRange::Constant(scopes_per_resource), + attributes_per_scope: ConfRange::Constant(attributes_per_scope), + metrics_per_scope: ConfRange::Constant(metrics_per_scope), + attributes_per_metric: ConfRange::Constant(attributes_per_metric), + }, + metric_weights: super::MetricWeights { + gauge: 0, // Only generate sum metrics + sum: 100, + }, + }; + + let mut rng = SmallRng::seed_from_u64(seed); + let mut otel_metrics = OpentelemetryMetrics::new(config, &mut rng)?; + + let mut values: HashMap = HashMap::new(); + + // Generate the initial batch of values. It's entirely possible we + // will not run into the same context_ids again. Please run this + // test with a very high number of cases. We have arbitrarily + // constrained the budget relative to other tests in this module for + // try and force repetition of contexts. + let mut budget = budget; + { + if let Ok(resource_metrics) = otel_metrics.generate(&mut rng, &mut budget) { + let id = context_id(&resource_metrics); + for scope_metric in &resource_metrics.scope_metrics { + for metric in &scope_metric.metrics { + if let Some(metric::Data::Sum(sum)) = &metric.data { + if sum.is_monotonic { + for point in sum.data_points.iter() { + if let Some(number_data_point::Value::AsDouble(v)) = point.value { + values.insert(id, v); + } + } + } + } + } + } + } + } + + let mut budget = budget; + { + if let Ok(resource_metrics) = otel_metrics.generate(&mut rng, &mut budget) { + let id = context_id(&resource_metrics); + for scope_metric in &resource_metrics.scope_metrics { + for metric in &scope_metric.metrics { + if let Some(metric::Data::Sum(sum)) = &metric.data { + if sum.is_monotonic { + for point in sum.data_points.iter() { + if let Some(number_data_point::Value::AsDouble(v)) = point.value { + if let Some(&previous_value) = values.get(&id) { + prop_assert!( + v >= previous_value, + "Monotonic sum decreased for {id}: previous={previous_value}, current={v}", + ); + } + values.insert(id, v); + } + } + } + } + } + } + } + } + } + } + #[test] fn smallest_protobuf() { let data_point = NumberDataPoint { diff --git a/lading_payload/src/opentelemetry_metric/templates.rs b/lading_payload/src/opentelemetry_metric/templates.rs index 5aa20d275..f13d81d9c 100644 --- a/lading_payload/src/opentelemetry_metric/templates.rs +++ b/lading_payload/src/opentelemetry_metric/templates.rs @@ -117,8 +117,8 @@ impl Distribution for StandardUniform { R: Rng + ?Sized, { let value = match rng.random_range(0..=1) { - 0 => metrics::v1::number_data_point::Value::AsDouble(rng.random()), - 1 => metrics::v1::number_data_point::Value::AsInt(rng.random()), + 0 => metrics::v1::number_data_point::Value::AsDouble(0.0), + 1 => metrics::v1::number_data_point::Value::AsInt(0), _ => unreachable!(), };