From 507fe1cd1ccbf5d9e187eacc662d4c163e7634a7 Mon Sep 17 00:00:00 2001 From: jerbly Date: Sun, 4 May 2025 14:07:15 -0400 Subject: [PATCH 01/15] Initial metric support --- crates/weaver_live_check/src/advice.rs | 19 ++++- crates/weaver_live_check/src/lib.rs | 10 +++ crates/weaver_live_check/src/live_checker.rs | 19 ++++- crates/weaver_live_check/src/sample_metric.rs | 75 +++++++++++++++++++ src/registry/otlp/metric_conversion.rs | 41 ++++++++++ src/registry/otlp/mod.rs | 1 + src/registry/otlp/otlp_ingester.rs | 42 ++++++++++- 7 files changed, 200 insertions(+), 7 deletions(-) create mode 100644 crates/weaver_live_check/src/sample_metric.rs create mode 100644 src/registry/otlp/metric_conversion.rs diff --git a/crates/weaver_live_check/src/advice.rs b/crates/weaver_live_check/src/advice.rs index 7ba659d2..b4bf7479 100644 --- a/crates/weaver_live_check/src/advice.rs +++ b/crates/weaver_live_check/src/advice.rs @@ -89,7 +89,7 @@ impl Advisor for StabilityAdvisor { &mut self, sample: SampleRef<'_>, registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + registry_group: Option<&ResolvedGroup>, ) -> Result, Error> { match sample { SampleRef::Attribute(_sample_attribute) => { @@ -109,6 +109,23 @@ impl Advisor for StabilityAdvisor { } Ok(advices) } + SampleRef::Metric(_sample_metric) => { + let mut advices = Vec::new(); + if let Some(group) = registry_group { + match group.stability { + Some(ref stability) if *stability != Stability::Stable => { + advices.push(Advice { + advice_type: "stability".to_owned(), + value: Value::String(stability.to_string()), + message: "Is not stable".to_owned(), + advice_level: AdviceLevel::Improvement, + }); + } + _ => {} + } + } + Ok(advices) + } _ => Ok(Vec::new()), } } diff --git a/crates/weaver_live_check/src/lib.rs b/crates/weaver_live_check/src/lib.rs index ee79013d..8068c9a8 100644 --- a/crates/weaver_live_check/src/lib.rs +++ b/crates/weaver_live_check/src/lib.rs @@ -7,6 +7,7 @@ use 
std::collections::HashMap; use live_checker::LiveChecker; use miette::Diagnostic; use sample_attribute::SampleAttribute; +use sample_metric::SampleMetric; use sample_resource::SampleResource; use sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink}; use serde::{Deserialize, Serialize}; @@ -24,6 +25,8 @@ pub mod json_stdin_ingester; pub mod live_checker; /// The intermediary format for attributes pub mod sample_attribute; +/// The intermediary format for metrics +pub mod sample_metric; /// An intermediary format for resources pub mod sample_resource; /// The intermediary format for spans @@ -37,6 +40,8 @@ pub mod text_stdin_ingester; pub const MISSING_ATTRIBUTE_ADVICE_TYPE: &str = "missing_attribute"; /// Template Attribute advice type pub const TEMPLATE_ATTRIBUTE_ADVICE_TYPE: &str = "template_attribute"; +/// Missing Metric advice type +pub const MISSING_METRIC_ADVICE_TYPE: &str = "missing_metric"; /// Weaver live check errors #[derive(thiserror::Error, Debug, Clone, PartialEq, Serialize, Diagnostic)] @@ -94,6 +99,8 @@ pub enum Sample { SpanLink(SampleSpanLink), /// A sample resource Resource(SampleResource), + /// A sample metric + Metric(SampleMetric), } /// Represents a sample entity with a reference to the inner type @@ -110,6 +117,8 @@ pub enum SampleRef<'a> { SpanLink(&'a SampleSpanLink), /// A sample resource Resource(&'a SampleResource), + /// A sample metric + Metric(&'a SampleMetric), } // Dispatch the live check to the sample type @@ -125,6 +134,7 @@ impl LiveCheckRunner for Sample { Sample::SpanEvent(span_event) => span_event.run_live_check(live_checker, stats), Sample::SpanLink(span_link) => span_link.run_live_check(live_checker, stats), Sample::Resource(resource) => resource.run_live_check(live_checker, stats), + Sample::Metric(metric) => metric.run_live_check(live_checker, stats), } } } diff --git a/crates/weaver_live_check/src/live_checker.rs b/crates/weaver_live_check/src/live_checker.rs index 2c2e2cb9..24526046 100644 --- 
a/crates/weaver_live_check/src/live_checker.rs +++ b/crates/weaver_live_check/src/live_checker.rs @@ -4,9 +4,9 @@ use serde::Serialize; use std::collections::HashMap; -use weaver_semconv::attribute::AttributeType; +use weaver_semconv::{attribute::AttributeType, group::GroupType, semconv}; -use weaver_forge::registry::ResolvedRegistry; +use weaver_forge::registry::{ResolvedGroup, ResolvedRegistry}; use weaver_resolved_schema::attribute::Attribute; use crate::advice::Advisor; @@ -18,6 +18,7 @@ pub struct LiveChecker { pub registry: ResolvedRegistry, semconv_attributes: HashMap, semconv_templates: HashMap, + semconv_metrics: HashMap, /// The advisors to run #[serde(skip)] pub advisors: Vec>, @@ -33,8 +34,15 @@ impl LiveChecker { let mut semconv_attributes = HashMap::new(); let mut semconv_templates = HashMap::new(); let mut templates_by_length = Vec::new(); + // Hashmap of metrics by name + let mut semconv_metrics = HashMap::new(); for group in ®istry.groups { + if group.r#type == GroupType::Metric { + if let Some(metric_name) = &group.metric_name { + let _ = semconv_metrics.insert(metric_name.clone(), group.clone()); + } + } for attribute in &group.attributes { match attribute.r#type { AttributeType::Template(_) => { @@ -56,6 +64,7 @@ impl LiveChecker { registry, semconv_attributes, semconv_templates, + semconv_metrics, advisors, templates_by_length, } @@ -72,6 +81,12 @@ impl LiveChecker { self.semconv_attributes.get(name) } + /// Find a metric in the registry + #[must_use] + pub fn find_metric(&self, name: &str) -> Option<&ResolvedGroup> { + self.semconv_metrics.get(name) + } + /// Find a template in the registry #[must_use] pub fn find_template(&self, attribute_name: &str) -> Option<&Attribute> { diff --git a/crates/weaver_live_check/src/sample_metric.rs b/crates/weaver_live_check/src/sample_metric.rs new file mode 100644 index 00000000..3cfb290d --- /dev/null +++ b/crates/weaver_live_check/src/sample_metric.rs @@ -0,0 +1,75 @@ +// SPDX-License-Identifier: 
Apache-2.0 + +//! Intermediary format for telemetry sample spans + +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use weaver_checker::violation::{Advice, AdviceLevel}; + +use crate::{ + live_checker::LiveChecker, sample_attribute::SampleAttribute, Error, LiveCheckResult, + LiveCheckRunner, LiveCheckStatistics, SampleRef, MISSING_METRIC_ADVICE_TYPE, +}; + +/// Represents a sample telemetry span parsed from any source +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SampleMetric { + /// Metric name. + pub name: String, + /// Set of attributes + // #[serde(default)] + // pub attributes: Vec, + /// Type of the metric (e.g. gauge, histogram, ...). + pub instrument: Instrument, + /// Unit of the metric. + pub unit: String, + /// Live check result + pub live_check_result: Option, +} + +/// The type of the metric. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Instrument { + /// An up-down counter metric. + UpDownCounter, + /// A counter metric. + Counter, + /// A gauge metric. + Gauge, + /// A histogram metric. + Histogram, +} + +impl LiveCheckRunner for SampleMetric { + fn run_live_check( + &mut self, + live_checker: &mut LiveChecker, + stats: &mut LiveCheckStatistics, + ) -> Result<(), Error> { + let mut result = LiveCheckResult::new(); + // find the metric in the registry + let semconv_metric = if let Some(group) = live_checker.find_metric(&self.name) { + Some(group.clone()) + } else { + result.add_advice(Advice { + advice_type: MISSING_METRIC_ADVICE_TYPE.to_owned(), + value: Value::String(self.name.clone()), + message: "Does not exist in the registry".to_owned(), + advice_level: AdviceLevel::Violation, + }); + None + }; + for advisor in live_checker.advisors.iter_mut() { + let advice_list = + advisor.advise(SampleRef::Metric(self), None, semconv_metric.as_ref())?; + result.add_advice_list(advice_list); + } + // TODO for each data point... 
+ + self.live_check_result = Some(result); + stats.inc_entity_count("metric"); + stats.maybe_add_live_check_result(self.live_check_result.as_ref()); + Ok(()) + } +} diff --git a/src/registry/otlp/metric_conversion.rs b/src/registry/otlp/metric_conversion.rs new file mode 100644 index 00000000..2d2d168f --- /dev/null +++ b/src/registry/otlp/metric_conversion.rs @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! Conversion routines for OTLP to Sample + +use weaver_live_check::sample_metric::{Instrument, SampleMetric}; + +use super::{ + grpc_stubs::proto::metrics::v1::{metric::Data, Metric}, + Error, +}; + +/// Converts an OTLP metric to a SampleMetric +pub fn otlp_metric_to_sample(otlp_metric: Metric) -> SampleMetric { + SampleMetric { + name: otlp_metric.name, + instrument: otlp_data_to_instrument(otlp_metric.data), + unit: otlp_metric.unit, + live_check_result: None, + } +} + +/// Converts OTLP data to a SampleMetric instrument +/// Mapping: +/// counter → Sum with is_monotonic: true +/// updowncounter → Sum with is_monotonic: false +/// gauge → Gauge +/// histogram → Histogram +fn otlp_data_to_instrument(data: Option) -> Instrument { + match data { + Some(Data::Sum(sum)) => { + if sum.is_monotonic { + Instrument::Counter + } else { + Instrument::UpDownCounter + } + } + Some(Data::Gauge(_)) => Instrument::Gauge, + Some(Data::Histogram(_)) => Instrument::Histogram, + _ => Instrument::Gauge, // TODO Default to Gauge if unknown? + } +} diff --git a/src/registry/otlp/mod.rs b/src/registry/otlp/mod.rs index 6a82b3fa..9a6abee1 100644 --- a/src/registry/otlp/mod.rs +++ b/src/registry/otlp/mod.rs @@ -2,6 +2,7 @@ //! A basic OTLP receiver integrated into Weaver. 
+pub mod metric_conversion; pub mod otlp_ingester; use grpc_stubs::proto::collector::logs::v1::logs_service_server::{LogsService, LogsServiceServer}; diff --git a/src/registry/otlp/otlp_ingester.rs b/src/registry/otlp/otlp_ingester.rs index fd41f5e8..50d30817 100644 --- a/src/registry/otlp/otlp_ingester.rs +++ b/src/registry/otlp/otlp_ingester.rs @@ -1,3 +1,6 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP ingester use std::time::Duration; use log::info; @@ -13,7 +16,9 @@ use weaver_semconv::group::SpanKindSpec; use super::{ grpc_stubs::proto::common::v1::{AnyValue, KeyValue}, - listen_otlp_requests, OtlpRequest, + listen_otlp_requests, + metric_conversion::otlp_metric_to_sample, + OtlpRequest, }; /// An ingester for OTLP data @@ -102,9 +107,38 @@ impl OtlpIterator { // TODO Implement the checking logic for logs Some(0) } - OtlpRequest::Metrics(_metrics) => { - // TODO Implement the checking logic for metrics - Some(0) + OtlpRequest::Metrics(metrics) => { + for resource_metric in metrics.resource_metrics { + if let Some(resource) = resource_metric.resource { + let mut sample_resource = SampleResource { + attributes: Vec::new(), + live_check_result: None, + }; + for attribute in resource.attributes { + sample_resource + .attributes + .push(Self::sample_attribute_from_key_value(&attribute)); + } + self.buffer.push(Sample::Resource(sample_resource)); + } + + for scope_metric in resource_metric.scope_metrics { + if let Some(scope) = scope_metric.scope { + // TODO SampleInstrumentationScope? 
+ for attribute in scope.attributes { + self.buffer.push(Sample::Attribute( + Self::sample_attribute_from_key_value(&attribute), + )); + } + } + + for metric in scope_metric.metrics { + let sample_metric = Sample::Metric(otlp_metric_to_sample(metric)); + self.buffer.push(sample_metric); + } + } + } + Some(self.buffer.len()) } OtlpRequest::Traces(trace) => { for resource_span in trace.resource_spans { From 0882e88be1a06cefe6524eca5e81751eab83533f Mon Sep 17 00:00:00 2001 From: jerbly Date: Sun, 4 May 2025 15:31:34 -0400 Subject: [PATCH 02/15] internal advisor support for basic metrics --- crates/weaver_live_check/src/advice.rs | 86 ++++++++++++++----- crates/weaver_live_check/src/lib.rs | 3 + crates/weaver_live_check/src/live_checker.rs | 33 +++---- .../weaver_live_check/src/sample_attribute.rs | 12 ++- crates/weaver_live_check/src/sample_metric.rs | 23 +---- src/registry/otlp/metric_conversion.rs | 15 ++-- 6 files changed, 104 insertions(+), 68 deletions(-) diff --git a/crates/weaver_live_check/src/advice.rs b/crates/weaver_live_check/src/advice.rs index b4bf7479..6f55a370 100644 --- a/crates/weaver_live_check/src/advice.rs +++ b/crates/weaver_live_check/src/advice.rs @@ -2,9 +2,9 @@ //! Builtin advisors -use std::{collections::BTreeMap, path::PathBuf}; +use std::{collections::BTreeMap, path::PathBuf, rc::Rc}; -use serde::Serialize; +use serde::{de, Serialize}; use serde_json::Value; use weaver_checker::{ violation::{Advice, AdviceLevel, Violation}, @@ -37,19 +37,27 @@ pub trait Advisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error>; } +fn deprecated_to_value(deprecated: &Deprecated) -> Value { + match deprecated { + Deprecated::Renamed { .. } => Value::String("renamed".to_owned()), + Deprecated::Obsoleted { .. } => Value::String("obsoleted".to_owned()), + Deprecated::Uncategorized { .. 
} => Value::String("uncategorized".to_owned()), + } +} + /// An advisor that checks if an attribute is deprecated pub struct DeprecatedAdvisor; impl Advisor for DeprecatedAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error> { match sample { SampleRef::Attribute(_sample_attribute) => { @@ -58,15 +66,21 @@ impl Advisor for DeprecatedAdvisor { if let Some(deprecated) = &attribute.deprecated { advices.push(Advice { advice_type: "deprecated".to_owned(), - value: match deprecated { - Deprecated::Renamed { .. } => Value::String("renamed".to_owned()), - Deprecated::Obsoleted { .. } => { - Value::String("obsoleted".to_owned()) - } - Deprecated::Uncategorized { .. } => { - Value::String("uncategorized".to_owned()) - } - }, + value: deprecated_to_value(deprecated), + message: deprecated.to_string(), + advice_level: AdviceLevel::Violation, + }); + } + } + Ok(advices) + } + SampleRef::Metric(_sample_metric) => { + let mut advices = Vec::new(); + if let Some(group) = registry_group { + if let Some(deprecated) = &group.deprecated { + advices.push(Advice { + advice_type: "deprecated".to_owned(), + value: deprecated_to_value(deprecated), message: deprecated.to_string(), advice_level: AdviceLevel::Violation, }); @@ -88,8 +102,8 @@ impl Advisor for StabilityAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error> { match sample { SampleRef::Attribute(_sample_attribute) => { @@ -137,8 +151,8 @@ impl Advisor for TypeAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error> { match 
sample { SampleRef::Attribute(sample_attribute) => { @@ -195,6 +209,32 @@ impl Advisor for TypeAdvisor { _ => Ok(Vec::new()), } } + SampleRef::Metric(sample_metric) => { + let mut advice_list = Vec::new(); + if let Some(semconv_metric) = registry_group { + if let Some(semconv_instrument) = &semconv_metric.instrument { + if semconv_instrument != &sample_metric.instrument { + advice_list.push(Advice { + advice_type: "instrument_mismatch".to_owned(), + value: Value::String(sample_metric.instrument.to_string()), + message: format!("Instrument should be `{}`", semconv_instrument), + advice_level: AdviceLevel::Violation, + }); + } + } + if let Some(semconv_unit) = &semconv_metric.unit { + if semconv_unit != &sample_metric.unit { + advice_list.push(Advice { + advice_type: "unit_mismatch".to_owned(), + value: Value::String(sample_metric.unit.clone()), + message: format!("Unit should be `{}`", semconv_unit), + advice_level: AdviceLevel::Violation, + }); + } + } + } + Ok(advice_list) + } _ => Ok(Vec::new()), } } @@ -206,8 +246,8 @@ impl Advisor for EnumAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + _registry_group: Option<&Rc>, ) -> Result, Error> { match sample { SampleRef::Attribute(sample_attribute) => { @@ -355,8 +395,8 @@ impl Advisor for RegoAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - _registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + _registry_attribute: Option<&Rc>, + _registry_group: Option<&Rc>, ) -> Result, Error> { self.check(sample) } diff --git a/crates/weaver_live_check/src/lib.rs b/crates/weaver_live_check/src/lib.rs index 8068c9a8..c0b71acf 100644 --- a/crates/weaver_live_check/src/lib.rs +++ b/crates/weaver_live_check/src/lib.rs @@ -3,6 +3,7 @@ //! 
This crate provides the weaver_live_check library use std::collections::HashMap; +use std::rc::Rc; use live_checker::LiveChecker; use miette::Diagnostic; @@ -14,6 +15,8 @@ use serde::{Deserialize, Serialize}; use weaver_checker::violation::{Advice, AdviceLevel}; use weaver_common::diagnostic::{DiagnosticMessage, DiagnosticMessages}; use weaver_forge::registry::ResolvedRegistry; +use weaver_forge::registry::ResolvedGroup; +use weaver_resolved_schema::attribute::Attribute; /// Advisors for live checks pub mod advice; diff --git a/crates/weaver_live_check/src/live_checker.rs b/crates/weaver_live_check/src/live_checker.rs index 24526046..5292204c 100644 --- a/crates/weaver_live_check/src/live_checker.rs +++ b/crates/weaver_live_check/src/live_checker.rs @@ -4,6 +4,7 @@ use serde::Serialize; use std::collections::HashMap; +use std::rc::Rc; use weaver_semconv::{attribute::AttributeType, group::GroupType, semconv}; use weaver_forge::registry::{ResolvedGroup, ResolvedRegistry}; @@ -16,14 +17,14 @@ use crate::advice::Advisor; pub struct LiveChecker { /// The resolved registry pub registry: ResolvedRegistry, - semconv_attributes: HashMap, - semconv_templates: HashMap, - semconv_metrics: HashMap, + semconv_attributes: HashMap>, + semconv_templates: HashMap>, + semconv_metrics: HashMap>, /// The advisors to run #[serde(skip)] pub advisors: Vec>, #[serde(skip)] - templates_by_length: Vec<(String, Attribute)>, + templates_by_length: Vec<(String, Rc)>, } impl LiveChecker { @@ -40,18 +41,20 @@ impl LiveChecker { for group in ®istry.groups { if group.r#type == GroupType::Metric { if let Some(metric_name) = &group.metric_name { - let _ = semconv_metrics.insert(metric_name.clone(), group.clone()); + let group_rc = Rc::new(group.clone()); + let _ = semconv_metrics.insert(metric_name.clone(), group_rc); } } for attribute in &group.attributes { + let attribute_rc = Rc::new(attribute.clone()); match attribute.r#type { AttributeType::Template(_) => { - 
templates_by_length.push((attribute.name.clone(), attribute.clone())); - let _ = semconv_templates.insert(attribute.name.clone(), attribute.clone()); + templates_by_length + .push((attribute.name.clone(), Rc::clone(&attribute_rc))); + let _ = semconv_templates.insert(attribute.name.clone(), attribute_rc); } _ => { - let _ = - semconv_attributes.insert(attribute.name.clone(), attribute.clone()); + let _ = semconv_attributes.insert(attribute.name.clone(), attribute_rc); } } } @@ -77,23 +80,23 @@ impl LiveChecker { /// Find an attribute in the registry #[must_use] - pub fn find_attribute(&self, name: &str) -> Option<&Attribute> { - self.semconv_attributes.get(name) + pub fn find_attribute(&self, name: &str) -> Option> { + self.semconv_attributes.get(name).map(Rc::clone) } /// Find a metric in the registry #[must_use] - pub fn find_metric(&self, name: &str) -> Option<&ResolvedGroup> { - self.semconv_metrics.get(name) + pub fn find_metric(&self, name: &str) -> Option> { + self.semconv_metrics.get(name).map(Rc::clone) } /// Find a template in the registry #[must_use] - pub fn find_template(&self, attribute_name: &str) -> Option<&Attribute> { + pub fn find_template(&self, attribute_name: &str) -> Option> { // Use the pre-sorted list to find the first (longest) matching template for (template_name, attribute) in &self.templates_by_length { if attribute_name.starts_with(template_name) { - return Some(attribute); + return Some(Rc::clone(attribute)); } } None diff --git a/crates/weaver_live_check/src/sample_attribute.rs b/crates/weaver_live_check/src/sample_attribute.rs index e9dcfad6..3f5296c5 100644 --- a/crates/weaver_live_check/src/sample_attribute.rs +++ b/crates/weaver_live_check/src/sample_attribute.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; +use std::rc::Rc; use weaver_checker::violation::{Advice, AdviceLevel}; use weaver_semconv::attribute::{AttributeType, PrimitiveOrArrayTypeSpec}; @@ -171,9 +172,9 @@ impl LiveCheckRunner 
for SampleAttribute { // find the attribute in the registry let semconv_attribute = { if let Some(attribute) = live_checker.find_attribute(&self.name) { - Some(attribute.clone()) + Some(attribute) } else { - live_checker.find_template(&self.name).cloned() + live_checker.find_template(&self.name) } }; @@ -200,8 +201,11 @@ impl LiveCheckRunner for SampleAttribute { // run advisors on the attribute for advisor in live_checker.advisors.iter_mut() { - let advice_list = - advisor.advise(SampleRef::Attribute(self), semconv_attribute.as_ref(), None)?; + let advice_list = advisor.advise( + SampleRef::Attribute(self), + semconv_attribute.as_ref(), + None, + )?; result.add_advice_list(advice_list); } self.live_check_result = Some(result); diff --git a/crates/weaver_live_check/src/sample_metric.rs b/crates/weaver_live_check/src/sample_metric.rs index 3cfb290d..9d64dae3 100644 --- a/crates/weaver_live_check/src/sample_metric.rs +++ b/crates/weaver_live_check/src/sample_metric.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use weaver_checker::violation::{Advice, AdviceLevel}; +use weaver_semconv::group::InstrumentSpec; use crate::{ live_checker::LiveChecker, sample_attribute::SampleAttribute, Error, LiveCheckResult, @@ -20,27 +21,13 @@ pub struct SampleMetric { // #[serde(default)] // pub attributes: Vec, /// Type of the metric (e.g. gauge, histogram, ...). - pub instrument: Instrument, + pub instrument: InstrumentSpec, /// Unit of the metric. pub unit: String, /// Live check result pub live_check_result: Option, } -/// The type of the metric. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum Instrument { - /// An up-down counter metric. - UpDownCounter, - /// A counter metric. - Counter, - /// A gauge metric. - Gauge, - /// A histogram metric. 
- Histogram, -} - impl LiveCheckRunner for SampleMetric { fn run_live_check( &mut self, @@ -49,16 +36,14 @@ impl LiveCheckRunner for SampleMetric { ) -> Result<(), Error> { let mut result = LiveCheckResult::new(); // find the metric in the registry - let semconv_metric = if let Some(group) = live_checker.find_metric(&self.name) { - Some(group.clone()) - } else { + let semconv_metric = live_checker.find_metric(&self.name); + if semconv_metric.is_none() { result.add_advice(Advice { advice_type: MISSING_METRIC_ADVICE_TYPE.to_owned(), value: Value::String(self.name.clone()), message: "Does not exist in the registry".to_owned(), advice_level: AdviceLevel::Violation, }); - None }; for advisor in live_checker.advisors.iter_mut() { let advice_list = diff --git a/src/registry/otlp/metric_conversion.rs b/src/registry/otlp/metric_conversion.rs index 2d2d168f..11b34270 100644 --- a/src/registry/otlp/metric_conversion.rs +++ b/src/registry/otlp/metric_conversion.rs @@ -2,7 +2,8 @@ //! Conversion routines for OTLP to Sample -use weaver_live_check::sample_metric::{Instrument, SampleMetric}; +use weaver_live_check::sample_metric::SampleMetric; +use weaver_semconv::group::InstrumentSpec; use super::{ grpc_stubs::proto::metrics::v1::{metric::Data, Metric}, @@ -25,17 +26,17 @@ pub fn otlp_metric_to_sample(otlp_metric: Metric) -> SampleMetric { /// updowncounter → Sum with is_monotonic: false /// gauge → Gauge /// histogram → Histogram -fn otlp_data_to_instrument(data: Option) -> Instrument { +fn otlp_data_to_instrument(data: Option) -> InstrumentSpec { match data { Some(Data::Sum(sum)) => { if sum.is_monotonic { - Instrument::Counter + InstrumentSpec::Counter } else { - Instrument::UpDownCounter + InstrumentSpec::UpDownCounter } } - Some(Data::Gauge(_)) => Instrument::Gauge, - Some(Data::Histogram(_)) => Instrument::Histogram, - _ => Instrument::Gauge, // TODO Default to Gauge if unknown? 
+ Some(Data::Gauge(_)) => InstrumentSpec::Gauge, + Some(Data::Histogram(_)) => InstrumentSpec::Histogram, + _ => InstrumentSpec::Gauge, // TODO Default to Gauge if unknown? } } From 1a9b3e138e39f513af4e957972f5e6f3bd45743d Mon Sep 17 00:00:00 2001 From: jerbly Date: Sun, 4 May 2025 15:55:01 -0400 Subject: [PATCH 03/15] initial ansi output for metrics --- crates/weaver_live_check/src/advice.rs | 2 +- crates/weaver_live_check/src/lib.rs | 3 --- crates/weaver_live_check/src/live_checker.rs | 2 +- crates/weaver_live_check/src/sample_attribute.rs | 8 ++------ defaults/live_check_templates/ansi/live_check_macros.j2 | 7 +++++++ src/registry/otlp/metric_conversion.rs | 5 +---- 6 files changed, 12 insertions(+), 15 deletions(-) diff --git a/crates/weaver_live_check/src/advice.rs b/crates/weaver_live_check/src/advice.rs index 6f55a370..d3581a52 100644 --- a/crates/weaver_live_check/src/advice.rs +++ b/crates/weaver_live_check/src/advice.rs @@ -4,7 +4,7 @@ use std::{collections::BTreeMap, path::PathBuf, rc::Rc}; -use serde::{de, Serialize}; +use serde::Serialize; use serde_json::Value; use weaver_checker::{ violation::{Advice, AdviceLevel, Violation}, diff --git a/crates/weaver_live_check/src/lib.rs b/crates/weaver_live_check/src/lib.rs index c0b71acf..8068c9a8 100644 --- a/crates/weaver_live_check/src/lib.rs +++ b/crates/weaver_live_check/src/lib.rs @@ -3,7 +3,6 @@ //! 
This crate provides the weaver_live_check library use std::collections::HashMap; -use std::rc::Rc; use live_checker::LiveChecker; use miette::Diagnostic; @@ -15,8 +14,6 @@ use serde::{Deserialize, Serialize}; use weaver_checker::violation::{Advice, AdviceLevel}; use weaver_common::diagnostic::{DiagnosticMessage, DiagnosticMessages}; use weaver_forge::registry::ResolvedRegistry; -use weaver_forge::registry::ResolvedGroup; -use weaver_resolved_schema::attribute::Attribute; /// Advisors for live checks pub mod advice; diff --git a/crates/weaver_live_check/src/live_checker.rs b/crates/weaver_live_check/src/live_checker.rs index 5292204c..e5c4b4f0 100644 --- a/crates/weaver_live_check/src/live_checker.rs +++ b/crates/weaver_live_check/src/live_checker.rs @@ -5,7 +5,7 @@ use serde::Serialize; use std::collections::HashMap; use std::rc::Rc; -use weaver_semconv::{attribute::AttributeType, group::GroupType, semconv}; +use weaver_semconv::{attribute::AttributeType, group::GroupType}; use weaver_forge::registry::{ResolvedGroup, ResolvedRegistry}; use weaver_resolved_schema::attribute::Attribute; diff --git a/crates/weaver_live_check/src/sample_attribute.rs b/crates/weaver_live_check/src/sample_attribute.rs index 3f5296c5..6a19e67b 100644 --- a/crates/weaver_live_check/src/sample_attribute.rs +++ b/crates/weaver_live_check/src/sample_attribute.rs @@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use std::rc::Rc; use weaver_checker::violation::{Advice, AdviceLevel}; use weaver_semconv::attribute::{AttributeType, PrimitiveOrArrayTypeSpec}; @@ -201,11 +200,8 @@ impl LiveCheckRunner for SampleAttribute { // run advisors on the attribute for advisor in live_checker.advisors.iter_mut() { - let advice_list = advisor.advise( - SampleRef::Attribute(self), - semconv_attribute.as_ref(), - None, - )?; + let advice_list = + advisor.advise(SampleRef::Attribute(self), semconv_attribute.as_ref(), None)?; result.add_advice_list(advice_list); } 
self.live_check_result = Some(result); diff --git a/defaults/live_check_templates/ansi/live_check_macros.j2 b/defaults/live_check_templates/ansi/live_check_macros.j2 index 821849de..bfe94417 100644 --- a/defaults/live_check_templates/ansi/live_check_macros.j2 +++ b/defaults/live_check_templates/ansi/live_check_macros.j2 @@ -72,6 +72,11 @@ {%- endfor %} {% endmacro %} +{% macro display_metric(metric, indent=0) %} +{{ " " * indent }}{{ ("Metric") | ansi_bright_cyan | ansi_bold }} {{ display_sample_header(metric.live_check_result.highest_advice_level, metric.name) }} `{{ metric.instrument }}`, `{{ metric.unit }}` +{{ display_advice(metric.live_check_result.all_advice, indent) }} +{%- endmacro %} + {%- macro display_sample(sample, indent=0) -%} {%- if sample.span is defined -%} {{ " " * indent }}{{ ("Span") | ansi_bright_cyan | ansi_bold }} {{ display_sample_header(sample.span.live_check_result.highest_advice_level, sample.span.name) }} `{{ sample.span.kind }}` @@ -93,6 +98,8 @@ {{ display_span_link(sample.span_link, indent) }} {%- elif sample.resource is defined -%} {{ display_resource(sample.resource, indent) }} +{%- elif sample.metric is defined -%} +{{ display_metric(sample.metric, indent) }} {%- endif -%} {%- endmacro -%} diff --git a/src/registry/otlp/metric_conversion.rs b/src/registry/otlp/metric_conversion.rs index 11b34270..c116f1ac 100644 --- a/src/registry/otlp/metric_conversion.rs +++ b/src/registry/otlp/metric_conversion.rs @@ -5,10 +5,7 @@ use weaver_live_check::sample_metric::SampleMetric; use weaver_semconv::group::InstrumentSpec; -use super::{ - grpc_stubs::proto::metrics::v1::{metric::Data, Metric}, - Error, -}; +use super::grpc_stubs::proto::metrics::v1::{metric::Data, Metric}; /// Converts an OTLP metric to a SampleMetric pub fn otlp_metric_to_sample(otlp_metric: Metric) -> SampleMetric { From da3112b2f78d1893459eb13d18760b54e15ee838 Mon Sep 17 00:00:00 2001 From: jerbly Date: Sun, 4 May 2025 20:26:43 -0400 Subject: [PATCH 04/15] number data points 
--- crates/weaver_live_check/src/advice.rs | 21 +++ crates/weaver_live_check/src/lib.rs | 4 +- crates/weaver_live_check/src/sample_metric.rs | 69 ++++++++- .../ansi/live_check_macros.j2 | 11 ++ src/registry/otlp/conversion.rs | 137 ++++++++++++++++++ src/registry/otlp/metric_conversion.rs | 39 ----- src/registry/otlp/mod.rs | 2 +- src/registry/otlp/otlp_ingester.rs | 81 ++--------- 8 files changed, 253 insertions(+), 111 deletions(-) create mode 100644 src/registry/otlp/conversion.rs delete mode 100644 src/registry/otlp/metric_conversion.rs diff --git a/crates/weaver_live_check/src/advice.rs b/crates/weaver_live_check/src/advice.rs index d3581a52..0f2420b9 100644 --- a/crates/weaver_live_check/src/advice.rs +++ b/crates/weaver_live_check/src/advice.rs @@ -235,6 +235,27 @@ impl Advisor for TypeAdvisor { } Ok(advice_list) } + SampleRef::NumberDataPoint(sample_number_data_point) => { + let mut advice_list = Vec::new(); + if let Some(semconv_metric) = registry_group { + for semconv_attribute in semconv_metric.attributes.iter() { + // Check if the attribute is present in the sample + if !sample_number_data_point + .attributes + .iter() + .any(|attribute| attribute.name == semconv_attribute.name) + { + advice_list.push(Advice { + advice_type: "attribute_required".to_owned(), + value: Value::String(semconv_attribute.name.clone()), + message: "Attribute is required".to_owned(), + advice_level: AdviceLevel::Violation, + }); + } + } + } + Ok(advice_list) + } _ => Ok(Vec::new()), } } diff --git a/crates/weaver_live_check/src/lib.rs b/crates/weaver_live_check/src/lib.rs index 8068c9a8..32ecba92 100644 --- a/crates/weaver_live_check/src/lib.rs +++ b/crates/weaver_live_check/src/lib.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use live_checker::LiveChecker; use miette::Diagnostic; use sample_attribute::SampleAttribute; -use sample_metric::SampleMetric; +use sample_metric::{NumberDataPoint, SampleMetric}; use sample_resource::SampleResource; use sample_span::{SampleSpan, 
SampleSpanEvent, SampleSpanLink}; use serde::{Deserialize, Serialize}; @@ -119,6 +119,8 @@ pub enum SampleRef<'a> { Resource(&'a SampleResource), /// A sample metric Metric(&'a SampleMetric), + /// A sample number data point + NumberDataPoint(&'a NumberDataPoint), } // Dispatch the live check to the sample type diff --git a/crates/weaver_live_check/src/sample_metric.rs b/crates/weaver_live_check/src/sample_metric.rs index 9d64dae3..44e13f46 100644 --- a/crates/weaver_live_check/src/sample_metric.rs +++ b/crates/weaver_live_check/src/sample_metric.rs @@ -12,6 +12,52 @@ use crate::{ LiveCheckRunner, LiveCheckStatistics, SampleRef, MISSING_METRIC_ADVICE_TYPE, }; +/// The data point types of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum DataPoints { + /// Number data points + Number(Vec), + /// Histogram data points + Histogram(Vec), + /// Exponential histogram data points + ExponentialHistogram(Vec), + /// Summary data points + Summary(Vec), +} + +/// Represents a single data point of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct NumberDataPoint { + /// The value of the data point + pub value: Value, + /// The attributes of the data point + pub attributes: Vec, + /// Live check result + pub live_check_result: Option, +} + +/// Represents a single histogram data point of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct HistogramDataPoint { + /// The attributes of the data point + pub attributes: Vec, +} + +/// Represents a single exponential histogram data point of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ExponentialHistogramDataPoint { + /// The attributes of the data point + pub attributes: Vec, +} + +/// Represents a single summary data point of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SummaryDataPoint { + /// The attributes of the data point + pub attributes: Vec, +} + 
/// Represents a sample telemetry span parsed from any source #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct SampleMetric { @@ -24,6 +70,8 @@ pub struct SampleMetric { pub instrument: InstrumentSpec, /// Unit of the metric. pub unit: String, + /// Data points of the metric. + pub data_points: Option, /// Live check result pub live_check_result: Option, } @@ -50,7 +98,26 @@ impl LiveCheckRunner for SampleMetric { advisor.advise(SampleRef::Metric(self), None, semconv_metric.as_ref())?; result.add_advice_list(advice_list); } - // TODO for each data point... + // Get advice for the data points + if let Some(DataPoints::Number(number_data_points)) = &mut self.data_points { + for point in number_data_points.iter_mut() { + // TODO Get advice on the data point + let mut point_result = LiveCheckResult::new(); + for advisor in live_checker.advisors.iter_mut() { + let advice_list = advisor.advise( + SampleRef::NumberDataPoint(point), + None, + semconv_metric.as_ref(), + )?; + point_result.add_advice_list(advice_list); + } + point.live_check_result = Some(point_result); + + for attribute in &mut point.attributes { + attribute.run_live_check(live_checker, stats)?; + } + } + } self.live_check_result = Some(result); stats.inc_entity_count("metric"); diff --git a/defaults/live_check_templates/ansi/live_check_macros.j2 b/defaults/live_check_templates/ansi/live_check_macros.j2 index bfe94417..33884f36 100644 --- a/defaults/live_check_templates/ansi/live_check_macros.j2 +++ b/defaults/live_check_templates/ansi/live_check_macros.j2 @@ -72,9 +72,20 @@ {%- endfor %} {% endmacro %} +{% macro display_data_point(data_point, indent=0) %} +{{ " " * indent }}{{ ("Data point") | ansi_bright_cyan | ansi_bold }} {{ data_point.value }} +{{ display_advice(data_point.live_check_result.all_advice, indent) }} +{%- for attribute in data_point.attributes %} +{{ display_attribute(attribute, indent + 4) }} +{%- endfor %} +{% endmacro %} + {% macro display_metric(metric, indent=0) 
%} {{ " " * indent }}{{ ("Metric") | ansi_bright_cyan | ansi_bold }} {{ display_sample_header(metric.live_check_result.highest_advice_level, metric.name) }} `{{ metric.instrument }}`, `{{ metric.unit }}` {{ display_advice(metric.live_check_result.all_advice, indent) }} +{%- for data_point in metric.data_points %} +{{ display_data_point(data_point, indent + 4) }} +{%- endfor %} {%- endmacro %} {%- macro display_sample(sample, indent=0) -%} diff --git a/src/registry/otlp/conversion.rs b/src/registry/otlp/conversion.rs new file mode 100644 index 00000000..95187974 --- /dev/null +++ b/src/registry/otlp/conversion.rs @@ -0,0 +1,137 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! Conversion routines for OTLP to Sample + +use serde_json::{json, Value}; +use weaver_live_check::{ + sample_attribute::SampleAttribute, + sample_metric::{DataPoints, SampleMetric}, +}; +use weaver_semconv::group::{InstrumentSpec, SpanKindSpec}; + +use super::grpc_stubs::proto::{ + common::v1::{AnyValue, KeyValue}, + metrics::v1::{metric::Data, Metric, NumberDataPoint}, +}; + +fn maybe_to_json(value: Option) -> Option { + if let Some(value) = value { + if let Some(value) = value.value { + use crate::registry::otlp::grpc_stubs::proto::common::v1::any_value::Value as GrpcValue; + match value { + GrpcValue::StringValue(string) => Some(Value::String(string)), + GrpcValue::IntValue(int_value) => Some(Value::Number(int_value.into())), + GrpcValue::DoubleValue(double_value) => Some(json!(double_value)), + GrpcValue::BoolValue(bool_value) => Some(Value::Bool(bool_value)), + GrpcValue::ArrayValue(array_value) => { + let mut vec = Vec::new(); + for value in array_value.values { + if let Some(value) = maybe_to_json(Some(value)) { + vec.push(value); + } + } + Some(Value::Array(vec)) + } + _ => None, + } + } else { + None + } + } else { + None + } +} + +/// Converts an OTLP KeyValue to a SampleAttribute +pub fn sample_attribute_from_key_value(key_value: &KeyValue) -> SampleAttribute { + let value = 
maybe_to_json(key_value.value.clone()); + let r#type = match value { + Some(ref val) => SampleAttribute::infer_type(val), + None => None, + }; + SampleAttribute { + name: key_value.key.clone(), + value, + r#type, + live_check_result: None, + } +} + +/// Converts an OTLP span kind to a SpanKindSpec +pub fn span_kind_from_otlp_kind(kind: i32) -> SpanKindSpec { + match kind { + 2 => SpanKindSpec::Server, + 3 => SpanKindSpec::Client, + 4 => SpanKindSpec::Producer, + 5 => SpanKindSpec::Consumer, + _ => SpanKindSpec::Internal, + } +} + +/// Converts an OTLP metric to a SampleMetric +pub fn otlp_metric_to_sample(otlp_metric: Metric) -> SampleMetric { + SampleMetric { + name: otlp_metric.name, + instrument: otlp_data_to_instrument(&otlp_metric.data), + unit: otlp_metric.unit, + data_points: otlp_data_to_data_points(&otlp_metric.data), + live_check_result: None, + } +} + +/// Converts OTLP data to a SampleMetric instrument +/// Mapping: +/// counter → Sum with is_monotonic: true +/// updowncounter → Sum with is_monotonic: false +/// gauge → Gauge +/// histogram → Histogram +fn otlp_data_to_instrument(data: &Option) -> InstrumentSpec { + match data { + Some(Data::Sum(sum)) => { + if sum.is_monotonic { + InstrumentSpec::Counter + } else { + InstrumentSpec::UpDownCounter + } + } + Some(Data::Gauge(_)) => InstrumentSpec::Gauge, + Some(Data::Histogram(_)) => InstrumentSpec::Histogram, + _ => InstrumentSpec::Gauge, // TODO Default to Gauge if unknown? 
+ } +} + +/// Converts OTLP data to SampleMetric data points +fn otlp_data_to_data_points(data: &Option) -> Option { + match data { + Some(Data::Sum(sum)) => Some(otlp_number_data_points(&sum.data_points)), + _ => None, + } +} + +/// Converts OTLP Number data points to DataPoints::Number +fn otlp_number_data_points(otlp: &Vec) -> DataPoints { + let mut data_points = Vec::new(); + for point in otlp { + let live_check_point = weaver_live_check::sample_metric::NumberDataPoint { + value: match point.value { + Some(value) => match value { + super::grpc_stubs::proto::metrics::v1::number_data_point::Value::AsDouble( + double, + ) => json!(double), + super::grpc_stubs::proto::metrics::v1::number_data_point::Value::AsInt(int) => { + Value::Number(int.into()) + } + }, + None => Value::Null, + }, + attributes: point + .attributes + .iter() + .map(sample_attribute_from_key_value) + .collect(), + live_check_result: None, + }; + data_points.push(live_check_point); + } + DataPoints::Number(data_points) +} diff --git a/src/registry/otlp/metric_conversion.rs b/src/registry/otlp/metric_conversion.rs deleted file mode 100644 index c116f1ac..00000000 --- a/src/registry/otlp/metric_conversion.rs +++ /dev/null @@ -1,39 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 - -//! 
Conversion routines for OTLP to Sample - -use weaver_live_check::sample_metric::SampleMetric; -use weaver_semconv::group::InstrumentSpec; - -use super::grpc_stubs::proto::metrics::v1::{metric::Data, Metric}; - -/// Converts an OTLP metric to a SampleMetric -pub fn otlp_metric_to_sample(otlp_metric: Metric) -> SampleMetric { - SampleMetric { - name: otlp_metric.name, - instrument: otlp_data_to_instrument(otlp_metric.data), - unit: otlp_metric.unit, - live_check_result: None, - } -} - -/// Converts OTLP data to a SampleMetric instrument -/// Mapping: -/// counter → Sum with is_monotonic: true -/// updowncounter → Sum with is_monotonic: false -/// gauge → Gauge -/// histogram → Histogram -fn otlp_data_to_instrument(data: Option) -> InstrumentSpec { - match data { - Some(Data::Sum(sum)) => { - if sum.is_monotonic { - InstrumentSpec::Counter - } else { - InstrumentSpec::UpDownCounter - } - } - Some(Data::Gauge(_)) => InstrumentSpec::Gauge, - Some(Data::Histogram(_)) => InstrumentSpec::Histogram, - _ => InstrumentSpec::Gauge, // TODO Default to Gauge if unknown? - } -} diff --git a/src/registry/otlp/mod.rs b/src/registry/otlp/mod.rs index 9a6abee1..31769e7f 100644 --- a/src/registry/otlp/mod.rs +++ b/src/registry/otlp/mod.rs @@ -2,7 +2,7 @@ //! A basic OTLP receiver integrated into Weaver. 
-pub mod metric_conversion; +pub mod conversion; pub mod otlp_ingester; use grpc_stubs::proto::collector::logs::v1::logs_service_server::{LogsService, LogsServiceServer}; diff --git a/src/registry/otlp/otlp_ingester.rs b/src/registry/otlp/otlp_ingester.rs index 50d30817..d7686a76 100644 --- a/src/registry/otlp/otlp_ingester.rs +++ b/src/registry/otlp/otlp_ingester.rs @@ -4,21 +4,18 @@ use std::time::Duration; use log::info; -use serde_json::{json, Value}; use weaver_common::log_info; use weaver_live_check::{ - sample_attribute::SampleAttribute, sample_resource::SampleResource, sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink}, Error, Ingester, Sample, }; -use weaver_semconv::group::SpanKindSpec; use super::{ - grpc_stubs::proto::common::v1::{AnyValue, KeyValue}, - listen_otlp_requests, - metric_conversion::otlp_metric_to_sample, - OtlpRequest, + conversion::{ + otlp_metric_to_sample, sample_attribute_from_key_value, span_kind_from_otlp_kind, + }, + listen_otlp_requests, OtlpRequest, }; /// An ingester for OTLP data @@ -47,60 +44,6 @@ impl OtlpIterator { } } - fn maybe_to_json(value: Option) -> Option { - if let Some(value) = value { - if let Some(value) = value.value { - use crate::registry::otlp::grpc_stubs::proto::common::v1::any_value::Value as GrpcValue; - match value { - GrpcValue::StringValue(string) => Some(Value::String(string)), - GrpcValue::IntValue(int_value) => Some(Value::Number(int_value.into())), - GrpcValue::DoubleValue(double_value) => Some(json!(double_value)), - GrpcValue::BoolValue(bool_value) => Some(Value::Bool(bool_value)), - GrpcValue::ArrayValue(array_value) => { - let mut vec = Vec::new(); - for value in array_value.values { - if let Some(value) = Self::maybe_to_json(Some(value)) { - vec.push(value); - } - } - Some(Value::Array(vec)) - } - _ => None, - } - } else { - None - } - } else { - None - } - } - - // TODO Ideally this would be a TryFrom in the SampleAttribute but requires - // the grpc_stubs to be in another crate - fn 
sample_attribute_from_key_value(key_value: &KeyValue) -> SampleAttribute { - let value = Self::maybe_to_json(key_value.value.clone()); - let r#type = match value { - Some(ref val) => SampleAttribute::infer_type(val), - None => None, - }; - SampleAttribute { - name: key_value.key.clone(), - value, - r#type, - live_check_result: None, - } - } - - fn span_kind_from_otlp_kind(kind: i32) -> SpanKindSpec { - match kind { - 2 => SpanKindSpec::Server, - 3 => SpanKindSpec::Client, - 4 => SpanKindSpec::Producer, - 5 => SpanKindSpec::Consumer, - _ => SpanKindSpec::Internal, - } - } - fn fill_buffer_from_request(&mut self, request: OtlpRequest) -> Option { match request { OtlpRequest::Logs(_logs) => { @@ -117,7 +60,7 @@ impl OtlpIterator { for attribute in resource.attributes { sample_resource .attributes - .push(Self::sample_attribute_from_key_value(&attribute)); + .push(sample_attribute_from_key_value(&attribute)); } self.buffer.push(Sample::Resource(sample_resource)); } @@ -127,7 +70,7 @@ impl OtlpIterator { // TODO SampleInstrumentationScope? for attribute in scope.attributes { self.buffer.push(Sample::Attribute( - Self::sample_attribute_from_key_value(&attribute), + sample_attribute_from_key_value(&attribute), )); } } @@ -150,7 +93,7 @@ impl OtlpIterator { for attribute in resource.attributes { sample_resource .attributes - .push(Self::sample_attribute_from_key_value(&attribute)); + .push(sample_attribute_from_key_value(&attribute)); } self.buffer.push(Sample::Resource(sample_resource)); } @@ -160,7 +103,7 @@ impl OtlpIterator { // TODO SampleInstrumentationScope? 
for attribute in scope.attributes { self.buffer.push(Sample::Attribute( - Self::sample_attribute_from_key_value(&attribute), + sample_attribute_from_key_value(&attribute), )); } } @@ -168,7 +111,7 @@ impl OtlpIterator { for span in scope_span.spans { let mut sample_span = SampleSpan { name: span.name, - kind: Self::span_kind_from_otlp_kind(span.kind), + kind: span_kind_from_otlp_kind(span.kind), attributes: Vec::new(), span_events: Vec::new(), span_links: Vec::new(), @@ -177,7 +120,7 @@ impl OtlpIterator { for attribute in span.attributes { sample_span .attributes - .push(Self::sample_attribute_from_key_value(&attribute)); + .push(sample_attribute_from_key_value(&attribute)); } for event in span.events { let mut sample_event = SampleSpanEvent { @@ -188,7 +131,7 @@ impl OtlpIterator { for attribute in event.attributes { sample_event .attributes - .push(Self::sample_attribute_from_key_value(&attribute)); + .push(sample_attribute_from_key_value(&attribute)); } sample_span.span_events.push(sample_event); } @@ -200,7 +143,7 @@ impl OtlpIterator { for attribute in link.attributes { sample_link .attributes - .push(Self::sample_attribute_from_key_value(&attribute)); + .push(sample_attribute_from_key_value(&attribute)); } sample_span.span_links.push(sample_link); } From 2115b9987f861c5ed60eeab525d6f3bc757ed786 Mon Sep 17 00:00:00 2001 From: jerbly Date: Mon, 5 May 2025 12:03:37 -0400 Subject: [PATCH 05/15] added histogram support --- crates/weaver_live_check/src/lib.rs | 4 +- crates/weaver_live_check/src/sample_metric.rs | 64 +++++++++++++++---- .../ansi/live_check_macros.j2 | 10 ++- src/registry/otlp/conversion.rs | 28 +++++++- 4 files changed, 89 insertions(+), 17 deletions(-) diff --git a/crates/weaver_live_check/src/lib.rs b/crates/weaver_live_check/src/lib.rs index 32ecba92..fb66521c 100644 --- a/crates/weaver_live_check/src/lib.rs +++ b/crates/weaver_live_check/src/lib.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use live_checker::LiveChecker; use 
miette::Diagnostic; use sample_attribute::SampleAttribute; -use sample_metric::{NumberDataPoint, SampleMetric}; +use sample_metric::{HistogramDataPoint, NumberDataPoint, SampleMetric}; use sample_resource::SampleResource; use sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink}; use serde::{Deserialize, Serialize}; @@ -121,6 +121,8 @@ pub enum SampleRef<'a> { Metric(&'a SampleMetric), /// A sample number data point NumberDataPoint(&'a NumberDataPoint), + /// A sample histogram data point + HistogramDataPoint(&'a HistogramDataPoint), } // Dispatch the live check to the sample type diff --git a/crates/weaver_live_check/src/sample_metric.rs b/crates/weaver_live_check/src/sample_metric.rs index 44e13f46..f68a6380 100644 --- a/crates/weaver_live_check/src/sample_metric.rs +++ b/crates/weaver_live_check/src/sample_metric.rs @@ -42,6 +42,18 @@ pub struct NumberDataPoint { pub struct HistogramDataPoint { /// The attributes of the data point pub attributes: Vec, + /// The count of the data point + pub count: u64, + /// The sum of the data point + pub sum: Option, + /// The bucket counts of the data point + pub bucket_counts: Vec, + /// The minimum of the data point + pub min: Option, + /// The maximum of the data point + pub max: Option, + /// Live check result + pub live_check_result: Option, } /// Represents a single exponential histogram data point of a metric @@ -99,24 +111,48 @@ impl LiveCheckRunner for SampleMetric { result.add_advice_list(advice_list); } // Get advice for the data points - if let Some(DataPoints::Number(number_data_points)) = &mut self.data_points { - for point in number_data_points.iter_mut() { - // TODO Get advice on the data point - let mut point_result = LiveCheckResult::new(); - for advisor in live_checker.advisors.iter_mut() { - let advice_list = advisor.advise( - SampleRef::NumberDataPoint(point), - None, - semconv_metric.as_ref(), - )?; - point_result.add_advice_list(advice_list); + match &mut self.data_points { + 
Some(DataPoints::Number(number_data_points)) => { + for point in number_data_points.iter_mut() { + let mut point_result = LiveCheckResult::new(); + for advisor in live_checker.advisors.iter_mut() { + let advice_list = advisor.advise( + SampleRef::NumberDataPoint(point), + None, + semconv_metric.as_ref(), + )?; + point_result.add_advice_list(advice_list); + } + point.live_check_result = Some(point_result); + stats.inc_entity_count("number_data_point"); + stats.maybe_add_live_check_result(point.live_check_result.as_ref()); + + for attribute in &mut point.attributes { + attribute.run_live_check(live_checker, stats)?; + } } - point.live_check_result = Some(point_result); + } + Some(DataPoints::Histogram(histogram_data_points)) => { + for point in histogram_data_points.iter_mut() { + let mut point_result = LiveCheckResult::new(); + for advisor in live_checker.advisors.iter_mut() { + let advice_list = advisor.advise( + SampleRef::HistogramDataPoint(point), + None, + semconv_metric.as_ref(), + )?; + point_result.add_advice_list(advice_list); + } + point.live_check_result = Some(point_result); + stats.inc_entity_count("histogram_data_point"); + stats.maybe_add_live_check_result(point.live_check_result.as_ref()); - for attribute in &mut point.attributes { - attribute.run_live_check(live_checker, stats)?; + for attribute in &mut point.attributes { + attribute.run_live_check(live_checker, stats)?; + } } } + _ => (), } self.live_check_result = Some(result); diff --git a/defaults/live_check_templates/ansi/live_check_macros.j2 b/defaults/live_check_templates/ansi/live_check_macros.j2 index 33884f36..2da92123 100644 --- a/defaults/live_check_templates/ansi/live_check_macros.j2 +++ b/defaults/live_check_templates/ansi/live_check_macros.j2 @@ -72,8 +72,16 @@ {%- endfor %} {% endmacro %} +{% macro summarize_data_point_value(data_point) %} +{%- if data_point.value is defined -%} +{{ data_point.value }} +{%- else -%} +sum={{ data_point.sum }}, min={{ data_point.min }}, max={{ 
data_point.max }} +{%- endif -%} +{% endmacro %} + {% macro display_data_point(data_point, indent=0) %} -{{ " " * indent }}{{ ("Data point") | ansi_bright_cyan | ansi_bold }} {{ data_point.value }} +{{ " " * indent }}{{ ("Data point") | ansi_bright_cyan | ansi_bold }} {{ summarize_data_point_value(data_point) }} {{ display_advice(data_point.live_check_result.all_advice, indent) }} {%- for attribute in data_point.attributes %} {{ display_attribute(attribute, indent + 4) }} diff --git a/src/registry/otlp/conversion.rs b/src/registry/otlp/conversion.rs index 95187974..d2de19d9 100644 --- a/src/registry/otlp/conversion.rs +++ b/src/registry/otlp/conversion.rs @@ -11,7 +11,7 @@ use weaver_semconv::group::{InstrumentSpec, SpanKindSpec}; use super::grpc_stubs::proto::{ common::v1::{AnyValue, KeyValue}, - metrics::v1::{metric::Data, Metric, NumberDataPoint}, + metrics::v1::{metric::Data, HistogramDataPoint, Metric, NumberDataPoint}, }; fn maybe_to_json(value: Option) -> Option { @@ -104,10 +104,36 @@ fn otlp_data_to_instrument(data: &Option) -> InstrumentSpec { fn otlp_data_to_data_points(data: &Option) -> Option { match data { Some(Data::Sum(sum)) => Some(otlp_number_data_points(&sum.data_points)), + Some(Data::Gauge(gauge)) => Some(otlp_number_data_points(&gauge.data_points)), + Some(Data::Histogram(histogram)) => { + Some(otlp_histogram_data_points(&histogram.data_points)) + } _ => None, } } +/// Converts OTLP Histogram data points to DataPoints::Histogram +fn otlp_histogram_data_points(otlp: &Vec) -> DataPoints { + let mut data_points = Vec::new(); + for point in otlp { + let live_check_point = weaver_live_check::sample_metric::HistogramDataPoint { + attributes: point + .attributes + .iter() + .map(sample_attribute_from_key_value) + .collect(), + count: point.count, + sum: point.sum, + bucket_counts: point.bucket_counts.clone(), + min: point.min, + max: point.max, + live_check_result: None, + }; + data_points.push(live_check_point); + } + 
DataPoints::Histogram(data_points) +} + /// Converts OTLP Number data points to DataPoints::Number fn otlp_number_data_points(otlp: &Vec) -> DataPoints { let mut data_points = Vec::new(); From a6e74260c53b2235666fc109bad759c11111faea Mon Sep 17 00:00:00 2001 From: jerbly Date: Mon, 5 May 2025 13:20:43 -0400 Subject: [PATCH 06/15] improve histogram ansi --- defaults/live_check_templates/ansi/live_check_macros.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/defaults/live_check_templates/ansi/live_check_macros.j2 b/defaults/live_check_templates/ansi/live_check_macros.j2 index 2da92123..f775a30e 100644 --- a/defaults/live_check_templates/ansi/live_check_macros.j2 +++ b/defaults/live_check_templates/ansi/live_check_macros.j2 @@ -76,7 +76,7 @@ {%- if data_point.value is defined -%} {{ data_point.value }} {%- else -%} -sum={{ data_point.sum }}, min={{ data_point.min }}, max={{ data_point.max }} +count={{ data_point.count }}, sum={{ data_point.sum }}, min={{ data_point.min }}, max={{ data_point.max }} {%- endif -%} {% endmacro %} From 7c62eba200e415bd2a1f300dc09675518b1477a0 Mon Sep 17 00:00:00 2001 From: jerbly Date: Mon, 5 May 2025 13:28:22 -0400 Subject: [PATCH 07/15] fix up after merge --- src/registry/otlp/conversion.rs | 19 +++++++++++++++++++ src/registry/otlp/otlp_ingester.rs | 7 ++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/registry/otlp/conversion.rs b/src/registry/otlp/conversion.rs index d2de19d9..62fb8c5d 100644 --- a/src/registry/otlp/conversion.rs +++ b/src/registry/otlp/conversion.rs @@ -6,6 +6,7 @@ use serde_json::{json, Value}; use weaver_live_check::{ sample_attribute::SampleAttribute, sample_metric::{DataPoints, SampleMetric}, + sample_span::{Status, StatusCode}, }; use weaver_semconv::group::{InstrumentSpec, SpanKindSpec}; @@ -68,6 +69,24 @@ pub fn span_kind_from_otlp_kind(kind: i32) -> SpanKindSpec { } } +/// Converts an OTLP status to a Status +pub fn status_from_otlp_status( + status: Option, +) -> 
Option { + if let Some(status) = status { + let code = match status.code { + 1 => StatusCode::Ok, + 2 => StatusCode::Error, + _ => StatusCode::Unset, + }; + return Some(Status { + code, + message: status.message, + }); + } + None +} + /// Converts an OTLP metric to a SampleMetric pub fn otlp_metric_to_sample(otlp_metric: Metric) -> SampleMetric { SampleMetric { diff --git a/src/registry/otlp/otlp_ingester.rs b/src/registry/otlp/otlp_ingester.rs index d74089e3..4d622faf 100644 --- a/src/registry/otlp/otlp_ingester.rs +++ b/src/registry/otlp/otlp_ingester.rs @@ -7,13 +7,14 @@ use log::info; use weaver_common::log_info; use weaver_live_check::{ sample_resource::SampleResource, - sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink, Status, StatusCode}, + sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink}, Error, Ingester, Sample, }; use super::{ conversion::{ otlp_metric_to_sample, sample_attribute_from_key_value, span_kind_from_otlp_kind, + status_from_otlp_status, }, listen_otlp_requests, OtlpRequest, }; @@ -111,8 +112,8 @@ impl OtlpIterator { for span in scope_span.spans { let mut sample_span = SampleSpan { name: span.name, - kind: Self::span_kind_from_otlp_kind(span.kind), - status: Self::status_from_otlp_status(span.status), + kind: span_kind_from_otlp_kind(span.kind), + status: status_from_otlp_status(span.status), attributes: Vec::new(), span_events: Vec::new(), span_links: Vec::new(), From 23f0a0c8006a5fd75afe4907760d77421579a468 Mon Sep 17 00:00:00 2001 From: jerbly Date: Mon, 5 May 2025 15:45:40 -0400 Subject: [PATCH 08/15] metrics test --- crates/weaver_live_check/data/metrics.json | 98 +++++++++ .../data/model/metrics/metrics.yaml | 54 +++++ crates/weaver_live_check/src/live_checker.rs | 193 ++++++++++++++++++ 3 files changed, 345 insertions(+) create mode 100644 crates/weaver_live_check/data/metrics.json create mode 100644 crates/weaver_live_check/data/model/metrics/metrics.yaml diff --git a/crates/weaver_live_check/data/metrics.json 
b/crates/weaver_live_check/data/metrics.json new file mode 100644 index 00000000..210719e6 --- /dev/null +++ b/crates/weaver_live_check/data/metrics.json @@ -0,0 +1,98 @@ +[ + { + "metric": { + "data_points": [ + { + "attributes": [ + { + "name": "state", + "value": "used" + } + ], + "value": 26963050496 + }, + { + "attributes": [ + { + "name": "state", + "value": "free" + } + ], + "value": 586153984 + }, + { + "attributes": [ + { + "name": "system.memory.state", + "value": "inactive" + } + ], + "value": 6810533888 + } + ], + "instrument": "updowncounter", + "name": "system.memory.usage", + "unit": "By" + } + }, + { + "metric": { + "data_points": [ + { + "attributes": [], + "bucket_counts": [ + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0 + ], + "count": 1, + "max": 7.0015, + "min": 7.0015, + "sum": 7.0015 + } + ], + "instrument": "histogram", + "name": "rpc.client.duration", + "unit": "ms" + } + }, + { + "metric": { + "data_points": [ + { + "attributes": [], + "value": 151552000 + } + ], + "instrument": "gauge", + "name": "otelcol_process_memory_rss", + "unit": "By" + } + }, + { + "metric": { + "data_points": [ + { + "attributes": [], + "value": 39585616 + } + ], + "instrument": "counter", + "name": "otelcol_process_runtime_total_alloc_bytes", + "unit": "By" + } + } +] \ No newline at end of file diff --git a/crates/weaver_live_check/data/model/metrics/metrics.yaml b/crates/weaver_live_check/data/model/metrics/metrics.yaml new file mode 100644 index 00000000..58522f70 --- /dev/null +++ b/crates/weaver_live_check/data/model/metrics/metrics.yaml @@ -0,0 +1,54 @@ +groups: + - id: registry.system.memory + type: attribute_group + display_name: System Memory Attributes + brief: "Describes System Memory attributes" + attributes: + - id: system.memory.state + type: + members: + - id: used + value: "used" + stability: development + - id: free + value: "free" + stability: development + - id: shared + value: "shared" + stability: development + 
deprecated: "Removed, report shared memory usage with `metric.system.memory.shared` metric" + - id: buffers + value: "buffers" + stability: development + - id: cached + value: "cached" + stability: development + stability: development + brief: "The memory state" + examples: ["free", "cached"] + + # system.* metrics + - id: metric.system.uptime + type: metric + metric_name: system.uptime + stability: development + brief: "The time the system has been running" + note: | + Instrumentations SHOULD use a gauge with type `double` and measure uptime in seconds as a floating point number with the highest precision available. + The actual accuracy would depend on the instrumentation and operating system. + instrument: gauge + unit: "s" + + # system.memory.* metrics + - id: metric.system.memory.usage + type: metric + metric_name: system.memory.usage + stability: development + brief: "Reports memory in use by state." + note: | + The sum over all `system.memory.state` values SHOULD equal the total memory + available on the system, that is `system.memory.limit`. 
+ instrument: updowncounter + unit: "By" + attributes: + - ref: system.memory.state diff --git a/crates/weaver_live_check/src/live_checker.rs b/crates/weaver_live_check/src/live_checker.rs index 9119833d..1bf7ab65 100644 --- a/crates/weaver_live_check/src/live_checker.rs +++ b/crates/weaver_live_check/src/live_checker.rs @@ -435,6 +435,137 @@ mod tests { } } + fn make_metrics_registry() -> ResolvedRegistry { + ResolvedRegistry { + registry_url: "TEST_METRICS".to_owned(), + groups: vec![ + // Attribute group for system memory + ResolvedGroup { + id: "registry.system.memory".to_owned(), + r#type: GroupType::AttributeGroup, + brief: "Describes System Memory attributes".to_owned(), + note: "".to_owned(), + prefix: "".to_owned(), + entity_associations: vec![], + extends: None, + stability: None, + deprecated: None, + attributes: vec![ + Attribute { + name: "system.memory.state".to_owned(), + r#type: AttributeType::Enum { + allow_custom_values: None, + members: vec![ + EnumEntriesSpec { + id: "used".to_owned(), + value: ValueSpec::String("used".to_owned()), + brief: None, + note: None, + stability: Some(Stability::Development), + deprecated: None, + }, + EnumEntriesSpec { + id: "free".to_owned(), + value: ValueSpec::String("free".to_owned()), + brief: None, + note: None, + stability: Some(Stability::Development), + deprecated: None, + }, + ], + }, + examples: Some(Examples::Strings(vec![ + "free".to_owned(), + "cached".to_owned(), + ])), + brief: "The memory state".to_owned(), + tag: None, + requirement_level: RequirementLevel::Recommended { text: "".to_owned() }, + sampling_relevant: None, + note: "".to_owned(), + stability: Some(Stability::Development), + deprecated: None, + prefix: false, + tags: None, + value: None, + annotations: None, + }, + ], + span_kind: None, + events: vec![], + metric_name: None, + instrument: None, + unit: None, + name: None, + lineage: None, + display_name: Some("System Memory Attributes".to_owned()), + body: None, + }, + // System uptime 
metric + ResolvedGroup { + id: "metric.system.uptime".to_owned(), + r#type: GroupType::Metric, + brief: "The time the system has been running".to_owned(), + note: "Instrumentations SHOULD use a gauge with type `double` and measure uptime in seconds as a floating point number with the highest precision available.\nThe actual accuracy would depend on the instrumentation and operating system.".to_owned(), + prefix: "".to_owned(), + entity_associations: vec![], + extends: None, + stability: Some(Stability::Development), + deprecated: None, + attributes: vec![], + span_kind: None, + events: vec![], + metric_name: Some("system.uptime".to_owned()), + instrument: Some(weaver_semconv::group::InstrumentSpec::Gauge), + unit: Some("s".to_owned()), + name: None, + lineage: None, + display_name: None, + body: None, + }, + // System memory usage metric + ResolvedGroup { + id: "metric.system.memory.usage".to_owned(), + r#type: GroupType::Metric, + brief: "Reports memory in use by state.".to_owned(), + note: "The sum over all `system.memory.state` values SHOULD equal the total memory\navailable on the system, that is `system.memory.limit`.".to_owned(), + prefix: "".to_owned(), + entity_associations: vec![], + extends: None, + stability: Some(Stability::Development), + deprecated: None, + attributes: vec![ + Attribute { + name: "system.memory.state".to_owned(), + r#type: AttributeType::PrimitiveOrArray(PrimitiveOrArrayTypeSpec::String), + examples: None, + brief: "The memory state".to_owned(), + tag: None, + requirement_level: RequirementLevel::Recommended { text: "".to_owned() }, + sampling_relevant: None, + note: "".to_owned(), + stability: Some(Stability::Development), + deprecated: None, + prefix: false, + tags: None, + value: None, + annotations: None, + }, + ], + span_kind: None, + events: vec![], + metric_name: Some("system.memory.usage".to_owned()), + instrument: Some(weaver_semconv::group::InstrumentSpec::UpDownCounter), + unit: Some("By".to_owned()), + name: None, + 
lineage: None, + display_name: None, + body: None, + }, + ], + } + } + #[test] fn test_custom_rego() { let registry = ResolvedRegistry { @@ -603,6 +734,68 @@ mod tests { ); } + #[test] + fn test_json_metric() { + let registry = make_metrics_registry(); + + // Load samples from JSON file + let path = "data/metrics.json"; + let mut samples: Vec = + serde_json::from_reader(File::open(path).expect("Unable to open file")) + .expect("Unable to parse JSON"); + + let advisors: Vec> = vec![ + Box::new(DeprecatedAdvisor), + Box::new(StabilityAdvisor), + Box::new(TypeAdvisor), + Box::new(EnumAdvisor), + ]; + + let mut live_checker = LiveChecker::new(registry, advisors); + let rego_advisor = + RegoAdvisor::new(&live_checker, &None, &None).expect("Failed to create Rego advisor"); + live_checker.add_advisor(Box::new(rego_advisor)); + + let mut stats = LiveCheckStatistics::new(&live_checker.registry); + for sample in &mut samples { + let result = sample.run_live_check(&mut live_checker, &mut stats); + assert!(result.is_ok()); + } + stats.finalize(); + /* Assert on these: + total_entities_by_type: { + "histogram_data_point": 1, + "number_data_point": 5, + "metric": 4, + "attribute": 3, + }, + no_advice_count: 4, + advice_type_counts: { + "attribute_required": 2, + "missing_attribute": 2, + "stability": 2, + "missing_metric": 3, + "missing_namespace": 2, + }, */ + // Check the statistics + assert_eq!( + stats.total_entities_by_type.get("histogram_data_point"), + Some(&1) + ); + assert_eq!( + stats.total_entities_by_type.get("number_data_point"), + Some(&5) + ); + assert_eq!(stats.total_entities_by_type.get("metric"), Some(&4)); + assert_eq!(stats.total_entities_by_type.get("attribute"), Some(&3)); + assert_eq!(stats.no_advice_count, 4); + assert_eq!(stats.advice_type_counts.get("attribute_required"), Some(&2)); + assert_eq!(stats.advice_type_counts.get("missing_attribute"), Some(&2)); + assert_eq!(stats.advice_type_counts.get("stability"), Some(&2)); + 
assert_eq!(stats.advice_type_counts.get("missing_metric"), Some(&3)); + assert_eq!(stats.advice_type_counts.get("missing_namespace"), Some(&2)); + } + #[test] fn test_bad_custom_rego() { let registry = ResolvedRegistry { From f1f41eb2a66db486c8e2347789c410faaa37ac91 Mon Sep 17 00:00:00 2001 From: jerbly Date: Mon, 5 May 2025 16:00:55 -0400 Subject: [PATCH 09/15] rename sample structs --- crates/weaver_live_check/src/lib.rs | 6 +++--- crates/weaver_live_check/src/sample_metric.rs | 16 ++++++++-------- src/registry/otlp/conversion.rs | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/weaver_live_check/src/lib.rs b/crates/weaver_live_check/src/lib.rs index fb66521c..3fe4f68c 100644 --- a/crates/weaver_live_check/src/lib.rs +++ b/crates/weaver_live_check/src/lib.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use live_checker::LiveChecker; use miette::Diagnostic; use sample_attribute::SampleAttribute; -use sample_metric::{HistogramDataPoint, NumberDataPoint, SampleMetric}; +use sample_metric::{SampleHistogramDataPoint, SampleMetric, SampleNumberDataPoint}; use sample_resource::SampleResource; use sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink}; use serde::{Deserialize, Serialize}; @@ -120,9 +120,9 @@ pub enum SampleRef<'a> { /// A sample metric Metric(&'a SampleMetric), /// A sample number data point - NumberDataPoint(&'a NumberDataPoint), + NumberDataPoint(&'a SampleNumberDataPoint), /// A sample histogram data point - HistogramDataPoint(&'a HistogramDataPoint), + HistogramDataPoint(&'a SampleHistogramDataPoint), } // Dispatch the live check to the sample type diff --git a/crates/weaver_live_check/src/sample_metric.rs b/crates/weaver_live_check/src/sample_metric.rs index f68a6380..7a42c083 100644 --- a/crates/weaver_live_check/src/sample_metric.rs +++ b/crates/weaver_live_check/src/sample_metric.rs @@ -17,18 +17,18 @@ use crate::{ #[serde(untagged)] pub enum DataPoints { /// Number data points - Number(Vec), + Number(Vec), /// 
Histogram data points - Histogram(Vec), + Histogram(Vec), /// Exponential histogram data points - ExponentialHistogram(Vec), + ExponentialHistogram(Vec), /// Summary data points - Summary(Vec), + Summary(Vec), } /// Represents a single data point of a metric #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct NumberDataPoint { +pub struct SampleNumberDataPoint { /// The value of the data point pub value: Value, /// The attributes of the data point @@ -39,7 +39,7 @@ pub struct NumberDataPoint { /// Represents a single histogram data point of a metric #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct HistogramDataPoint { +pub struct SampleHistogramDataPoint { /// The attributes of the data point pub attributes: Vec, /// The count of the data point @@ -58,14 +58,14 @@ pub struct HistogramDataPoint { /// Represents a single exponential histogram data point of a metric #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct ExponentialHistogramDataPoint { +pub struct SampleExponentialHistogramDataPoint { /// The attributes of the data point pub attributes: Vec, } /// Represents a single summary data point of a metric #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct SummaryDataPoint { +pub struct SampleSummaryDataPoint { /// The attributes of the data point pub attributes: Vec, } diff --git a/src/registry/otlp/conversion.rs b/src/registry/otlp/conversion.rs index 62fb8c5d..75968875 100644 --- a/src/registry/otlp/conversion.rs +++ b/src/registry/otlp/conversion.rs @@ -135,7 +135,7 @@ fn otlp_data_to_data_points(data: &Option) -> Option { fn otlp_histogram_data_points(otlp: &Vec) -> DataPoints { let mut data_points = Vec::new(); for point in otlp { - let live_check_point = weaver_live_check::sample_metric::HistogramDataPoint { + let live_check_point = weaver_live_check::sample_metric::SampleHistogramDataPoint { attributes: point .attributes .iter() @@ -157,7 +157,7 @@ fn 
otlp_histogram_data_points(otlp: &Vec) -> DataPoints { fn otlp_number_data_points(otlp: &Vec) -> DataPoints { let mut data_points = Vec::new(); for point in otlp { - let live_check_point = weaver_live_check::sample_metric::NumberDataPoint { + let live_check_point = weaver_live_check::sample_metric::SampleNumberDataPoint { value: match point.value { Some(value) => match value { super::grpc_stubs::proto::metrics::v1::number_data_point::Value::AsDouble( From 63204e508927648bae250da3e0e28aa4a87839d8 Mon Sep 17 00:00:00 2001 From: jerbly Date: Mon, 5 May 2025 20:08:03 -0400 Subject: [PATCH 10/15] stats, rego and docs --- crates/weaver_live_check/README.md | 4 +- crates/weaver_live_check/src/lib.rs | 51 ++++++++++++++++--- crates/weaver_live_check/src/live_checker.rs | 5 ++ crates/weaver_live_check/src/sample_metric.rs | 5 +- .../ansi/live_check_macros.j2 | 2 +- defaults/policies/live_check_advice/otel.rego | 10 ++++ 6 files changed, 66 insertions(+), 11 deletions(-) diff --git a/crates/weaver_live_check/README.md b/crates/weaver_live_check/README.md index 17343ddb..4dbb6ce0 100644 --- a/crates/weaver_live_check/README.md +++ b/crates/weaver_live_check/README.md @@ -202,7 +202,9 @@ These should be self-explanatory, but: - `no_advice_count` is the number of samples that received no advice - `seen_registry_attributes` is a record of how many times each attribute in the registry was seen in the samples - `seen_non_registry_attributes` is a record of how many times each non-registry attribute was seen in the samples -- `registry_coverage` is the fraction of seen registry attributes over the total registry attributes +- `seen_registry_metrics` is a record of how many times each metric in the registry was seen in the samples +- `seen_non_registry_metrics` is a record of how many times each non-registry metric was seen in the samples +- `registry_coverage` is the fraction of seen registry entities over the total registry entities This could be parsed for a more sophisticated
way to determine pass/fail in CI for example. diff --git a/crates/weaver_live_check/src/lib.rs b/crates/weaver_live_check/src/lib.rs index 3fe4f68c..cc044265 100644 --- a/crates/weaver_live_check/src/lib.rs +++ b/crates/weaver_live_check/src/lib.rs @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize}; use weaver_checker::violation::{Advice, AdviceLevel}; use weaver_common::diagnostic::{DiagnosticMessage, DiagnosticMessages}; use weaver_forge::registry::ResolvedRegistry; +use weaver_semconv::group::GroupType; /// Advisors for live checks pub mod advice; @@ -209,17 +210,21 @@ pub struct LiveCheckStatistics { pub total_advisories: usize, /// The number of each advice level pub advice_level_counts: HashMap, - /// The number of attributes with each highest advice level + /// The number of entities with each highest advice level pub highest_advice_level_counts: HashMap, - /// The number of attributes with no advice + /// The number of entities with no advice pub no_advice_count: usize, - /// The number of attributes with each advice type + /// The number of entities with each advice type pub advice_type_counts: HashMap, /// The number of each attribute seen from the registry pub seen_registry_attributes: HashMap, /// The number of each non-registry attribute seen pub seen_non_registry_attributes: HashMap, - /// Fraction of the registry covered by the attributes + /// The number of each metric seen from the registry + pub seen_registry_metrics: HashMap, + /// The number of each non-registry metric seen + pub seen_non_registry_metrics: HashMap, + /// Fraction of the registry covered by the attributes and metrics pub registry_coverage: f32, } @@ -228,12 +233,18 @@ impl LiveCheckStatistics { #[must_use] pub fn new(registry: &ResolvedRegistry) -> Self { let mut seen_attributes = HashMap::new(); + let mut seen_metrics = HashMap::new(); for group in ®istry.groups { for attribute in &group.attributes { if attribute.deprecated.is_none() { let _ = 
seen_attributes.insert(attribute.name.clone(), 0); } } + if group.r#type == GroupType::Metric && group.deprecated.is_none() { + if let Some(metric_name) = &group.metric_name { + let _ = seen_metrics.insert(metric_name.clone(), 0); + } + } } LiveCheckStatistics { total_entities: 0, @@ -245,6 +256,8 @@ impl LiveCheckStatistics { advice_type_counts: HashMap::new(), seen_registry_attributes: seen_attributes, seen_non_registry_attributes: HashMap::new(), + seen_registry_metrics: seen_metrics, + seen_non_registry_metrics: HashMap::new(), registry_coverage: 0.0, } } @@ -320,6 +333,20 @@ impl LiveCheckStatistics { } } + /// Add metric name to coverage + pub fn add_metric_name_to_coverage(&mut self, seen_metric_name: String) { + if let Some(count) = self.seen_registry_metrics.get_mut(&seen_metric_name) { + // This is a registry metric + *count += 1; + } else { + // This is a non-registry metric + *self + .seen_non_registry_metrics + .entry(seen_metric_name) + .or_insert(0) += 1; + } + } + /// Are there any violations in the statistics? 
#[must_use] pub fn has_violations(&self) -> bool { @@ -330,16 +357,26 @@ impl LiveCheckStatistics { /// Finalize the statistics pub fn finalize(&mut self) { // Calculate the registry coverage - // non-zero attributes / total attributes + // (non-zero attributes + non-zero metrics) / (total attributes + total metrics) let non_zero_attributes = self .seen_registry_attributes .values() .filter(|&&count| count > 0) .count(); let total_registry_attributes = self.seen_registry_attributes.len(); - if total_registry_attributes > 0 { + + let non_zero_metrics = self + .seen_registry_metrics + .values() + .filter(|&&count| count > 0) + .count(); + let total_registry_metrics = self.seen_registry_metrics.len(); + + let total_registry_items = total_registry_attributes + total_registry_metrics; + + if total_registry_items > 0 { self.registry_coverage = - (non_zero_attributes as f32) / (total_registry_attributes as f32); + ((non_zero_attributes + non_zero_metrics) as f32) / (total_registry_items as f32); } else { self.registry_coverage = 0.0; } diff --git a/crates/weaver_live_check/src/live_checker.rs b/crates/weaver_live_check/src/live_checker.rs index 1bf7ab65..0fe21f0c 100644 --- a/crates/weaver_live_check/src/live_checker.rs +++ b/crates/weaver_live_check/src/live_checker.rs @@ -794,6 +794,11 @@ mod tests { assert_eq!(stats.advice_type_counts.get("stability"), Some(&2)); assert_eq!(stats.advice_type_counts.get("missing_metric"), Some(&3)); assert_eq!(stats.advice_type_counts.get("missing_namespace"), Some(&2)); + assert_eq!( + stats.seen_registry_metrics.get("system.memory.usage"), + Some(&1) + ); + assert_eq!(stats.seen_non_registry_metrics.len(), 3); } #[test] diff --git a/crates/weaver_live_check/src/sample_metric.rs b/crates/weaver_live_check/src/sample_metric.rs index 7a42c083..91917a28 100644 --- a/crates/weaver_live_check/src/sample_metric.rs +++ b/crates/weaver_live_check/src/sample_metric.rs @@ -124,7 +124,7 @@ impl LiveCheckRunner for SampleMetric { 
point_result.add_advice_list(advice_list); } point.live_check_result = Some(point_result); - stats.inc_entity_count("number_data_point"); + stats.inc_entity_count("data_point"); stats.maybe_add_live_check_result(point.live_check_result.as_ref()); for attribute in &mut point.attributes { @@ -144,7 +144,7 @@ impl LiveCheckRunner for SampleMetric { point_result.add_advice_list(advice_list); } point.live_check_result = Some(point_result); - stats.inc_entity_count("histogram_data_point"); + stats.inc_entity_count("data_point"); stats.maybe_add_live_check_result(point.live_check_result.as_ref()); for attribute in &mut point.attributes { @@ -158,6 +158,7 @@ impl LiveCheckRunner for SampleMetric { self.live_check_result = Some(result); stats.inc_entity_count("metric"); stats.maybe_add_live_check_result(self.live_check_result.as_ref()); + stats.add_metric_name_to_coverage(self.name.clone()); Ok(()) } } diff --git a/defaults/live_check_templates/ansi/live_check_macros.j2 b/defaults/live_check_templates/ansi/live_check_macros.j2 index f775a30e..03fb196e 100644 --- a/defaults/live_check_templates/ansi/live_check_macros.j2 +++ b/defaults/live_check_templates/ansi/live_check_macros.j2 @@ -27,7 +27,7 @@ {% endif %} {{ ("Registry coverage") | ansi_blue | ansi_bold }} - - attributes seen: {{ (statistics.registry_coverage * 100) | round(2) }}% + - entities seen: {{ (statistics.registry_coverage * 100) | round(2) }}% {% endmacro %} {% macro display_advice(all_advice, indent=0) %} diff --git a/defaults/policies/live_check_advice/otel.rego b/defaults/policies/live_check_advice/otel.rego index 84f53f1e..0c258ea1 100644 --- a/defaults/policies/live_check_advice/otel.rego +++ b/defaults/policies/live_check_advice/otel.rego @@ -40,6 +40,16 @@ deny contains make_advice(advice_type, advice_level, value, message) if { message := "Does not match name formatting rules" } +# checks metric name format +deny contains make_advice(advice_type, advice_level, value, message) if { + input.metric + not 
regex.match(name_regex, input.metric.name) + advice_type := "invalid_format" + advice_level := "violation" + value := input.metric.name + message := "Does not match name formatting rules" +} + # checks attribute namespace doesn't collide with existing attributes deny contains make_advice(advice_type, advice_level, value, message) if { input.attribute From 3cd7a8f662dab4a43c8130da3a77c03b3212a6dc Mon Sep 17 00:00:00 2001 From: jerbly Date: Mon, 5 May 2025 20:18:27 -0400 Subject: [PATCH 11/15] fix test --- crates/weaver_live_check/src/live_checker.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/crates/weaver_live_check/src/live_checker.rs b/crates/weaver_live_check/src/live_checker.rs index 0fe21f0c..9cf3d7a3 100644 --- a/crates/weaver_live_check/src/live_checker.rs +++ b/crates/weaver_live_check/src/live_checker.rs @@ -764,8 +764,7 @@ mod tests { stats.finalize(); /* Assert on these: total_entities_by_type: { - "histogram_data_point": 1, - "number_data_point": 5, + "data_point": 6, "metric": 4, "attribute": 3, }, @@ -778,14 +777,7 @@ mod tests { "missing_namespace": 2, }, */ // Check the statistics - assert_eq!( - stats.total_entities_by_type.get("histogram_data_point"), - Some(&1) - ); - assert_eq!( - stats.total_entities_by_type.get("number_data_point"), - Some(&5) - ); + assert_eq!(stats.total_entities_by_type.get("data_point"), Some(&6)); assert_eq!(stats.total_entities_by_type.get("metric"), Some(&4)); assert_eq!(stats.total_entities_by_type.get("attribute"), Some(&3)); assert_eq!(stats.no_advice_count, 4); From 8247a074799caf04c56eb0800c13c7afb6430859 Mon Sep 17 00:00:00 2001 From: jerbly Date: Tue, 6 May 2025 10:41:40 -0400 Subject: [PATCH 12/15] histogram attributes --- crates/weaver_live_check/src/advice.rs | 61 ++++++++++++++++++-------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/crates/weaver_live_check/src/advice.rs b/crates/weaver_live_check/src/advice.rs index 0f2420b9..e773791d 100644 --- 
a/crates/weaver_live_check/src/advice.rs +++ b/crates/weaver_live_check/src/advice.rs @@ -18,7 +18,7 @@ use weaver_semconv::{ stability::Stability, }; -use crate::{live_checker::LiveChecker, Error, SampleRef}; +use crate::{live_checker::LiveChecker, sample_attribute::SampleAttribute, Error, SampleRef}; /// Embedded default live check rego policies pub const DEFAULT_LIVE_CHECK_REGO: &str = @@ -147,6 +147,31 @@ impl Advisor for StabilityAdvisor { /// An advisor that checks if an attribute has the correct type pub struct TypeAdvisor; + +/// Checks if required attributes from a resolved group are present in a list of attributes +/// Returns a list of advice for missing attributes +fn check_required_attributes( + required_attributes: &[Attribute], + attributes: &[SampleAttribute], +) -> Vec { + let mut advice_list = Vec::new(); + for required_attribute in required_attributes { + // Check if the attribute is present in the sample + if !attributes + .iter() + .any(|attribute| attribute.name == required_attribute.name) + { + advice_list.push(Advice { + advice_type: "attribute_required".to_owned(), + value: Value::String(required_attribute.name.clone()), + message: "Attribute is required".to_owned(), + advice_level: AdviceLevel::Violation, + }); + } + } + advice_list +} + impl Advisor for TypeAdvisor { fn advise( &mut self, @@ -210,6 +235,7 @@ impl Advisor for TypeAdvisor { } } SampleRef::Metric(sample_metric) => { + // Check the instrument and unit of the metric let mut advice_list = Vec::new(); if let Some(semconv_metric) = registry_group { if let Some(semconv_instrument) = &semconv_metric.instrument { @@ -236,25 +262,24 @@ impl Advisor for TypeAdvisor { Ok(advice_list) } SampleRef::NumberDataPoint(sample_number_data_point) => { - let mut advice_list = Vec::new(); if let Some(semconv_metric) = registry_group { - for semconv_attribute in semconv_metric.attributes.iter() { - // Check if the attribute is present in the sample - if !sample_number_data_point - .attributes - 
.iter() - .any(|attribute| attribute.name == semconv_attribute.name) - { - advice_list.push(Advice { - advice_type: "attribute_required".to_owned(), - value: Value::String(semconv_attribute.name.clone()), - message: "Attribute is required".to_owned(), - advice_level: AdviceLevel::Violation, - }); - } - } + Ok(check_required_attributes( + &semconv_metric.attributes, + &sample_number_data_point.attributes, + )) + } else { + Ok(Vec::new()) + } + } + SampleRef::HistogramDataPoint(sample_histogram_data_point) => { + if let Some(semconv_metric) = registry_group { + Ok(check_required_attributes( + &semconv_metric.attributes, + &sample_histogram_data_point.attributes, + )) + } else { + Ok(Vec::new()) } - Ok(advice_list) } _ => Ok(Vec::new()), } From 6eda51902cbba8c8ab9d4e9ec8e320eca5265532 Mon Sep 17 00:00:00 2001 From: jerbly Date: Tue, 6 May 2025 20:21:04 -0400 Subject: [PATCH 13/15] Rego registry inputs, example, test, docs --- crates/weaver_live_check/README.md | 15 ++++++-- crates/weaver_live_check/data/metrics.json | 2 +- .../data/policies/live_check_advice/test.rego | 28 ++++++++++---- crates/weaver_live_check/src/advice.rs | 18 +++++++-- crates/weaver_live_check/src/live_checker.rs | 38 +++++++++++++++++-- defaults/policies/live_check_advice/otel.rego | 34 ++++++++--------- 6 files changed, 100 insertions(+), 35 deletions(-) diff --git a/crates/weaver_live_check/README.md b/crates/weaver_live_check/README.md index 4dbb6ce0..2a4bbbe9 100644 --- a/crates/weaver_live_check/README.md +++ b/crates/weaver_live_check/README.md @@ -107,6 +107,9 @@ As mentioned, a list of `Advice` is returned in the report for each sample entit } ``` +> **Note** +> The `live_check_result` object augments the sample entity at the pertinent level in the structure. If the structure is `metric`->`[number_data_point]`->`[attribute]`, advice should be given at the `number_data_point` level for, say, required attributes that have not been supplied.
Whereas, attribute advice, like `missing_attribute` in the JSON above, is given at the attribute level. + ### Custom advisors Use the `--advice-policies` command line option to provide a path to a directory containing Rego policies with the `live_check_advice` package name. Here's a very simple example that rejects any attribute name containing the string "test": @@ -118,8 +121,8 @@ import rego.v1 # checks attribute name contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute - value := input.attribute.name + input.sample.attribute + value := input.sample.attribute.name contains(value, "test") advice_type := "contains_test" advice_level := "violation" @@ -135,7 +138,13 @@ make_advice(advice_type, advice_level, value, message) := { } ``` -`input` contains the sample entity for assessment wrapped in a type e.g. `input.attribute` or `input.span`. `data` contains a structure derived from the supplied `Registry`. A jq preprocessor takes the `Registry` (and maps for attributes and templates) to produce the `data` for the policy. If the jq is simply `.` this will passthrough as-is. Preprocessing is used to improve Rego performance and to simplify policy definitions. With this model `data` is processed once whereas the Rego policy runs for every sample entity as it arrives in the stream. +`input.sample` contains the sample entity for assessment wrapped in a type e.g. `input.sample.attribute` or `input.sample.span`. + +`input.registry_attribute`, when present, contains the matching attribute definition from the supplied registry. + +`input.registry_group`, when present, contains the matching group definition from the supplied registry. + +`data` contains a structure derived from the supplied `Registry`. A jq preprocessor takes the `Registry` (and maps for attributes and templates) to produce the `data` for the policy. If the jq is simply `.` this will passthrough as-is. 
Preprocessing is used to improve Rego performance and to simplify policy definitions. With this model `data` is processed once whereas the Rego policy runs for every sample entity as it arrives in the stream. To override the default Otel jq preprocessor provide a path to the jq file through the `--advice-preprocessor` option. diff --git a/crates/weaver_live_check/data/metrics.json b/crates/weaver_live_check/data/metrics.json index 210719e6..f9dad36b 100644 --- a/crates/weaver_live_check/data/metrics.json +++ b/crates/weaver_live_check/data/metrics.json @@ -27,7 +27,7 @@ "value": "inactive" } ], - "value": 6810533888 + "value": 681053388.8 } ], "instrument": "updowncounter", diff --git a/crates/weaver_live_check/data/policies/live_check_advice/test.rego b/crates/weaver_live_check/data/policies/live_check_advice/test.rego index 9115f8ab..24bb538b 100644 --- a/crates/weaver_live_check/data/policies/live_check_advice/test.rego +++ b/crates/weaver_live_check/data/policies/live_check_advice/test.rego @@ -4,8 +4,8 @@ import rego.v1 # checks attribute name contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute - value := input.attribute.name + input.sample.attribute + value := input.sample.attribute.name contains(value, "test") advice_type := "contains_test" advice_level := "violation" @@ -14,8 +14,8 @@ deny contains make_advice(advice_type, advice_level, value, message) if { # checks span name contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.span - value := input.span.name + input.sample.span + value := input.sample.span.name contains(value, "test") advice_type := "contains_test" advice_level := "violation" @@ -24,8 +24,8 @@ deny contains make_advice(advice_type, advice_level, value, message) if { # checks span status message contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.span - value := 
input.span.status.message + input.sample.span + value := input.sample.span.status.message contains(value, "test") advice_type := "contains_test_in_status" advice_level := "violation" @@ -34,14 +34,26 @@ deny contains make_advice(advice_type, advice_level, value, message) if { # checks span_event name contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.span_event - value := input.span_event.name + input.sample.span_event + value := input.sample.span_event.name contains(value, "test") advice_type := "contains_test" advice_level := "violation" message := "Name must not contain 'test'" } +# This example shows how to use the registry_group provided in the input. +# If the metric's unit is "By" the value in this data-point must be an integer. +deny contains make_advice(advice_type, advice_level, value, message) if { + input.sample.number_data_point + value := input.sample.number_data_point.value + input.registry_group.unit == "By" + value != floor(value) # not a good type check, but serves as an example + advice_type := "invalid_data_point_value" + advice_level := "violation" + message := "Value must be an integer when unit is 'By'" +} + make_advice(advice_type, advice_level, value, message) := { "type": "advice", "advice_type": advice_type, diff --git a/crates/weaver_live_check/src/advice.rs b/crates/weaver_live_check/src/advice.rs index e773791d..1d066f2f 100644 --- a/crates/weaver_live_check/src/advice.rs +++ b/crates/weaver_live_check/src/advice.rs @@ -437,13 +437,25 @@ impl RegoAdvisor { } } +/// Input data for the check function +#[derive(Serialize)] +struct RegoInput<'a> { + sample: SampleRef<'a>, + registry_attribute: Option<&'a Rc>, + registry_group: Option<&'a Rc>, +} + impl Advisor for RegoAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - _registry_attribute: Option<&Rc>, - _registry_group: Option<&Rc>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error> { - 
self.check(sample) + self.check(RegoInput { + sample, + registry_attribute, + registry_group, + }) } } diff --git a/crates/weaver_live_check/src/live_checker.rs b/crates/weaver_live_check/src/live_checker.rs index 9cf3d7a3..8f83bab3 100644 --- a/crates/weaver_live_check/src/live_checker.rs +++ b/crates/weaver_live_check/src/live_checker.rs @@ -123,7 +123,7 @@ mod tests { AttributeType, EnumEntriesSpec, Examples, PrimitiveOrArrayTypeSpec, RequirementLevel, TemplateTypeSpec, ValueSpec, }, - group::{GroupType, SpanKindSpec}, + group::{GroupType, InstrumentSpec, SpanKindSpec}, stability::Stability, }; @@ -516,7 +516,7 @@ mod tests { span_kind: None, events: vec![], metric_name: Some("system.uptime".to_owned()), - instrument: Some(weaver_semconv::group::InstrumentSpec::Gauge), + instrument: Some(InstrumentSpec::Gauge), unit: Some("s".to_owned()), name: None, lineage: None, @@ -555,7 +555,7 @@ mod tests { span_kind: None, events: vec![], metric_name: Some("system.memory.usage".to_owned()), - instrument: Some(weaver_semconv::group::InstrumentSpec::UpDownCounter), + instrument: Some(InstrumentSpec::UpDownCounter), unit: Some("By".to_owned()), name: None, lineage: None, @@ -793,6 +793,38 @@ mod tests { assert_eq!(stats.seen_non_registry_metrics.len(), 3); } + #[test] + fn test_json_metric_custom_rego() { + let registry = make_metrics_registry(); + + // Load samples from JSON file + let path = "data/metrics.json"; + let mut samples: Vec = + serde_json::from_reader(File::open(path).expect("Unable to open file")) + .expect("Unable to parse JSON"); + + let mut live_checker = LiveChecker::new(registry, vec![]); + let rego_advisor = RegoAdvisor::new( + &live_checker, + &Some("data/policies/live_check_advice/".into()), + &Some("data/jq/test.jq".into()), + ) + .expect("Failed to create Rego advisor"); + live_checker.add_advisor(Box::new(rego_advisor)); + + let mut stats = LiveCheckStatistics::new(&live_checker.registry); + for sample in &mut samples { + let result = 
sample.run_live_check(&mut live_checker, &mut stats); + println!("{:?}", result); + assert!(result.is_ok()); + } + stats.finalize(); + assert_eq!( + stats.advice_type_counts.get("invalid_data_point_value"), + Some(&1) + ); + } + #[test] fn test_bad_custom_rego() { let registry = ResolvedRegistry { diff --git a/defaults/policies/live_check_advice/otel.rego b/defaults/policies/live_check_advice/otel.rego index 0c258ea1..6333afd4 100644 --- a/defaults/policies/live_check_advice/otel.rego +++ b/defaults/policies/live_check_advice/otel.rego @@ -22,43 +22,43 @@ concat(".", array.slice(parts, 0, i)) | # checks attribute has a namespace deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute - not contains(input.attribute.name, ".") + input.sample.attribute + not contains(input.sample.attribute.name, ".") advice_type := "missing_namespace" advice_level := "improvement" - value := input.attribute.name + value := input.sample.attribute.name message := "Does not have a namespace" } # checks attribute name format deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute - not regex.match(name_regex, input.attribute.name) + input.sample.attribute + not regex.match(name_regex, input.sample.attribute.name) advice_type := "invalid_format" advice_level := "violation" - value := input.attribute.name + value := input.sample.attribute.name message := "Does not match name formatting rules" } # checks metric name format deny contains make_advice(advice_type, advice_level, value, message) if { - input.metric - not regex.match(name_regex, input.metric.name) + input.sample.metric + not regex.match(name_regex, input.sample.metric.name) advice_type := "invalid_format" advice_level := "violation" - value := input.metric.name + value := input.sample.metric.name message := "Does not match name formatting rules" } # checks attribute namespace doesn't collide with existing attributes deny contains make_advice(advice_type, 
advice_level, value, message) if { - input.attribute + input.sample.attribute # Skip if no namespace - contains(input.attribute.name, ".") + contains(input.sample.attribute.name, ".") # Get input namespaces - namespaces := derive_namespaces(input.attribute.name) + namespaces := derive_namespaces(input.sample.attribute.name) # Find collision some value in namespaces @@ -71,15 +71,15 @@ deny contains make_advice(advice_type, advice_level, value, message) if { # provides advice if the attribute extends an existing namespace deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute + input.sample.attribute # Skip checks first (fail fast) - contains(input.attribute.name, ".") # Must have at least one namespace - not is_template_type(input.attribute.name) - not is_registry_attribute(input.attribute.name) + contains(input.sample.attribute.name, ".") # Must have at least one namespace + not is_template_type(input.sample.attribute.name) + not is_registry_attribute(input.sample.attribute.name) # Get input namespaces - namespaces := derive_namespaces(input.attribute.name) + namespaces := derive_namespaces(input.sample.attribute.name) # Find matches - check keys in set matches := [ns | some ns in namespaces; namespaces_to_check_set[ns] != null] From 4416af5a53ab37e9269225d766ee4bed1ea9f04e Mon Sep 17 00:00:00 2001 From: jerbly Date: Sun, 11 May 2025 15:30:23 -0400 Subject: [PATCH 14/15] feat: add schemars support for JSON schema generation in weaver_live_check and weaver_checker --- Cargo.lock | 2 ++ crates/weaver_checker/Cargo.toml | 1 + crates/weaver_checker/src/violation.rs | 7 +++++-- crates/weaver_live_check/Cargo.toml | 1 + crates/weaver_live_check/src/lib.rs | 13 +++++++++++-- crates/weaver_live_check/src/sample_attribute.rs | 3 ++- crates/weaver_live_check/src/sample_metric.rs | 16 +++++++--------- crates/weaver_live_check/src/sample_resource.rs | 3 ++- crates/weaver_live_check/src/sample_span.rs | 11 ++++++----- 9 files changed, 37 
insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db6c5d72..eed403d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5947,6 +5947,7 @@ dependencies = [ "globset", "miette", "regorus", + "schemars", "serde", "serde_json", "serde_yaml", @@ -6065,6 +6066,7 @@ name = "weaver_live_check" version = "0.15.0" dependencies = [ "miette", + "schemars", "serde", "serde_json", "tempfile", diff --git a/crates/weaver_checker/Cargo.toml b/crates/weaver_checker/Cargo.toml index f7028d3b..0283b8fa 100644 --- a/crates/weaver_checker/Cargo.toml +++ b/crates/weaver_checker/Cargo.toml @@ -23,6 +23,7 @@ serde_yaml.workspace = true walkdir.workspace = true globset.workspace = true miette.workspace = true +schemars.workspace = true regorus = { version = "0.4.0", default-features = false, features = [ "std", diff --git a/crates/weaver_checker/src/violation.rs b/crates/weaver_checker/src/violation.rs index 0d1bb256..ae79ce4a 100644 --- a/crates/weaver_checker/src/violation.rs +++ b/crates/weaver_checker/src/violation.rs @@ -2,6 +2,7 @@ //! Definition of a policy violation. +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::fmt::{Display, Formatter}; @@ -72,7 +73,9 @@ impl Violation { } /// The level of an advice -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq, Hash)] +#[derive( + Debug, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq, Hash, JsonSchema, +)] #[serde(rename_all = "snake_case")] pub enum AdviceLevel { /// Useful context without action needed @@ -84,7 +87,7 @@ pub enum AdviceLevel { } /// Represents a live check advice -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct Advice { /// The type of advice e.g. 
"is_deprecated" pub advice_type: String, diff --git a/crates/weaver_live_check/Cargo.toml b/crates/weaver_live_check/Cargo.toml index dcbf6a14..8fbe0989 100644 --- a/crates/weaver_live_check/Cargo.toml +++ b/crates/weaver_live_check/Cargo.toml @@ -19,6 +19,7 @@ thiserror.workspace = true serde.workspace = true serde_json.workspace = true miette.workspace = true +schemars.workspace = true [dev-dependencies] tempfile = "3.18.0" diff --git a/crates/weaver_live_check/src/lib.rs b/crates/weaver_live_check/src/lib.rs index cc044265..70e8accd 100644 --- a/crates/weaver_live_check/src/lib.rs +++ b/crates/weaver_live_check/src/lib.rs @@ -10,6 +10,7 @@ use sample_attribute::SampleAttribute; use sample_metric::{SampleHistogramDataPoint, SampleMetric, SampleNumberDataPoint}; use sample_resource::SampleResource; use sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink}; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use weaver_checker::violation::{Advice, AdviceLevel}; use weaver_common::diagnostic::{DiagnosticMessage, DiagnosticMessages}; @@ -87,7 +88,7 @@ pub trait Ingester { } /// Represents a sample entity -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum Sample { /// A sample attribute @@ -145,7 +146,7 @@ impl LiveCheckRunner for Sample { } /// Represents a live check result -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct LiveCheckResult { /// Advice on the entity pub all_advice: Vec, @@ -392,3 +393,11 @@ pub trait LiveCheckRunner { stats: &mut LiveCheckStatistics, ) -> Result<(), Error>; } + +/// Get the JSON schema for the Sample struct +pub fn get_json_schema() -> Result { + let schema = schemars::schema_for!(Sample); + serde_json::to_string_pretty(&schema).map_err(|e| Error::OutputError { + error: e.to_string(), + }) +} diff 
--git a/crates/weaver_live_check/src/sample_attribute.rs b/crates/weaver_live_check/src/sample_attribute.rs index 6a19e67b..e0b975da 100644 --- a/crates/weaver_live_check/src/sample_attribute.rs +++ b/crates/weaver_live_check/src/sample_attribute.rs @@ -2,6 +2,7 @@ //! Intermediary format for telemetry sample attributes +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use weaver_checker::violation::{Advice, AdviceLevel}; @@ -13,7 +14,7 @@ use crate::{ }; /// Represents a sample telemetry attribute parsed from any source -#[derive(Debug, Clone, PartialEq, Serialize)] +#[derive(Debug, Clone, PartialEq, Serialize, JsonSchema)] pub struct SampleAttribute { /// The name of the attribute pub name: String, diff --git a/crates/weaver_live_check/src/sample_metric.rs b/crates/weaver_live_check/src/sample_metric.rs index 91917a28..dadb727c 100644 --- a/crates/weaver_live_check/src/sample_metric.rs +++ b/crates/weaver_live_check/src/sample_metric.rs @@ -2,6 +2,7 @@ //! 
Intermediary format for telemetry sample spans +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; use weaver_checker::violation::{Advice, AdviceLevel}; @@ -13,7 +14,7 @@ use crate::{ }; /// The data point types of a metric -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(untagged)] pub enum DataPoints { /// Number data points @@ -27,7 +28,7 @@ pub enum DataPoints { } /// Represents a single data point of a metric -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleNumberDataPoint { /// The value of the data point pub value: Value, @@ -38,7 +39,7 @@ pub struct SampleNumberDataPoint { } /// Represents a single histogram data point of a metric -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleHistogramDataPoint { /// The attributes of the data point pub attributes: Vec, @@ -57,27 +58,24 @@ pub struct SampleHistogramDataPoint { } /// Represents a single exponential histogram data point of a metric -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleExponentialHistogramDataPoint { /// The attributes of the data point pub attributes: Vec, } /// Represents a single summary data point of a metric -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleSummaryDataPoint { /// The attributes of the data point pub attributes: Vec, } /// Represents a sample telemetry span parsed from any source -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleMetric { /// Metric name. 
pub name: String, - /// Set of attributes - // #[serde(default)] - // pub attributes: Vec, /// Type of the metric (e.g. gauge, histogram, ...). pub instrument: InstrumentSpec, /// Unit of the metric. diff --git a/crates/weaver_live_check/src/sample_resource.rs b/crates/weaver_live_check/src/sample_resource.rs index 4681cacd..bb5eb2c2 100644 --- a/crates/weaver_live_check/src/sample_resource.rs +++ b/crates/weaver_live_check/src/sample_resource.rs @@ -2,6 +2,7 @@ //! Intermediary format for telemetry sample resources +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use crate::{ @@ -10,7 +11,7 @@ use crate::{ }; /// Represents a resource -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleResource { /// The attributes of the resource #[serde(default)] diff --git a/crates/weaver_live_check/src/sample_span.rs b/crates/weaver_live_check/src/sample_span.rs index b9619e9c..95d09d0d 100644 --- a/crates/weaver_live_check/src/sample_span.rs +++ b/crates/weaver_live_check/src/sample_span.rs @@ -2,6 +2,7 @@ //! 
Intermediary format for telemetry sample spans +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use weaver_semconv::group::SpanKindSpec; @@ -11,7 +12,7 @@ use crate::{ }; /// The status code of the span -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum StatusCode { /// The status is unset @@ -23,7 +24,7 @@ pub enum StatusCode { } /// The status code and message of the span -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct Status { /// The status code pub code: StatusCode, @@ -32,7 +33,7 @@ pub struct Status { } /// Represents a sample telemetry span parsed from any source -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleSpan { /// The name of the span pub name: String, @@ -81,7 +82,7 @@ impl LiveCheckRunner for SampleSpan { } /// Represents a span event -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleSpanEvent { /// The name of the event pub name: String, @@ -114,7 +115,7 @@ impl LiveCheckRunner for SampleSpanEvent { } /// Represents a span link -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleSpanLink { /// The attributes of the link #[serde(default)] From 728aabe5d474ac320dc480214aa9a926b0f8395d Mon Sep 17 00:00:00 2001 From: jerbly Date: Sun, 11 May 2025 20:09:19 -0400 Subject: [PATCH 15/15] added handling for exponentialhistogram and summary --- crates/weaver_live_check/src/advice.rs | 43 +++++++++-- crates/weaver_live_check/src/live_checker.rs | 73 +++++++++++++++++++ crates/weaver_live_check/src/sample_metric.rs | 62 +++++++++++++++- 
src/registry/otlp/conversion.rs | 47 ++++++++++-- 4 files changed, 209 insertions(+), 16 deletions(-) diff --git a/crates/weaver_live_check/src/advice.rs b/crates/weaver_live_check/src/advice.rs index 1d066f2f..29a37776 100644 --- a/crates/weaver_live_check/src/advice.rs +++ b/crates/weaver_live_check/src/advice.rs @@ -18,7 +18,10 @@ use weaver_semconv::{ stability::Stability, }; -use crate::{live_checker::LiveChecker, sample_attribute::SampleAttribute, Error, SampleRef}; +use crate::{ + live_checker::LiveChecker, sample_attribute::SampleAttribute, sample_metric::SampleInstrument, + Error, SampleRef, +}; /// Embedded default live check rego policies pub const DEFAULT_LIVE_CHECK_REGO: &str = @@ -238,16 +241,46 @@ impl Advisor for TypeAdvisor { // Check the instrument and unit of the metric let mut advice_list = Vec::new(); if let Some(semconv_metric) = registry_group { - if let Some(semconv_instrument) = &semconv_metric.instrument { - if semconv_instrument != &sample_metric.instrument { + match &sample_metric.instrument { + SampleInstrument::Summary => { + advice_list.push(Advice { + advice_type: "legacy_instrument".to_owned(), + value: Value::String(sample_metric.instrument.to_string()), + message: "`Summary` Instrument is not supported".to_owned(), + advice_level: AdviceLevel::Violation, + }); + } + SampleInstrument::Unspecified => { advice_list.push(Advice { - advice_type: "instrument_mismatch".to_owned(), + advice_type: "instrument_missing".to_owned(), value: Value::String(sample_metric.instrument.to_string()), - message: format!("Instrument should be `{}`", semconv_instrument), + message: "An Instrument must be specified".to_owned(), advice_level: AdviceLevel::Violation, }); } + _ => { + if let Some(semconv_instrument) = &semconv_metric.instrument { + if let Some(sample_instrument) = + &sample_metric.instrument.as_semconv() + { + if semconv_instrument != sample_instrument { + advice_list.push(Advice { + advice_type: "instrument_mismatch".to_owned(), + value: 
Value::String( + sample_metric.instrument.to_string(), + ), + message: format!( + "Instrument should be `{}`", + sample_instrument + ), + advice_level: AdviceLevel::Violation, + }); + } + } + } + } } + if let Some(semconv_unit) = &semconv_metric.unit { if semconv_unit != &sample_metric.unit { advice_list.push(Advice { diff --git a/crates/weaver_live_check/src/live_checker.rs b/crates/weaver_live_check/src/live_checker.rs index 8f83bab3..b54061d9 100644 --- a/crates/weaver_live_check/src/live_checker.rs +++ b/crates/weaver_live_check/src/live_checker.rs @@ -110,6 +110,9 @@ mod tests { use crate::{ advice::{DeprecatedAdvisor, EnumAdvisor, RegoAdvisor, StabilityAdvisor, TypeAdvisor}, sample_attribute::SampleAttribute, + sample_metric::{ + DataPoints, SampleExponentialHistogramDataPoint, SampleInstrument, SampleMetric, + }, LiveCheckRunner, LiveCheckStatistics, Sample, }; @@ -899,4 +902,74 @@ mod tests { .contains("use of undefined variable")); } } + + #[test] + fn test_exponential_histogram() { + let registry = make_metrics_registry(); + + // Generate a sample with exponential histogram data points + let sample = Sample::Metric(SampleMetric { + name: "system.memory.usage".to_owned(), + instrument: SampleInstrument::Histogram, + unit: "By".to_owned(), + data_points: Some(DataPoints::ExponentialHistogram(vec![ + SampleExponentialHistogramDataPoint { + attributes: vec![], + count: 0, + sum: None, + min: None, + max: None, + live_check_result: None, + }, + ])), + live_check_result: None, + }); + let mut samples = vec![sample]; + let advisors: Vec> = vec![Box::new(TypeAdvisor)]; + let mut live_checker = LiveChecker::new(registry, advisors); + + let mut stats = LiveCheckStatistics::new(&live_checker.registry); + for sample in &mut samples { + let result = sample.run_live_check(&mut live_checker, &mut stats); + assert!(result.is_ok()); + } + stats.finalize(); + assert_eq!( + stats.advice_type_counts.get("instrument_mismatch"), + Some(&1) + ); + } + + #[test] + fn 
test_summary_unspecified() { + let registry = make_metrics_registry(); + + let mut samples = vec![ + Sample::Metric(SampleMetric { + name: "system.memory.usage".to_owned(), + instrument: SampleInstrument::Summary, + unit: "By".to_owned(), + data_points: None, + live_check_result: None, + }), + Sample::Metric(SampleMetric { + name: "system.memory.usage".to_owned(), + instrument: SampleInstrument::Unspecified, + unit: "By".to_owned(), + data_points: None, + live_check_result: None, + }), + ]; + let advisors: Vec> = vec![Box::new(TypeAdvisor)]; + let mut live_checker = LiveChecker::new(registry, advisors); + + let mut stats = LiveCheckStatistics::new(&live_checker.registry); + for sample in &mut samples { + let result = sample.run_live_check(&mut live_checker, &mut stats); + assert!(result.is_ok()); + } + stats.finalize(); + assert_eq!(stats.advice_type_counts.get("legacy_instrument"), Some(&1)); + assert_eq!(stats.advice_type_counts.get("instrument_missing"), Some(&1)); + } } diff --git a/crates/weaver_live_check/src/sample_metric.rs b/crates/weaver_live_check/src/sample_metric.rs index dadb727c..65e9d998 100644 --- a/crates/weaver_live_check/src/sample_metric.rs +++ b/crates/weaver_live_check/src/sample_metric.rs @@ -2,6 +2,8 @@ //! Intermediary format for telemetry sample spans +use std::fmt::{Display, Formatter}; + use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -13,6 +15,52 @@ use crate::{ LiveCheckRunner, LiveCheckStatistics, SampleRef, MISSING_METRIC_ADVICE_TYPE, }; +/// Represents the instrument type of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "lowercase")] +pub enum SampleInstrument { + /// An up-down counter metric. + UpDownCounter, + /// A counter metric. + Counter, + /// A gauge metric. + Gauge, + /// A histogram metric. + Histogram, + /// A summary metric. This is no longer used and will cause a violation. + Summary, + /// Unspecified instrument type. 
+ Unspecified, +} + +impl SampleInstrument { + /// Converts the instrument type to a semconv instrument type. + #[must_use] + pub fn as_semconv(&self) -> Option { + match self { + SampleInstrument::UpDownCounter => Some(InstrumentSpec::UpDownCounter), + SampleInstrument::Counter => Some(InstrumentSpec::Counter), + SampleInstrument::Gauge => Some(InstrumentSpec::Gauge), + SampleInstrument::Histogram => Some(InstrumentSpec::Histogram), + _ => None, + } + } +} + +/// Implements a human readable display for the instrument. +impl Display for SampleInstrument { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + SampleInstrument::UpDownCounter => write!(f, "updowncounter"), + SampleInstrument::Counter => write!(f, "counter"), + SampleInstrument::Gauge => write!(f, "gauge"), + SampleInstrument::Histogram => write!(f, "histogram"), + SampleInstrument::Summary => write!(f, "summary"), + SampleInstrument::Unspecified => write!(f, "unspecified"), + } + } +} + /// The data point types of a metric #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(untagged)] @@ -23,8 +71,6 @@ pub enum DataPoints { Histogram(Vec), /// Exponential histogram data points ExponentialHistogram(Vec), - /// Summary data points - Summary(Vec), } /// Represents a single data point of a metric @@ -62,6 +108,16 @@ pub struct SampleHistogramDataPoint { pub struct SampleExponentialHistogramDataPoint { /// The attributes of the data point pub attributes: Vec, + /// The count of the data point + pub count: u64, + /// The sum of the data point + pub sum: Option, + /// The minimum of the data point + pub min: Option, + /// The maximum of the data point + pub max: Option, + /// Live check result + pub live_check_result: Option, } /// Represents a single summary data point of a metric @@ -77,7 +133,7 @@ pub struct SampleMetric { /// Metric name. pub name: String, /// Type of the metric (e.g. gauge, histogram, ...). 
- pub instrument: InstrumentSpec, + pub instrument: SampleInstrument, /// Unit of the metric. pub unit: String, /// Data points of the metric. diff --git a/src/registry/otlp/conversion.rs b/src/registry/otlp/conversion.rs index 75968875..50861c98 100644 --- a/src/registry/otlp/conversion.rs +++ b/src/registry/otlp/conversion.rs @@ -5,10 +5,10 @@ use serde_json::{json, Value}; use weaver_live_check::{ sample_attribute::SampleAttribute, - sample_metric::{DataPoints, SampleMetric}, + sample_metric::{DataPoints, SampleInstrument, SampleMetric}, sample_span::{Status, StatusCode}, }; -use weaver_semconv::group::{InstrumentSpec, SpanKindSpec}; +use weaver_semconv::group::SpanKindSpec; use super::grpc_stubs::proto::{ common::v1::{AnyValue, KeyValue}, @@ -104,18 +104,22 @@ pub fn otlp_metric_to_sample(otlp_metric: Metric) -> SampleMetric { /// updowncounter → Sum with is_monotonic: false /// gauge → Gauge /// histogram → Histogram -fn otlp_data_to_instrument(data: &Option) -> InstrumentSpec { +/// histogram → ExponentialHistogram +/// summary → Summary (this will cause a legacy_instrument violation) +fn otlp_data_to_instrument(data: &Option) -> SampleInstrument { match data { Some(Data::Sum(sum)) => { if sum.is_monotonic { - InstrumentSpec::Counter + SampleInstrument::Counter } else { - InstrumentSpec::UpDownCounter + SampleInstrument::UpDownCounter } } - Some(Data::Gauge(_)) => InstrumentSpec::Gauge, - Some(Data::Histogram(_)) => InstrumentSpec::Histogram, - _ => InstrumentSpec::Gauge, // TODO Default to Gauge if unknown? 
+ Some(Data::Gauge(_)) => SampleInstrument::Gauge, + Some(Data::Histogram(_)) => SampleInstrument::Histogram, + Some(Data::ExponentialHistogram(_)) => SampleInstrument::Histogram, + Some(Data::Summary(_)) => SampleInstrument::Summary, + None => SampleInstrument::Unspecified, } } @@ -127,10 +131,37 @@ fn otlp_data_to_data_points(data: &Option) -> Option { Some(Data::Histogram(histogram)) => { Some(otlp_histogram_data_points(&histogram.data_points)) } + Some(Data::ExponentialHistogram(exponential_histogram)) => Some( + otlp_exponential_histogram_data_points(&exponential_histogram.data_points), + ), _ => None, } } +/// Converts OTLP ExponentialHistogram data points to DataPoints::ExponentialHistogram +fn otlp_exponential_histogram_data_points( + otlp: &Vec, +) -> DataPoints { + let mut data_points = Vec::new(); + for point in otlp { + let live_check_point = + weaver_live_check::sample_metric::SampleExponentialHistogramDataPoint { + attributes: point + .attributes + .iter() + .map(sample_attribute_from_key_value) + .collect(), + count: point.count, + sum: point.sum, + min: point.min, + max: point.max, + live_check_result: None, + }; + data_points.push(live_check_point); + } + DataPoints::ExponentialHistogram(data_points) +} + /// Converts OTLP Histogram data points to DataPoints::Histogram fn otlp_histogram_data_points(otlp: &Vec) -> DataPoints { let mut data_points = Vec::new();