diff --git a/Cargo.lock b/Cargo.lock index 76ba5c69..97ba1c87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5947,6 +5947,7 @@ dependencies = [ "globset", "miette", "regorus", + "schemars", "serde", "serde_json", "serde_yaml", @@ -6065,6 +6066,7 @@ name = "weaver_live_check" version = "0.15.0" dependencies = [ "miette", + "schemars", "serde", "serde_json", "tempfile", diff --git a/crates/weaver_checker/Cargo.toml b/crates/weaver_checker/Cargo.toml index f7028d3b..0283b8fa 100644 --- a/crates/weaver_checker/Cargo.toml +++ b/crates/weaver_checker/Cargo.toml @@ -23,6 +23,7 @@ serde_yaml.workspace = true walkdir.workspace = true globset.workspace = true miette.workspace = true +schemars.workspace = true regorus = { version = "0.4.0", default-features = false, features = [ "std", diff --git a/crates/weaver_checker/src/violation.rs b/crates/weaver_checker/src/violation.rs index 0d1bb256..ae79ce4a 100644 --- a/crates/weaver_checker/src/violation.rs +++ b/crates/weaver_checker/src/violation.rs @@ -2,6 +2,7 @@ //! Definition of a policy violation. +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::fmt::{Display, Formatter}; @@ -72,7 +73,9 @@ impl Violation { } /// The level of an advice -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq, Hash)] +#[derive( + Debug, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq, Hash, JsonSchema, +)] #[serde(rename_all = "snake_case")] pub enum AdviceLevel { /// Useful context without action needed @@ -84,7 +87,7 @@ pub enum AdviceLevel { } /// Represents a live check advice -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct Advice { /// The type of advice e.g.
"is_deprecated" pub advice_type: String, diff --git a/crates/weaver_live_check/Cargo.toml b/crates/weaver_live_check/Cargo.toml index dcbf6a14..8fbe0989 100644 --- a/crates/weaver_live_check/Cargo.toml +++ b/crates/weaver_live_check/Cargo.toml @@ -19,6 +19,7 @@ thiserror.workspace = true serde.workspace = true serde_json.workspace = true miette.workspace = true +schemars.workspace = true [dev-dependencies] tempfile = "3.18.0" diff --git a/crates/weaver_live_check/README.md b/crates/weaver_live_check/README.md index 17343ddb..2a4bbbe9 100644 --- a/crates/weaver_live_check/README.md +++ b/crates/weaver_live_check/README.md @@ -107,6 +107,9 @@ As mentioned, a list of `Advice` is returned in the report for each sample entit } ``` +> **Note** +> The `live_check_result` object augments the sample entity at the pertinent level in the structure. If the structure is `metric`->`[number_data_point]`->`[attribute]`, advice should be given at the `number_data_point` level for, say, required attributes that have not been supplied. Whereas, attribute advice, like `missing_attribute` in the JSON above, is given at the attribute level. + ### Custom advisors Use the `--advice-policies` command line option to provide a path to a directory containing Rego policies with the `live_check_advice` package name. Here's a very simple example that rejects any attribute name containing the string "test": @@ -118,8 +121,8 @@ import rego.v1 # checks attribute name contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute - value := input.attribute.name + input.sample.attribute + value := input.sample.attribute.name contains(value, "test") advice_type := "contains_test" advice_level := "violation" @@ -135,7 +138,13 @@ make_advice(advice_type, advice_level, value, message) := { } ``` -`input` contains the sample entity for assessment wrapped in a type e.g. `input.attribute` or `input.span`.
`data` contains a structure derived from the supplied `Registry`. A jq preprocessor takes the `Registry` (and maps for attributes and templates) to produce the `data` for the policy. If the jq is simply `.` this will passthrough as-is. Preprocessing is used to improve Rego performance and to simplify policy definitions. With this model `data` is processed once whereas the Rego policy runs for every sample entity as it arrives in the stream. +`input.sample` contains the sample entity for assessment wrapped in a type e.g. `input.sample.attribute` or `input.sample.span`. + +`input.registry_attribute`, when present, contains the matching attribute definition from the supplied registry. + +`input.registry_group`, when present, contains the matching group definition from the supplied registry. + +`data` contains a structure derived from the supplied `Registry`. A jq preprocessor takes the `Registry` (and maps for attributes and templates) to produce the `data` for the policy. If the jq is simply `.` this will passthrough as-is. Preprocessing is used to improve Rego performance and to simplify policy definitions. With this model `data` is processed once whereas the Rego policy runs for every sample entity as it arrives in the stream. To override the default Otel jq preprocessor provide a path to the jq file through the `--advice-preprocessor` option. 
@@ -202,7 +211,9 @@ These should be self-explanatory, but: - `no_advice_count` is the number of samples that received no advice - `seen_registry_attributes` is a record of how many times each attribute in the registry was seen in the samples - `seen_non_registry_attributes` is a record of how many times each non-registry attribute was seen in the samples -- `registry_coverage` is the fraction of seen registry attributes over the total registry attributes +- `seen_registry_metrics` is a record of how many times each metric in the registry was seen in the samples +- `seen_non_registry_metrics` is a record of how many times each non-registry metric was seen in the samples +- `registry_coverage` is the fraction of seen registry entities over the total registry entities This could be parsed for a more sophisticated way to determine pass/fail in CI for example. diff --git a/crates/weaver_live_check/data/metrics.json b/crates/weaver_live_check/data/metrics.json new file mode 100644 index 00000000..f9dad36b --- /dev/null +++ b/crates/weaver_live_check/data/metrics.json @@ -0,0 +1,98 @@ +[ + { + "metric": { + "data_points": [ + { + "attributes": [ + { + "name": "state", + "value": "used" + } + ], + "value": 26963050496 + }, + { + "attributes": [ + { + "name": "state", + "value": "free" + } + ], + "value": 586153984 + }, + { + "attributes": [ + { + "name": "system.memory.state", + "value": "inactive" + } + ], + "value": 681053388.8 + } + ], + "instrument": "updowncounter", + "name": "system.memory.usage", + "unit": "By" + } + }, + { + "metric": { + "data_points": [ + { + "attributes": [], + "bucket_counts": [ + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0 + ], + "count": 1, + "max": 7.0015, + "min": 7.0015, + "sum": 7.0015 + } + ], + "instrument": "histogram", + "name": "rpc.client.duration", + "unit": "ms" + } + }, + { + "metric": { + "data_points": [ + { + "attributes": [], + "value": 151552000 + } + ], + "instrument": "gauge", + "name":
"otelcol_process_memory_rss", + "unit": "By" + } + }, + { + "metric": { + "data_points": [ + { + "attributes": [], + "value": 39585616 + } + ], + "instrument": "counter", + "name": "otelcol_process_runtime_total_alloc_bytes", + "unit": "By" + } + } +] \ No newline at end of file diff --git a/crates/weaver_live_check/data/model/metrics/metrics.yaml b/crates/weaver_live_check/data/model/metrics/metrics.yaml new file mode 100644 index 00000000..58522f70 --- /dev/null +++ b/crates/weaver_live_check/data/model/metrics/metrics.yaml @@ -0,0 +1,54 @@ +groups: + - id: registry.system.memory + type: attribute_group + display_name: System Memory Attributes + brief: "Describes System Memory attributes" + attributes: + - id: system.memory.state + type: + members: + - id: used + value: "used" + stability: development + - id: free + value: "free" + stability: development + - id: shared + value: "shared" + stability: development + deprecated: "Removed, report shared memory usage with `metric.system.memory.shared` metric" + - id: buffers + value: "buffers" + stability: development + - id: cached + value: "cached" + stability: development + stability: development + brief: "The memory state" + examples: ["free", "cached"] + + # system.* metrics + - id: metric.system.uptime + type: metric + metric_name: system.uptime + stability: development + brief: "The time the system has been running" + note: | + Instrumentations SHOULD use a gauge with type `double` and measure uptime in seconds as a floating point number with the highest precision available. + The actual accuracy would depend on the instrumentation and operating system. + instrument: gauge + unit: "s" + + # system.memory.* metrics + - id: metric.system.memory.usage + type: metric + metric_name: system.memory.usage + stability: development + brief: "Reports memory in use by state." + note: | + The sum over all `system.memory.state` values SHOULD equal the total memory + available on the system, that is `system.memory.limit`. 
+ instrument: updowncounter + unit: "By" + attributes: + - ref: system.memory.state diff --git a/crates/weaver_live_check/data/policies/live_check_advice/test.rego b/crates/weaver_live_check/data/policies/live_check_advice/test.rego index 9115f8ab..24bb538b 100644 --- a/crates/weaver_live_check/data/policies/live_check_advice/test.rego +++ b/crates/weaver_live_check/data/policies/live_check_advice/test.rego @@ -4,8 +4,8 @@ import rego.v1 # checks attribute name contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute - value := input.attribute.name + input.sample.attribute + value := input.sample.attribute.name contains(value, "test") advice_type := "contains_test" advice_level := "violation" @@ -14,8 +14,8 @@ deny contains make_advice(advice_type, advice_level, value, message) if { # checks span name contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.span - value := input.span.name + input.sample.span + value := input.sample.span.name contains(value, "test") advice_type := "contains_test" advice_level := "violation" @@ -24,8 +24,8 @@ deny contains make_advice(advice_type, advice_level, value, message) if { # checks span status message contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.span - value := input.span.status.message + input.sample.span + value := input.sample.span.status.message contains(value, "test") advice_type := "contains_test_in_status" advice_level := "violation" @@ -34,14 +34,26 @@ deny contains make_advice(advice_type, advice_level, value, message) if { # checks span_event name contains the word "test" deny contains make_advice(advice_type, advice_level, value, message) if { - input.span_event - value := input.span_event.name + input.sample.span_event + value := input.sample.span_event.name contains(value, "test") advice_type := "contains_test" advice_level := "violation" 
message := "Name must not contain 'test'" } +# This example shows how to use the registry_group provided in the input. +# If the metric's unit is "By" the value in this data-point must be an integer. +deny contains make_advice(advice_type, advice_level, value, message) if { + input.sample.number_data_point + value := input.sample.number_data_point.value + input.registry_group.unit == "By" + value != floor(value) # not a good type check, but serves as an example + advice_type := "invalid_data_point_value" + advice_level := "violation" + message := "Value must be an integer when unit is 'By'" +} + make_advice(advice_type, advice_level, value, message) := { "type": "advice", "advice_type": advice_type, diff --git a/crates/weaver_live_check/src/advice.rs b/crates/weaver_live_check/src/advice.rs index 7ba659d2..29a37776 100644 --- a/crates/weaver_live_check/src/advice.rs +++ b/crates/weaver_live_check/src/advice.rs @@ -2,7 +2,7 @@ //! Builtin advisors -use std::{collections::BTreeMap, path::PathBuf}; +use std::{collections::BTreeMap, path::PathBuf, rc::Rc}; use serde::Serialize; use serde_json::Value; @@ -18,7 +18,10 @@ use weaver_semconv::{ stability::Stability, }; -use crate::{live_checker::LiveChecker, Error, SampleRef}; +use crate::{ + live_checker::LiveChecker, sample_attribute::SampleAttribute, sample_metric::SampleInstrument, + Error, SampleRef, +}; /// Embedded default live check rego policies pub const DEFAULT_LIVE_CHECK_REGO: &str = @@ -37,19 +40,27 @@ pub trait Advisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error>; } +fn deprecated_to_value(deprecated: &Deprecated) -> Value { + match deprecated { + Deprecated::Renamed { .. } => Value::String("renamed".to_owned()), + Deprecated::Obsoleted { .. } => Value::String("obsoleted".to_owned()), + Deprecated::Uncategorized { .. 
} => Value::String("uncategorized".to_owned()), + } +} + /// An advisor that checks if an attribute is deprecated pub struct DeprecatedAdvisor; impl Advisor for DeprecatedAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error> { match sample { SampleRef::Attribute(_sample_attribute) => { @@ -58,15 +69,21 @@ impl Advisor for DeprecatedAdvisor { if let Some(deprecated) = &attribute.deprecated { advices.push(Advice { advice_type: "deprecated".to_owned(), - value: match deprecated { - Deprecated::Renamed { .. } => Value::String("renamed".to_owned()), - Deprecated::Obsoleted { .. } => { - Value::String("obsoleted".to_owned()) - } - Deprecated::Uncategorized { .. } => { - Value::String("uncategorized".to_owned()) - } - }, + value: deprecated_to_value(deprecated), + message: deprecated.to_string(), + advice_level: AdviceLevel::Violation, + }); + } + } + Ok(advices) + } + SampleRef::Metric(_sample_metric) => { + let mut advices = Vec::new(); + if let Some(group) = registry_group { + if let Some(deprecated) = &group.deprecated { + advices.push(Advice { + advice_type: "deprecated".to_owned(), + value: deprecated_to_value(deprecated), message: deprecated.to_string(), advice_level: AdviceLevel::Violation, }); @@ -88,8 +105,8 @@ impl Advisor for StabilityAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error> { match sample { SampleRef::Attribute(_sample_attribute) => { @@ -109,6 +126,23 @@ impl Advisor for StabilityAdvisor { } Ok(advices) } + SampleRef::Metric(_sample_metric) => { + let mut advices = Vec::new(); + if let Some(group) = registry_group { + match group.stability { + Some(ref stability) if *stability != Stability::Stable => { + 
advices.push(Advice { + advice_type: "stability".to_owned(), + value: Value::String(stability.to_string()), + message: "Is not stable".to_owned(), + advice_level: AdviceLevel::Improvement, + }); + } + _ => {} + } + } + Ok(advices) + } _ => Ok(Vec::new()), } } @@ -116,12 +150,37 @@ impl Advisor for StabilityAdvisor { /// An advisor that checks if an attribute has the correct type pub struct TypeAdvisor; + +/// Checks if required attributes from a resolved group are present in a list of attributes +/// Returns a list of advice for missing attributes +fn check_required_attributes( + required_attributes: &[Attribute], + attributes: &[SampleAttribute], +) -> Vec { + let mut advice_list = Vec::new(); + for required_attribute in required_attributes { + // Check if the attribute is present in the sample + if !attributes + .iter() + .any(|attribute| attribute.name == required_attribute.name) + { + advice_list.push(Advice { + advice_type: "attribute_required".to_owned(), + value: Value::String(required_attribute.name.clone()), + message: "Attribute is required".to_owned(), + advice_level: AdviceLevel::Violation, + }); + } + } + advice_list +} + impl Advisor for TypeAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error> { match sample { SampleRef::Attribute(sample_attribute) => { @@ -178,6 +237,83 @@ impl Advisor for TypeAdvisor { _ => Ok(Vec::new()), } } + SampleRef::Metric(sample_metric) => { + // Check the instrument and unit of the metric + let mut advice_list = Vec::new(); + if let Some(semconv_metric) = registry_group { + match &sample_metric.instrument { + SampleInstrument::Summary => { + advice_list.push(Advice { + advice_type: "legacy_instrument".to_owned(), + value: Value::String(sample_metric.instrument.to_string()), + message: "`Summary` Instrument is not supported".to_owned(), + advice_level: 
AdviceLevel::Violation, + }); + } + SampleInstrument::Unspecified => { + advice_list.push(Advice { + advice_type: "instrument_missing".to_owned(), + value: Value::String(sample_metric.instrument.to_string()), + message: "An Instrument must be specified".to_owned(), + advice_level: AdviceLevel::Violation, + }); + } + _ => { + if let Some(semconv_instrument) = &semconv_metric.instrument { + if let Some(sample_instrument) = + &sample_metric.instrument.as_semconv() + { + if semconv_instrument != sample_instrument { + advice_list.push(Advice { + advice_type: "instrument_mismatch".to_owned(), + value: Value::String( + sample_metric.instrument.to_string(), + ), + message: format!( + "Instrument should be `{}`", + sample_instrument + ), + advice_level: AdviceLevel::Violation, + }); + } + } + } + } + } + + if let Some(semconv_unit) = &semconv_metric.unit { + if semconv_unit != &sample_metric.unit { + advice_list.push(Advice { + advice_type: "unit_mismatch".to_owned(), + value: Value::String(sample_metric.unit.clone()), + message: format!("Unit should be `{}`", semconv_unit), + advice_level: AdviceLevel::Violation, + }); + } + } + } + Ok(advice_list) + } + SampleRef::NumberDataPoint(sample_number_data_point) => { + if let Some(semconv_metric) = registry_group { + Ok(check_required_attributes( + &semconv_metric.attributes, + &sample_number_data_point.attributes, + )) + } else { + Ok(Vec::new()) + } + } + SampleRef::HistogramDataPoint(sample_histogram_data_point) => { + if let Some(semconv_metric) = registry_group { + Ok(check_required_attributes( + &semconv_metric.attributes, + &sample_histogram_data_point.attributes, + )) + } else { + Ok(Vec::new()) + } + } _ => Ok(Vec::new()), } } @@ -189,8 +325,8 @@ impl Advisor for EnumAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + _registry_group: Option<&Rc>, ) -> Result, Error> { match sample { 
SampleRef::Attribute(sample_attribute) => { @@ -334,13 +470,25 @@ impl RegoAdvisor { } } +/// Input data for the check function +#[derive(Serialize)] +struct RegoInput<'a> { + sample: SampleRef<'a>, + registry_attribute: Option<&'a Rc>, + registry_group: Option<&'a Rc>, +} + impl Advisor for RegoAdvisor { fn advise( &mut self, sample: SampleRef<'_>, - _registry_attribute: Option<&Attribute>, - _registry_group: Option<&ResolvedGroup>, + registry_attribute: Option<&Rc>, + registry_group: Option<&Rc>, ) -> Result, Error> { - self.check(sample) + self.check(RegoInput { + sample, + registry_attribute, + registry_group, + }) } } diff --git a/crates/weaver_live_check/src/lib.rs b/crates/weaver_live_check/src/lib.rs index ee79013d..70e8accd 100644 --- a/crates/weaver_live_check/src/lib.rs +++ b/crates/weaver_live_check/src/lib.rs @@ -7,12 +7,15 @@ use std::collections::HashMap; use live_checker::LiveChecker; use miette::Diagnostic; use sample_attribute::SampleAttribute; +use sample_metric::{SampleHistogramDataPoint, SampleMetric, SampleNumberDataPoint}; use sample_resource::SampleResource; use sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink}; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use weaver_checker::violation::{Advice, AdviceLevel}; use weaver_common::diagnostic::{DiagnosticMessage, DiagnosticMessages}; use weaver_forge::registry::ResolvedRegistry; +use weaver_semconv::group::GroupType; /// Advisors for live checks pub mod advice; @@ -24,6 +27,8 @@ pub mod json_stdin_ingester; pub mod live_checker; /// The intermediary format for attributes pub mod sample_attribute; +/// The intermediary format for metrics +pub mod sample_metric; /// An intermediary format for resources pub mod sample_resource; /// The intermediary format for spans @@ -37,6 +42,8 @@ pub mod text_stdin_ingester; pub const MISSING_ATTRIBUTE_ADVICE_TYPE: &str = "missing_attribute"; /// Template Attribute advice type pub const TEMPLATE_ATTRIBUTE_ADVICE_TYPE: &str = 
"template_attribute"; +/// Missing Metric advice type +pub const MISSING_METRIC_ADVICE_TYPE: &str = "missing_metric"; /// Weaver live check errors #[derive(thiserror::Error, Debug, Clone, PartialEq, Serialize, Diagnostic)] @@ -81,7 +88,7 @@ pub trait Ingester { } /// Represents a sample entity -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum Sample { /// A sample attribute @@ -94,6 +101,8 @@ pub enum Sample { SpanLink(SampleSpanLink), /// A sample resource Resource(SampleResource), + /// A sample metric + Metric(SampleMetric), } /// Represents a sample entity with a reference to the inner type @@ -110,6 +119,12 @@ pub enum SampleRef<'a> { SpanLink(&'a SampleSpanLink), /// A sample resource Resource(&'a SampleResource), + /// A sample metric + Metric(&'a SampleMetric), + /// A sample number data point + NumberDataPoint(&'a SampleNumberDataPoint), + /// A sample histogram data point + HistogramDataPoint(&'a SampleHistogramDataPoint), } // Dispatch the live check to the sample type @@ -125,12 +140,13 @@ impl LiveCheckRunner for Sample { Sample::SpanEvent(span_event) => span_event.run_live_check(live_checker, stats), Sample::SpanLink(span_link) => span_link.run_live_check(live_checker, stats), Sample::Resource(resource) => resource.run_live_check(live_checker, stats), + Sample::Metric(metric) => metric.run_live_check(live_checker, stats), } } } /// Represents a live check result -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct LiveCheckResult { /// Advice on the entity pub all_advice: Vec, @@ -195,17 +211,21 @@ pub struct LiveCheckStatistics { pub total_advisories: usize, /// The number of each advice level pub advice_level_counts: HashMap, - /// The number of attributes with each highest advice level + /// The number of entities with each highest 
advice level pub highest_advice_level_counts: HashMap, - /// The number of attributes with no advice + /// The number of entities with no advice pub no_advice_count: usize, - /// The number of attributes with each advice type + /// The number of entities with each advice type pub advice_type_counts: HashMap, /// The number of each attribute seen from the registry pub seen_registry_attributes: HashMap, /// The number of each non-registry attribute seen pub seen_non_registry_attributes: HashMap, - /// Fraction of the registry covered by the attributes + /// The number of each metric seen from the registry + pub seen_registry_metrics: HashMap, + /// The number of each non-registry metric seen + pub seen_non_registry_metrics: HashMap, + /// Fraction of the registry covered by the attributes and metrics pub registry_coverage: f32, } @@ -214,12 +234,18 @@ impl LiveCheckStatistics { #[must_use] pub fn new(registry: &ResolvedRegistry) -> Self { let mut seen_attributes = HashMap::new(); + let mut seen_metrics = HashMap::new(); for group in ®istry.groups { for attribute in &group.attributes { if attribute.deprecated.is_none() { let _ = seen_attributes.insert(attribute.name.clone(), 0); } } + if group.r#type == GroupType::Metric && group.deprecated.is_none() { + if let Some(metric_name) = &group.metric_name { + let _ = seen_metrics.insert(metric_name.clone(), 0); + } + } } LiveCheckStatistics { total_entities: 0, @@ -231,6 +257,8 @@ impl LiveCheckStatistics { advice_type_counts: HashMap::new(), seen_registry_attributes: seen_attributes, seen_non_registry_attributes: HashMap::new(), + seen_registry_metrics: seen_metrics, + seen_non_registry_metrics: HashMap::new(), registry_coverage: 0.0, } } @@ -306,6 +334,20 @@ impl LiveCheckStatistics { } } + /// Add metric name to coverage + pub fn add_metric_name_to_coverage(&mut self, seen_metric_name: String) { + if let Some(count) = self.seen_registry_metrics.get_mut(&seen_metric_name) { + // This is a registry metric + *count += 1; + 
} else { + // This is a non-registry metric + *self + .seen_non_registry_metrics + .entry(seen_metric_name) + .or_insert(0) += 1; + } + } + /// Are there any violations in the statistics? #[must_use] pub fn has_violations(&self) -> bool { @@ -316,16 +358,26 @@ impl LiveCheckStatistics { /// Finalize the statistics pub fn finalize(&mut self) { // Calculate the registry coverage - // non-zero attributes / total attributes + // (non-zero attributes + non-zero metrics) / (total attributes + total metrics) let non_zero_attributes = self .seen_registry_attributes .values() .filter(|&&count| count > 0) .count(); let total_registry_attributes = self.seen_registry_attributes.len(); - if total_registry_attributes > 0 { + + let non_zero_metrics = self + .seen_registry_metrics + .values() + .filter(|&&count| count > 0) + .count(); + let total_registry_metrics = self.seen_registry_metrics.len(); + + let total_registry_items = total_registry_attributes + total_registry_metrics; + + if total_registry_items > 0 { self.registry_coverage = - (non_zero_attributes as f32) / (total_registry_attributes as f32); + ((non_zero_attributes + non_zero_metrics) as f32) / (total_registry_items as f32); } else { self.registry_coverage = 0.0; } @@ -341,3 +393,11 @@ pub trait LiveCheckRunner { stats: &mut LiveCheckStatistics, ) -> Result<(), Error>; } + +/// Get the JSON schema for the Sample struct +pub fn get_json_schema() -> Result { + let schema = schemars::schema_for!(Sample); + serde_json::to_string_pretty(&schema).map_err(|e| Error::OutputError { + error: e.to_string(), + }) +} diff --git a/crates/weaver_live_check/src/live_checker.rs b/crates/weaver_live_check/src/live_checker.rs index 60284a64..b54061d9 100644 --- a/crates/weaver_live_check/src/live_checker.rs +++ b/crates/weaver_live_check/src/live_checker.rs @@ -4,9 +4,10 @@ use serde::Serialize; use std::collections::HashMap; -use weaver_semconv::attribute::AttributeType; +use std::rc::Rc; +use 
weaver_semconv::{attribute::AttributeType, group::GroupType}; -use weaver_forge::registry::ResolvedRegistry; +use weaver_forge::registry::{ResolvedGroup, ResolvedRegistry}; use weaver_resolved_schema::attribute::Attribute; use crate::advice::Advisor; @@ -16,13 +17,14 @@ use crate::advice::Advisor; pub struct LiveChecker { /// The resolved registry pub registry: ResolvedRegistry, - semconv_attributes: HashMap, - semconv_templates: HashMap, + semconv_attributes: HashMap>, + semconv_templates: HashMap>, + semconv_metrics: HashMap>, /// The advisors to run #[serde(skip)] pub advisors: Vec>, #[serde(skip)] - templates_by_length: Vec<(String, Attribute)>, + templates_by_length: Vec<(String, Rc)>, } impl LiveChecker { @@ -33,17 +35,26 @@ impl LiveChecker { let mut semconv_attributes = HashMap::new(); let mut semconv_templates = HashMap::new(); let mut templates_by_length = Vec::new(); + // Hashmap of metrics by name + let mut semconv_metrics = HashMap::new(); for group in ®istry.groups { + if group.r#type == GroupType::Metric { + if let Some(metric_name) = &group.metric_name { + let group_rc = Rc::new(group.clone()); + let _ = semconv_metrics.insert(metric_name.clone(), group_rc); + } + } for attribute in &group.attributes { + let attribute_rc = Rc::new(attribute.clone()); match attribute.r#type { AttributeType::Template(_) => { - templates_by_length.push((attribute.name.clone(), attribute.clone())); - let _ = semconv_templates.insert(attribute.name.clone(), attribute.clone()); + templates_by_length + .push((attribute.name.clone(), Rc::clone(&attribute_rc))); + let _ = semconv_templates.insert(attribute.name.clone(), attribute_rc); } _ => { - let _ = - semconv_attributes.insert(attribute.name.clone(), attribute.clone()); + let _ = semconv_attributes.insert(attribute.name.clone(), attribute_rc); } } } @@ -56,6 +67,7 @@ impl LiveChecker { registry, semconv_attributes, semconv_templates, + semconv_metrics, advisors, templates_by_length, } @@ -68,17 +80,23 @@ impl LiveChecker 
{ /// Find an attribute in the registry #[must_use] - pub fn find_attribute(&self, name: &str) -> Option<&Attribute> { - self.semconv_attributes.get(name) + pub fn find_attribute(&self, name: &str) -> Option> { + self.semconv_attributes.get(name).map(Rc::clone) + } + + /// Find a metric in the registry + #[must_use] + pub fn find_metric(&self, name: &str) -> Option> { + self.semconv_metrics.get(name).map(Rc::clone) } /// Find a template in the registry #[must_use] - pub fn find_template(&self, attribute_name: &str) -> Option<&Attribute> { + pub fn find_template(&self, attribute_name: &str) -> Option> { // Use the pre-sorted list to find the first (longest) matching template for (template_name, attribute) in &self.templates_by_length { if attribute_name.starts_with(template_name) { - return Some(attribute); + return Some(Rc::clone(attribute)); } } None @@ -92,6 +110,9 @@ mod tests { use crate::{ advice::{DeprecatedAdvisor, EnumAdvisor, RegoAdvisor, StabilityAdvisor, TypeAdvisor}, sample_attribute::SampleAttribute, + sample_metric::{ + DataPoints, SampleExponentialHistogramDataPoint, SampleInstrument, SampleMetric, + }, LiveCheckRunner, LiveCheckStatistics, Sample, }; @@ -105,7 +126,7 @@ mod tests { AttributeType, EnumEntriesSpec, Examples, PrimitiveOrArrayTypeSpec, RequirementLevel, TemplateTypeSpec, ValueSpec, }, - group::{GroupType, SpanKindSpec}, + group::{GroupType, InstrumentSpec, SpanKindSpec}, stability::Stability, }; @@ -417,6 +438,137 @@ mod tests { } } + fn make_metrics_registry() -> ResolvedRegistry { + ResolvedRegistry { + registry_url: "TEST_METRICS".to_owned(), + groups: vec![ + // Attribute group for system memory + ResolvedGroup { + id: "registry.system.memory".to_owned(), + r#type: GroupType::AttributeGroup, + brief: "Describes System Memory attributes".to_owned(), + note: "".to_owned(), + prefix: "".to_owned(), + entity_associations: vec![], + extends: None, + stability: None, + deprecated: None, + attributes: vec![ + Attribute { + name: 
"system.memory.state".to_owned(), + r#type: AttributeType::Enum { + allow_custom_values: None, + members: vec![ + EnumEntriesSpec { + id: "used".to_owned(), + value: ValueSpec::String("used".to_owned()), + brief: None, + note: None, + stability: Some(Stability::Development), + deprecated: None, + }, + EnumEntriesSpec { + id: "free".to_owned(), + value: ValueSpec::String("free".to_owned()), + brief: None, + note: None, + stability: Some(Stability::Development), + deprecated: None, + }, + ], + }, + examples: Some(Examples::Strings(vec![ + "free".to_owned(), + "cached".to_owned(), + ])), + brief: "The memory state".to_owned(), + tag: None, + requirement_level: RequirementLevel::Recommended { text: "".to_owned() }, + sampling_relevant: None, + note: "".to_owned(), + stability: Some(Stability::Development), + deprecated: None, + prefix: false, + tags: None, + value: None, + annotations: None, + }, + ], + span_kind: None, + events: vec![], + metric_name: None, + instrument: None, + unit: None, + name: None, + lineage: None, + display_name: Some("System Memory Attributes".to_owned()), + body: None, + }, + // System uptime metric + ResolvedGroup { + id: "metric.system.uptime".to_owned(), + r#type: GroupType::Metric, + brief: "The time the system has been running".to_owned(), + note: "Instrumentations SHOULD use a gauge with type `double` and measure uptime in seconds as a floating point number with the highest precision available.\nThe actual accuracy would depend on the instrumentation and operating system.".to_owned(), + prefix: "".to_owned(), + entity_associations: vec![], + extends: None, + stability: Some(Stability::Development), + deprecated: None, + attributes: vec![], + span_kind: None, + events: vec![], + metric_name: Some("system.uptime".to_owned()), + instrument: Some(InstrumentSpec::Gauge), + unit: Some("s".to_owned()), + name: None, + lineage: None, + display_name: None, + body: None, + }, + // System memory usage metric + ResolvedGroup { + id: 
"metric.system.memory.usage".to_owned(), + r#type: GroupType::Metric, + brief: "Reports memory in use by state.".to_owned(), + note: "The sum over all `system.memory.state` values SHOULD equal the total memory\navailable on the system, that is `system.memory.limit`.".to_owned(), + prefix: "".to_owned(), + entity_associations: vec![], + extends: None, + stability: Some(Stability::Development), + deprecated: None, + attributes: vec![ + Attribute { + name: "system.memory.state".to_owned(), + r#type: AttributeType::PrimitiveOrArray(PrimitiveOrArrayTypeSpec::String), + examples: None, + brief: "The memory state".to_owned(), + tag: None, + requirement_level: RequirementLevel::Recommended { text: "".to_owned() }, + sampling_relevant: None, + note: "".to_owned(), + stability: Some(Stability::Development), + deprecated: None, + prefix: false, + tags: None, + value: None, + annotations: None, + }, + ], + span_kind: None, + events: vec![], + metric_name: Some("system.memory.usage".to_owned()), + instrument: Some(InstrumentSpec::UpDownCounter), + unit: Some("By".to_owned()), + name: None, + lineage: None, + display_name: None, + body: None, + }, + ], + } + } + #[test] fn test_custom_rego() { let registry = ResolvedRegistry { @@ -585,6 +737,97 @@ mod tests { ); } + #[test] + fn test_json_metric() { + let registry = make_metrics_registry(); + + // Load samples from JSON file + let path = "data/metrics.json"; + let mut samples: Vec = + serde_json::from_reader(File::open(path).expect("Unable to open file")) + .expect("Unable to parse JSON"); + + let advisors: Vec> = vec![ + Box::new(DeprecatedAdvisor), + Box::new(StabilityAdvisor), + Box::new(TypeAdvisor), + Box::new(EnumAdvisor), + ]; + + let mut live_checker = LiveChecker::new(registry, advisors); + let rego_advisor = + RegoAdvisor::new(&live_checker, &None, &None).expect("Failed to create Rego advisor"); + live_checker.add_advisor(Box::new(rego_advisor)); + + let mut stats = LiveCheckStatistics::new(&live_checker.registry); + 
for sample in &mut samples { + let result = sample.run_live_check(&mut live_checker, &mut stats); + assert!(result.is_ok()); + } + stats.finalize(); + /* Assert on these: + total_entities_by_type: { + "data_point": 6, + "metric": 4, + "attribute": 3, + }, + no_advice_count: 4, + advice_type_counts: { + "attribute_required": 2, + "missing_attribute": 2, + "stability": 2, + "missing_metric": 3, + "missing_namespace": 2, + }, */ + // Check the statistics + assert_eq!(stats.total_entities_by_type.get("data_point"), Some(&6)); + assert_eq!(stats.total_entities_by_type.get("metric"), Some(&4)); + assert_eq!(stats.total_entities_by_type.get("attribute"), Some(&3)); + assert_eq!(stats.no_advice_count, 4); + assert_eq!(stats.advice_type_counts.get("attribute_required"), Some(&2)); + assert_eq!(stats.advice_type_counts.get("missing_attribute"), Some(&2)); + assert_eq!(stats.advice_type_counts.get("stability"), Some(&2)); + assert_eq!(stats.advice_type_counts.get("missing_metric"), Some(&3)); + assert_eq!(stats.advice_type_counts.get("missing_namespace"), Some(&2)); + assert_eq!( + stats.seen_registry_metrics.get("system.memory.usage"), + Some(&1) + ); + assert_eq!(stats.seen_non_registry_metrics.len(), 3); + } + + #[test] + fn test_json_metric_custom_rego() { + let registry = make_metrics_registry(); + + // Load samples from JSON file + let path = "data/metrics.json"; + let mut samples: Vec = + serde_json::from_reader(File::open(path).expect("Unable to open file")) + .expect("Unable to parse JSON"); + + let mut live_checker = LiveChecker::new(registry, vec![]); + let rego_advisor = RegoAdvisor::new( + &live_checker, + &Some("data/policies/live_check_advice/".into()), + &Some("data/jq/test.jq".into()), + ) + .expect("Failed to create Rego advisor"); + live_checker.add_advisor(Box::new(rego_advisor)); + + let mut stats = LiveCheckStatistics::new(&live_checker.registry); + for sample in &mut samples { + let result = sample.run_live_check(&mut live_checker, &mut stats); + 
println!("{:?}", result); + assert!(result.is_ok()); + } + stats.finalize(); + assert_eq!( + stats.advice_type_counts.get("invalid_data_point_value"), + Some(&1) + ); + } + #[test] fn test_bad_custom_rego() { let registry = ResolvedRegistry { @@ -659,4 +902,74 @@ mod tests { .contains("use of undefined variable")); } } + + #[test] + fn test_exponential_histogram() { + let registry = make_metrics_registry(); + + // Generate a sample with exponential histogram data points + let sample = Sample::Metric(SampleMetric { + name: "system.memory.usage".to_owned(), + instrument: SampleInstrument::Histogram, + unit: "By".to_owned(), + data_points: Some(DataPoints::ExponentialHistogram(vec![ + SampleExponentialHistogramDataPoint { + attributes: vec![], + count: 0, + sum: None, + min: None, + max: None, + live_check_result: None, + }, + ])), + live_check_result: None, + }); + let mut samples = vec![sample]; + let advisors: Vec> = vec![Box::new(TypeAdvisor)]; + let mut live_checker = LiveChecker::new(registry, advisors); + + let mut stats = LiveCheckStatistics::new(&live_checker.registry); + for sample in &mut samples { + let result = sample.run_live_check(&mut live_checker, &mut stats); + assert!(result.is_ok()); + } + stats.finalize(); + assert_eq!( + stats.advice_type_counts.get("instrument_mismatch"), + Some(&1) + ); + } + + #[test] + fn test_summary_unspecified() { + let registry = make_metrics_registry(); + + let mut samples = vec![ + Sample::Metric(SampleMetric { + name: "system.memory.usage".to_owned(), + instrument: SampleInstrument::Summary, + unit: "By".to_owned(), + data_points: None, + live_check_result: None, + }), + Sample::Metric(SampleMetric { + name: "system.memory.usage".to_owned(), + instrument: SampleInstrument::Unspecified, + unit: "By".to_owned(), + data_points: None, + live_check_result: None, + }), + ]; + let advisors: Vec> = vec![Box::new(TypeAdvisor)]; + let mut live_checker = LiveChecker::new(registry, advisors); + + let mut stats = 
LiveCheckStatistics::new(&live_checker.registry); + for sample in &mut samples { + let result = sample.run_live_check(&mut live_checker, &mut stats); + assert!(result.is_ok()); + } + stats.finalize(); + assert_eq!(stats.advice_type_counts.get("legacy_instrument"), Some(&1)); + assert_eq!(stats.advice_type_counts.get("instrument_missing"), Some(&1)); + } } diff --git a/crates/weaver_live_check/src/sample_attribute.rs b/crates/weaver_live_check/src/sample_attribute.rs index e9dcfad6..e0b975da 100644 --- a/crates/weaver_live_check/src/sample_attribute.rs +++ b/crates/weaver_live_check/src/sample_attribute.rs @@ -2,6 +2,7 @@ //! Intermediary format for telemetry sample attributes +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use weaver_checker::violation::{Advice, AdviceLevel}; @@ -13,7 +14,7 @@ use crate::{ }; /// Represents a sample telemetry attribute parsed from any source -#[derive(Debug, Clone, PartialEq, Serialize)] +#[derive(Debug, Clone, PartialEq, Serialize, JsonSchema)] pub struct SampleAttribute { /// The name of the attribute pub name: String, @@ -171,9 +172,9 @@ impl LiveCheckRunner for SampleAttribute { // find the attribute in the registry let semconv_attribute = { if let Some(attribute) = live_checker.find_attribute(&self.name) { - Some(attribute.clone()) + Some(attribute) } else { - live_checker.find_template(&self.name).cloned() + live_checker.find_template(&self.name) } }; diff --git a/crates/weaver_live_check/src/sample_metric.rs b/crates/weaver_live_check/src/sample_metric.rs new file mode 100644 index 00000000..65e9d998 --- /dev/null +++ b/crates/weaver_live_check/src/sample_metric.rs @@ -0,0 +1,218 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! 
Intermediary format for telemetry sample spans + +use std::fmt::{Display, Formatter}; + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use weaver_checker::violation::{Advice, AdviceLevel}; +use weaver_semconv::group::InstrumentSpec; + +use crate::{ + live_checker::LiveChecker, sample_attribute::SampleAttribute, Error, LiveCheckResult, + LiveCheckRunner, LiveCheckStatistics, SampleRef, MISSING_METRIC_ADVICE_TYPE, +}; + +/// Represents the instrument type of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "lowercase")] +pub enum SampleInstrument { + /// An up-down counter metric. + UpDownCounter, + /// A counter metric. + Counter, + /// A gauge metric. + Gauge, + /// A histogram metric. + Histogram, + /// A summary metric. This is no longer used and will cause a violation. + Summary, + /// Unspecified instrument type. + Unspecified, +} + +impl SampleInstrument { + /// Converts the instrument type to a semconv instrument type. + #[must_use] + pub fn as_semconv(&self) -> Option { + match self { + SampleInstrument::UpDownCounter => Some(InstrumentSpec::UpDownCounter), + SampleInstrument::Counter => Some(InstrumentSpec::Counter), + SampleInstrument::Gauge => Some(InstrumentSpec::Gauge), + SampleInstrument::Histogram => Some(InstrumentSpec::Histogram), + _ => None, + } + } +} + +/// Implements a human readable display for the instrument. 
+impl Display for SampleInstrument { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + SampleInstrument::UpDownCounter => write!(f, "updowncounter"), + SampleInstrument::Counter => write!(f, "counter"), + SampleInstrument::Gauge => write!(f, "gauge"), + SampleInstrument::Histogram => write!(f, "histogram"), + SampleInstrument::Summary => write!(f, "summary"), + SampleInstrument::Unspecified => write!(f, "unspecified"), + } + } +} + +/// The data point types of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +#[serde(untagged)] +pub enum DataPoints { + /// Number data points + Number(Vec), + /// Histogram data points + Histogram(Vec), + /// Exponential histogram data points + ExponentialHistogram(Vec), +} + +/// Represents a single data point of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +pub struct SampleNumberDataPoint { + /// The value of the data point + pub value: Value, + /// The attributes of the data point + pub attributes: Vec, + /// Live check result + pub live_check_result: Option, +} + +/// Represents a single histogram data point of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +pub struct SampleHistogramDataPoint { + /// The attributes of the data point + pub attributes: Vec, + /// The count of the data point + pub count: u64, + /// The sum of the data point + pub sum: Option, + /// The bucket counts of the data point + pub bucket_counts: Vec, + /// The minimum of the data point + pub min: Option, + /// The maximum of the data point + pub max: Option, + /// Live check result + pub live_check_result: Option, +} + +/// Represents a single exponential histogram data point of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +pub struct SampleExponentialHistogramDataPoint { + /// The attributes of the data point + pub attributes: Vec, + /// The count of the data point + pub count: u64, + /// 
The sum of the data point + pub sum: Option, + /// The minimum of the data point + pub min: Option, + /// The maximum of the data point + pub max: Option, + /// Live check result + pub live_check_result: Option, +} + +/// Represents a single summary data point of a metric +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +pub struct SampleSummaryDataPoint { + /// The attributes of the data point + pub attributes: Vec, +} + +/// Represents a sample telemetry span parsed from any source +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +pub struct SampleMetric { + /// Metric name. + pub name: String, + /// Type of the metric (e.g. gauge, histogram, ...). + pub instrument: SampleInstrument, + /// Unit of the metric. + pub unit: String, + /// Data points of the metric. + pub data_points: Option, + /// Live check result + pub live_check_result: Option, +} + +impl LiveCheckRunner for SampleMetric { + fn run_live_check( + &mut self, + live_checker: &mut LiveChecker, + stats: &mut LiveCheckStatistics, + ) -> Result<(), Error> { + let mut result = LiveCheckResult::new(); + // find the metric in the registry + let semconv_metric = live_checker.find_metric(&self.name); + if semconv_metric.is_none() { + result.add_advice(Advice { + advice_type: MISSING_METRIC_ADVICE_TYPE.to_owned(), + value: Value::String(self.name.clone()), + message: "Does not exist in the registry".to_owned(), + advice_level: AdviceLevel::Violation, + }); + }; + for advisor in live_checker.advisors.iter_mut() { + let advice_list = + advisor.advise(SampleRef::Metric(self), None, semconv_metric.as_ref())?; + result.add_advice_list(advice_list); + } + // Get advice for the data points + match &mut self.data_points { + Some(DataPoints::Number(number_data_points)) => { + for point in number_data_points.iter_mut() { + let mut point_result = LiveCheckResult::new(); + for advisor in live_checker.advisors.iter_mut() { + let advice_list = advisor.advise( + 
SampleRef::NumberDataPoint(point), + None, + semconv_metric.as_ref(), + )?; + point_result.add_advice_list(advice_list); + } + point.live_check_result = Some(point_result); + stats.inc_entity_count("data_point"); + stats.maybe_add_live_check_result(point.live_check_result.as_ref()); + + for attribute in &mut point.attributes { + attribute.run_live_check(live_checker, stats)?; + } + } + } + Some(DataPoints::Histogram(histogram_data_points)) => { + for point in histogram_data_points.iter_mut() { + let mut point_result = LiveCheckResult::new(); + for advisor in live_checker.advisors.iter_mut() { + let advice_list = advisor.advise( + SampleRef::HistogramDataPoint(point), + None, + semconv_metric.as_ref(), + )?; + point_result.add_advice_list(advice_list); + } + point.live_check_result = Some(point_result); + stats.inc_entity_count("data_point"); + stats.maybe_add_live_check_result(point.live_check_result.as_ref()); + + for attribute in &mut point.attributes { + attribute.run_live_check(live_checker, stats)?; + } + } + } + _ => (), + } + + self.live_check_result = Some(result); + stats.inc_entity_count("metric"); + stats.maybe_add_live_check_result(self.live_check_result.as_ref()); + stats.add_metric_name_to_coverage(self.name.clone()); + Ok(()) + } +} diff --git a/crates/weaver_live_check/src/sample_resource.rs b/crates/weaver_live_check/src/sample_resource.rs index 4681cacd..bb5eb2c2 100644 --- a/crates/weaver_live_check/src/sample_resource.rs +++ b/crates/weaver_live_check/src/sample_resource.rs @@ -2,6 +2,7 @@ //! 
Intermediary format for telemetry sample resources +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use crate::{ @@ -10,7 +11,7 @@ use crate::{ }; /// Represents a resource -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleResource { /// The attributes of the resource #[serde(default)] diff --git a/crates/weaver_live_check/src/sample_span.rs b/crates/weaver_live_check/src/sample_span.rs index b9619e9c..95d09d0d 100644 --- a/crates/weaver_live_check/src/sample_span.rs +++ b/crates/weaver_live_check/src/sample_span.rs @@ -2,6 +2,7 @@ //! Intermediary format for telemetry sample spans +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use weaver_semconv::group::SpanKindSpec; @@ -11,7 +12,7 @@ use crate::{ }; /// The status code of the span -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum StatusCode { /// The status is unset @@ -23,7 +24,7 @@ pub enum StatusCode { } /// The status code and message of the span -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct Status { /// The status code pub code: StatusCode, @@ -32,7 +33,7 @@ pub struct Status { } /// Represents a sample telemetry span parsed from any source -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleSpan { /// The name of the span pub name: String, @@ -81,7 +82,7 @@ impl LiveCheckRunner for SampleSpan { } /// Represents a span event -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleSpanEvent { /// The name of the event pub name: String, @@ -114,7 +115,7 @@ impl 
LiveCheckRunner for SampleSpanEvent { } /// Represents a span link -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] pub struct SampleSpanLink { /// The attributes of the link #[serde(default)] diff --git a/defaults/live_check_templates/ansi/live_check_macros.j2 b/defaults/live_check_templates/ansi/live_check_macros.j2 index 821849de..03fb196e 100644 --- a/defaults/live_check_templates/ansi/live_check_macros.j2 +++ b/defaults/live_check_templates/ansi/live_check_macros.j2 @@ -27,7 +27,7 @@ {% endif %} {{ ("Registry coverage") | ansi_blue | ansi_bold }} - - attributes seen: {{ (statistics.registry_coverage * 100) | round(2) }}% + - entities seen: {{ (statistics.registry_coverage * 100) | round(2) }}% {% endmacro %} {% macro display_advice(all_advice, indent=0) %} @@ -72,6 +72,30 @@ {%- endfor %} {% endmacro %} +{% macro summarize_data_point_value(data_point) %} +{%- if data_point.value is defined -%} +{{ data_point.value }} +{%- else -%} +count={{ data_point.count }}, sum={{ data_point.sum }}, min={{ data_point.min }}, max={{ data_point.max }} +{%- endif -%} +{% endmacro %} + +{% macro display_data_point(data_point, indent=0) %} +{{ " " * indent }}{{ ("Data point") | ansi_bright_cyan | ansi_bold }} {{ summarize_data_point_value(data_point) }} +{{ display_advice(data_point.live_check_result.all_advice, indent) }} +{%- for attribute in data_point.attributes %} +{{ display_attribute(attribute, indent + 4) }} +{%- endfor %} +{% endmacro %} + +{% macro display_metric(metric, indent=0) %} +{{ " " * indent }}{{ ("Metric") | ansi_bright_cyan | ansi_bold }} {{ display_sample_header(metric.live_check_result.highest_advice_level, metric.name) }} `{{ metric.instrument }}`, `{{ metric.unit }}` +{{ display_advice(metric.live_check_result.all_advice, indent) }} +{%- for data_point in metric.data_points %} +{{ display_data_point(data_point, indent + 4) }} +{%- endfor %} +{%- endmacro %} + {%- macro 
display_sample(sample, indent=0) -%} {%- if sample.span is defined -%} {{ " " * indent }}{{ ("Span") | ansi_bright_cyan | ansi_bold }} {{ display_sample_header(sample.span.live_check_result.highest_advice_level, sample.span.name) }} `{{ sample.span.kind }}` @@ -93,6 +117,8 @@ {{ display_span_link(sample.span_link, indent) }} {%- elif sample.resource is defined -%} {{ display_resource(sample.resource, indent) }} +{%- elif sample.metric is defined -%} +{{ display_metric(sample.metric, indent) }} {%- endif -%} {%- endmacro -%} diff --git a/defaults/policies/live_check_advice/otel.rego b/defaults/policies/live_check_advice/otel.rego index 84f53f1e..6333afd4 100644 --- a/defaults/policies/live_check_advice/otel.rego +++ b/defaults/policies/live_check_advice/otel.rego @@ -22,33 +22,43 @@ concat(".", array.slice(parts, 0, i)) | # checks attribute has a namespace deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute - not contains(input.attribute.name, ".") + input.sample.attribute + not contains(input.sample.attribute.name, ".") advice_type := "missing_namespace" advice_level := "improvement" - value := input.attribute.name + value := input.sample.attribute.name message := "Does not have a namespace" } # checks attribute name format deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute - not regex.match(name_regex, input.attribute.name) + input.sample.attribute + not regex.match(name_regex, input.sample.attribute.name) advice_type := "invalid_format" advice_level := "violation" - value := input.attribute.name + value := input.sample.attribute.name + message := "Does not match name formatting rules" +} + +# checks metric name format +deny contains make_advice(advice_type, advice_level, value, message) if { + input.sample.metric + not regex.match(name_regex, input.sample.metric.name) + advice_type := "invalid_format" + advice_level := "violation" + value := input.sample.metric.name message := "Does 
not match name formatting rules" } # checks attribute namespace doesn't collide with existing attributes deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute + input.sample.attribute # Skip if no namespace - contains(input.attribute.name, ".") + contains(input.sample.attribute.name, ".") # Get input namespaces - namespaces := derive_namespaces(input.attribute.name) + namespaces := derive_namespaces(input.sample.attribute.name) # Find collision some value in namespaces @@ -61,15 +71,15 @@ deny contains make_advice(advice_type, advice_level, value, message) if { # provides advice if the attribute extends an existing namespace deny contains make_advice(advice_type, advice_level, value, message) if { - input.attribute + input.sample.attribute # Skip checks first (fail fast) - contains(input.attribute.name, ".") # Must have at least one namespace - not is_template_type(input.attribute.name) - not is_registry_attribute(input.attribute.name) + contains(input.sample.attribute.name, ".") # Must have at least one namespace + not is_template_type(input.sample.attribute.name) + not is_registry_attribute(input.sample.attribute.name) # Get input namespaces - namespaces := derive_namespaces(input.attribute.name) + namespaces := derive_namespaces(input.sample.attribute.name) # Find matches - check keys in set matches := [ns | some ns in namespaces; namespaces_to_check_set[ns] != null] diff --git a/src/registry/otlp/conversion.rs b/src/registry/otlp/conversion.rs new file mode 100644 index 00000000..50861c98 --- /dev/null +++ b/src/registry/otlp/conversion.rs @@ -0,0 +1,213 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! 
Conversion routines for OTLP to Sample + +use serde_json::{json, Value}; +use weaver_live_check::{ + sample_attribute::SampleAttribute, + sample_metric::{DataPoints, SampleInstrument, SampleMetric}, + sample_span::{Status, StatusCode}, +}; +use weaver_semconv::group::SpanKindSpec; + +use super::grpc_stubs::proto::{ + common::v1::{AnyValue, KeyValue}, + metrics::v1::{metric::Data, HistogramDataPoint, Metric, NumberDataPoint}, +}; + +fn maybe_to_json(value: Option) -> Option { + if let Some(value) = value { + if let Some(value) = value.value { + use crate::registry::otlp::grpc_stubs::proto::common::v1::any_value::Value as GrpcValue; + match value { + GrpcValue::StringValue(string) => Some(Value::String(string)), + GrpcValue::IntValue(int_value) => Some(Value::Number(int_value.into())), + GrpcValue::DoubleValue(double_value) => Some(json!(double_value)), + GrpcValue::BoolValue(bool_value) => Some(Value::Bool(bool_value)), + GrpcValue::ArrayValue(array_value) => { + let mut vec = Vec::new(); + for value in array_value.values { + if let Some(value) = maybe_to_json(Some(value)) { + vec.push(value); + } + } + Some(Value::Array(vec)) + } + _ => None, + } + } else { + None + } + } else { + None + } +} + +/// Converts an OTLP KeyValue to a SampleAttribute +pub fn sample_attribute_from_key_value(key_value: &KeyValue) -> SampleAttribute { + let value = maybe_to_json(key_value.value.clone()); + let r#type = match value { + Some(ref val) => SampleAttribute::infer_type(val), + None => None, + }; + SampleAttribute { + name: key_value.key.clone(), + value, + r#type, + live_check_result: None, + } +} + +/// Converts an OTLP span kind to a SpanKindSpec +pub fn span_kind_from_otlp_kind(kind: i32) -> SpanKindSpec { + match kind { + 2 => SpanKindSpec::Server, + 3 => SpanKindSpec::Client, + 4 => SpanKindSpec::Producer, + 5 => SpanKindSpec::Consumer, + _ => SpanKindSpec::Internal, + } +} + +/// Converts an OTLP status to a Status +pub fn status_from_otlp_status( + status: Option, +) -> 
Option { + if let Some(status) = status { + let code = match status.code { + 1 => StatusCode::Ok, + 2 => StatusCode::Error, + _ => StatusCode::Unset, + }; + return Some(Status { + code, + message: status.message, + }); + } + None +} + +/// Converts an OTLP metric to a SampleMetric +pub fn otlp_metric_to_sample(otlp_metric: Metric) -> SampleMetric { + SampleMetric { + name: otlp_metric.name, + instrument: otlp_data_to_instrument(&otlp_metric.data), + unit: otlp_metric.unit, + data_points: otlp_data_to_data_points(&otlp_metric.data), + live_check_result: None, + } +} + +/// Converts OTLP data to a SampleMetric instrument +/// Mapping: +/// counter → Sum with is_monotonic: true +/// updowncounter → Sum with is_monotonic: false +/// gauge → Gauge +/// histogram → Histogram +/// histogram → ExponentialHistogram +/// summary → Summary (this will cause a legacy_instrument violation) +fn otlp_data_to_instrument(data: &Option) -> SampleInstrument { + match data { + Some(Data::Sum(sum)) => { + if sum.is_monotonic { + SampleInstrument::Counter + } else { + SampleInstrument::UpDownCounter + } + } + Some(Data::Gauge(_)) => SampleInstrument::Gauge, + Some(Data::Histogram(_)) => SampleInstrument::Histogram, + Some(Data::ExponentialHistogram(_)) => SampleInstrument::Histogram, + Some(Data::Summary(_)) => SampleInstrument::Summary, + None => SampleInstrument::Unspecified, + } +} + +/// Converts OTLP data to SampleMetric data points +fn otlp_data_to_data_points(data: &Option) -> Option { + match data { + Some(Data::Sum(sum)) => Some(otlp_number_data_points(&sum.data_points)), + Some(Data::Gauge(gauge)) => Some(otlp_number_data_points(&gauge.data_points)), + Some(Data::Histogram(histogram)) => { + Some(otlp_histogram_data_points(&histogram.data_points)) + } + Some(Data::ExponentialHistogram(exponential_histogram)) => Some( + otlp_exponential_histogram_data_points(&exponential_histogram.data_points), + ), + _ => None, + } +} + +/// Converts OTLP ExponentialHistogram data points to 
DataPoints::ExponentialHistogram +fn otlp_exponential_histogram_data_points( + otlp: &Vec, +) -> DataPoints { + let mut data_points = Vec::new(); + for point in otlp { + let live_check_point = + weaver_live_check::sample_metric::SampleExponentialHistogramDataPoint { + attributes: point + .attributes + .iter() + .map(sample_attribute_from_key_value) + .collect(), + count: point.count, + sum: point.sum, + min: point.min, + max: point.max, + live_check_result: None, + }; + data_points.push(live_check_point); + } + DataPoints::ExponentialHistogram(data_points) +} + +/// Converts OTLP Histogram data points to DataPoints::Histogram +fn otlp_histogram_data_points(otlp: &Vec) -> DataPoints { + let mut data_points = Vec::new(); + for point in otlp { + let live_check_point = weaver_live_check::sample_metric::SampleHistogramDataPoint { + attributes: point + .attributes + .iter() + .map(sample_attribute_from_key_value) + .collect(), + count: point.count, + sum: point.sum, + bucket_counts: point.bucket_counts.clone(), + min: point.min, + max: point.max, + live_check_result: None, + }; + data_points.push(live_check_point); + } + DataPoints::Histogram(data_points) +} + +/// Converts OTLP Number data points to DataPoints::Number +fn otlp_number_data_points(otlp: &Vec) -> DataPoints { + let mut data_points = Vec::new(); + for point in otlp { + let live_check_point = weaver_live_check::sample_metric::SampleNumberDataPoint { + value: match point.value { + Some(value) => match value { + super::grpc_stubs::proto::metrics::v1::number_data_point::Value::AsDouble( + double, + ) => json!(double), + super::grpc_stubs::proto::metrics::v1::number_data_point::Value::AsInt(int) => { + Value::Number(int.into()) + } + }, + None => Value::Null, + }, + attributes: point + .attributes + .iter() + .map(sample_attribute_from_key_value) + .collect(), + live_check_result: None, + }; + data_points.push(live_check_point); + } + DataPoints::Number(data_points) +} diff --git a/src/registry/otlp/mod.rs 
b/src/registry/otlp/mod.rs index 6a82b3fa..31769e7f 100644 --- a/src/registry/otlp/mod.rs +++ b/src/registry/otlp/mod.rs @@ -2,6 +2,7 @@ //! A basic OTLP receiver integrated into Weaver. +pub mod conversion; pub mod otlp_ingester; use grpc_stubs::proto::collector::logs::v1::logs_service_server::{LogsService, LogsServiceServer}; diff --git a/src/registry/otlp/otlp_ingester.rs b/src/registry/otlp/otlp_ingester.rs index a2ffe83f..4d622faf 100644 --- a/src/registry/otlp/otlp_ingester.rs +++ b/src/registry/otlp/otlp_ingester.rs @@ -1,18 +1,21 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! OTLP ingester use std::time::Duration; use log::info; -use serde_json::{json, Value}; use weaver_common::log_info; use weaver_live_check::{ - sample_attribute::SampleAttribute, sample_resource::SampleResource, - sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink, Status, StatusCode}, + sample_span::{SampleSpan, SampleSpanEvent, SampleSpanLink}, Error, Ingester, Sample, }; -use weaver_semconv::group::SpanKindSpec; use super::{ - grpc_stubs::proto::common::v1::{AnyValue, KeyValue}, + conversion::{ + otlp_metric_to_sample, sample_attribute_from_key_value, span_kind_from_otlp_kind, + status_from_otlp_status, + }, listen_otlp_requests, OtlpRequest, }; @@ -42,86 +45,44 @@ impl OtlpIterator { } } - fn maybe_to_json(value: Option) -> Option { - if let Some(value) = value { - if let Some(value) = value.value { - use crate::registry::otlp::grpc_stubs::proto::common::v1::any_value::Value as GrpcValue; - match value { - GrpcValue::StringValue(string) => Some(Value::String(string)), - GrpcValue::IntValue(int_value) => Some(Value::Number(int_value.into())), - GrpcValue::DoubleValue(double_value) => Some(json!(double_value)), - GrpcValue::BoolValue(bool_value) => Some(Value::Bool(bool_value)), - GrpcValue::ArrayValue(array_value) => { - let mut vec = Vec::new(); - for value in array_value.values { - if let Some(value) = Self::maybe_to_json(Some(value)) { - vec.push(value); - } - } - 
Some(Value::Array(vec)) - } - _ => None, - } - } else { - None - } - } else { - None - } - } - - // TODO Ideally this would be a TryFrom in the SampleAttribute but requires - // the grpc_stubs to be in another crate - fn sample_attribute_from_key_value(key_value: &KeyValue) -> SampleAttribute { - let value = Self::maybe_to_json(key_value.value.clone()); - let r#type = match value { - Some(ref val) => SampleAttribute::infer_type(val), - None => None, - }; - SampleAttribute { - name: key_value.key.clone(), - value, - r#type, - live_check_result: None, - } - } - - fn span_kind_from_otlp_kind(kind: i32) -> SpanKindSpec { - match kind { - 2 => SpanKindSpec::Server, - 3 => SpanKindSpec::Client, - 4 => SpanKindSpec::Producer, - 5 => SpanKindSpec::Consumer, - _ => SpanKindSpec::Internal, - } - } - - fn status_from_otlp_status( - status: Option, - ) -> Option { - if let Some(status) = status { - let code = match status.code { - 1 => StatusCode::Ok, - 2 => StatusCode::Error, - _ => StatusCode::Unset, - }; - return Some(Status { - code, - message: status.message, - }); - } - None - } - fn fill_buffer_from_request(&mut self, request: OtlpRequest) -> Option { match request { OtlpRequest::Logs(_logs) => { // TODO Implement the checking logic for logs Some(0) } - OtlpRequest::Metrics(_metrics) => { - // TODO Implement the checking logic for metrics - Some(0) + OtlpRequest::Metrics(metrics) => { + for resource_metric in metrics.resource_metrics { + if let Some(resource) = resource_metric.resource { + let mut sample_resource = SampleResource { + attributes: Vec::new(), + live_check_result: None, + }; + for attribute in resource.attributes { + sample_resource + .attributes + .push(sample_attribute_from_key_value(&attribute)); + } + self.buffer.push(Sample::Resource(sample_resource)); + } + + for scope_metric in resource_metric.scope_metrics { + if let Some(scope) = scope_metric.scope { + // TODO SampleInstrumentationScope? 
+ for attribute in scope.attributes { + self.buffer.push(Sample::Attribute( + sample_attribute_from_key_value(&attribute), + )); + } + } + + for metric in scope_metric.metrics { + let sample_metric = Sample::Metric(otlp_metric_to_sample(metric)); + self.buffer.push(sample_metric); + } + } + } + Some(self.buffer.len()) } OtlpRequest::Traces(trace) => { for resource_span in trace.resource_spans { @@ -133,7 +94,7 @@ impl OtlpIterator { for attribute in resource.attributes { sample_resource .attributes - .push(Self::sample_attribute_from_key_value(&attribute)); + .push(sample_attribute_from_key_value(&attribute)); } self.buffer.push(Sample::Resource(sample_resource)); } @@ -143,7 +104,7 @@ impl OtlpIterator { // TODO SampleInstrumentationScope? for attribute in scope.attributes { self.buffer.push(Sample::Attribute( - Self::sample_attribute_from_key_value(&attribute), + sample_attribute_from_key_value(&attribute), )); } } @@ -151,8 +112,8 @@ impl OtlpIterator { for span in scope_span.spans { let mut sample_span = SampleSpan { name: span.name, - kind: Self::span_kind_from_otlp_kind(span.kind), - status: Self::status_from_otlp_status(span.status), + kind: span_kind_from_otlp_kind(span.kind), + status: status_from_otlp_status(span.status), attributes: Vec::new(), span_events: Vec::new(), span_links: Vec::new(), @@ -161,7 +122,7 @@ impl OtlpIterator { for attribute in span.attributes { sample_span .attributes - .push(Self::sample_attribute_from_key_value(&attribute)); + .push(sample_attribute_from_key_value(&attribute)); } for event in span.events { let mut sample_event = SampleSpanEvent { @@ -172,7 +133,7 @@ impl OtlpIterator { for attribute in event.attributes { sample_event .attributes - .push(Self::sample_attribute_from_key_value(&attribute)); + .push(sample_attribute_from_key_value(&attribute)); } sample_span.span_events.push(sample_event); } @@ -184,7 +145,7 @@ impl OtlpIterator { for attribute in link.attributes { sample_link .attributes - 
.push(Self::sample_attribute_from_key_value(&attribute)); + .push(sample_attribute_from_key_value(&attribute)); } sample_span.span_links.push(sample_link); }