diff --git a/rust/otap-dataflow/crates/validation/README.md b/rust/otap-dataflow/crates/validation/README.md index 25c944ca42..05f7ae03cc 100644 --- a/rust/otap-dataflow/crates/validation/README.md +++ b/rust/otap-dataflow/crates/validation/README.md @@ -142,6 +142,14 @@ e.g. `receiver`, `exporter`. - each `${KEY}` in the YAML is replaced with the corresponding value - returns an error if any `${...}` placeholders remain unresolved - useful for injecting TLS cert/key paths at test time (see [TLS / mTLS](#tls--mtls)) +- `with_transport_headers_policy(TransportHeadersPolicy)` - set a transport + headers policy on the SUV pipeline from a typed struct + - controls header capture at receivers and header propagation at exporters + - optional; see [Transport Headers](#transport-headers) for details +- `with_transport_headers_policy_yaml(yaml_str)` - set a transport headers + policy from a YAML string + - returns `Result` (YAML parsing can fail) + - alternative to the typed struct method - `connect_container(PipelineContainerConnection)` - declare a connection between a node in the SUV pipeline and a test container - the framework rewrites the specified config key with the container's allocated @@ -181,6 +189,12 @@ e.g. `receiver`, `exporter`. 
- `with_tls(TlsConfig)` - enable TLS on the generator's exporter - optional; see [TLS / mTLS](#tls--mtls) for details - uses the built-in TLS support +- `with_transport_headers(headers)` - configure transport headers to inject + into generated traffic + - each key is a header name; the value is an optional fixed string + - when the value is `None`, the fake data generator assigns a random value + at startup + - only meaningful when the pipeline uses OTLP receivers/exporters - `to_container(ContainerConnection)` - use a custom exporter that sends to a test container instead of directly to the SUV pipeline receiver - mutually exclusive with `otlp_grpc()` / `otap_grpc()` @@ -376,6 +390,11 @@ should receive control signals from waits without receiving any messages before declaring the data stream settled and performing the final validation - default: 3 seconds +- `with_capture_header_keys(keys)` - configure transport header keys to capture + from inbound signals + - each key becomes a `match_names` rule in the capture pipeline's + `header_capture` policy + - required when using transport header validation instructions - `from_container(ContainerConnection)` - use a custom receiver that reads from a test container instead of directly from the SUV pipeline exporter - mutually exclusive with `otlp_grpc()` / `otap_grpc()` @@ -417,8 +436,22 @@ count per message; `min/max` optional; `timeout` optional - `domains` accepts `AttributeDomain::Resource`, `Scope`, or `Signal` - `pairs` accepts `Vec` - `AttributeNoDuplicate`: check that there are no duplicate attributes - -(see `validation_types::attributes` and `validation_types`) +- `TransportHeaderRequireKey { keys }`: specified transport header keys must be + present on every SUV message. Fails if any message is missing transport + headers entirely. +- `TransportHeaderRequireKeyValue { pairs }`: specified transport header + key/value pairs must be present on every SUV message. 
Values are compared as + UTF-8 text (case-sensitive). When duplicate header keys exist, the check + passes if any entry with that key matches the expected value. + - `pairs` accepts `Vec<TransportHeaderKeyValue>` +- `TransportHeaderDeny { keys }`: specified transport header keys must NOT + appear on any SUV message. Messages without transport headers are acceptable + for this check. + +> See [Transport Headers](#transport-headers) for setup and a full example. + +(see `validation_types::attributes`, `validation_types::transport_headers`, +and `validation_types`) > NOTE: Some ValidationInstructions require control signals. Use `control_streams` on the Capture to declare which generator(s) should @@ -446,6 +479,97 @@ it should receive control signals from to receive from two generators - each generator label must match a label passed to `add_generator` +### Transport Headers + +Validate that transport headers survive the full pipeline chain from generator +through the system-under-validation to the capture pipeline. + +> **NOTE:** Only OTLP (`otlp_grpc`) receivers and exporters currently support +> transport header capture and propagation. OTAP receivers/exporters do **not** +> support transport headers. + +For these headers to flow through the pipeline chain, each +stage needs to be appropriately configured: + +1. **Generator pipeline** -- automatically adds a `header_propagation` policy + (propagate all headers) when `with_transport_headers` is configured. +2. **SUV pipeline** -- requires a `transport_headers` policy with both + `header_capture` (to extract headers from inbound gRPC metadata) and + `header_propagation` (to re-emit headers on the outbound gRPC connection). + Configure via `Pipeline::with_transport_headers_policy_yaml()` or + `Pipeline::with_transport_headers_policy()`. +3. **Capture pipeline** -- requires `with_capture_header_keys` to specify which + headers to extract from inbound gRPC metadata for validation.
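The SUV policy in step 2 need not come from YAML. A minimal sketch of the typed-struct alternative, mirroring the policy construction used in this crate's `pipeline.rs` unit tests (the `otap_df_config::transport_headers_policy` field and constructor names are taken from those tests, not from separate documentation):

```rust
use otap_df_config::transport_headers_policy::*;

// Capture `x-tenant-id` at the receiver; propagate everything captured
// at the exporter (the propagate action is the default, per the tests
// that parse `action: propagate` into `..Default::default()`).
let policy = TransportHeadersPolicy {
    header_capture: HeaderCapturePolicy::new(
        CaptureDefaults::default(),
        vec![CaptureRule {
            match_names: vec!["x-tenant-id".into()],
            store_as: None,
            sensitive: false,
            value_kind: None,
        }],
    ),
    header_propagation: HeaderPropagationPolicy::new(
        PropagationDefault {
            selector: PropagationSelector {
                selector_type: PropagationSelectorType::AllCaptured,
                named: None,
            },
            ..Default::default()
        },
        vec![],
    ),
};

let pipeline = Pipeline::from_file("./validation_pipelines/no-processor.yaml")
    .expect("load pipeline")
    .with_transport_headers_policy(policy);
```

This is equivalent to the `with_transport_headers_policy_yaml` call shown in the full example below; the typed form trades YAML brevity for compile-time field checking.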
+ +#### Transport header validation instructions + +| Instruction | Behavior | +| ----------- | -------- | +| `TransportHeaderRequireKey { keys }` | Assert specified header keys exist on every SUV message | +| `TransportHeaderRequireKeyValue { pairs }` | Assert specified key/value pairs exist on every SUV message | +| `TransportHeaderDeny { keys }` | Assert specified header keys do NOT exist on any SUV message | + +For `RequireKey` and `RequireKeyValue`, every SUV message must carry transport +headers (`Some`). A single message without headers causes immediate failure. + +For `Deny`, messages without transport headers are acceptable -- a signal that +never received headers cannot contain a forbidden key. + +#### Example: transport headers validation + +```rust +use otap_df_validation::ValidationInstructions; +use otap_df_validation::pipeline::Pipeline; +use otap_df_validation::scenario::Scenario; +use otap_df_validation::traffic::{Capture, Generator}; +use otap_df_validation::validation_types::transport_headers::TransportHeaderKeyValue; + +Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/no-processor.yaml") + .expect("load pipeline") + .with_transport_headers_policy_yaml(r#" +header_capture: + headers: + - match_names: ["x-tenant-id"] +header_propagation: + default: + selector: + type: all_captured + action: propagate +"#) + .expect("parse policy"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .static_signals() + .with_transport_headers([("x-tenant-id", Some("test-tenant"))]), + ) + .add_capture( + "validate", + Capture::default() + .otlp_grpc("exporter") + .with_capture_header_keys(["x-tenant-id"]) + .validate(vec![ + ValidationInstructions::TransportHeaderRequireKey { + keys: vec!["x-tenant-id".into()], + }, + ValidationInstructions::TransportHeaderRequireKeyValue { + pairs: vec![TransportHeaderKeyValue::new("x-tenant-id", "test-tenant")], + }, + 
ValidationInstructions::TransportHeaderDeny { + keys: vec!["x-should-not-exist".into()], + }, + ]) + .control_streams(["traffic_gen"]), + ) + .run() + .expect("transport headers validation failed"); +``` + ### Test Containers Run Docker containers alongside your validation scenario using the diff --git a/rust/otap-dataflow/crates/validation/src/lib.rs b/rust/otap-dataflow/crates/validation/src/lib.rs index 39be8fe264..7f1cd3c0e3 100644 --- a/rust/otap-dataflow/crates/validation/src/lib.rs +++ b/rust/otap-dataflow/crates/validation/src/lib.rs @@ -1049,6 +1049,69 @@ mod tests { .run() .expect("validation scenario failed"); } + + /// End-to-end validation: transport headers injected by the fake data + /// generator survive the full pipeline chain (generator → SUV → capture) + /// and can be asserted via transport header validation instructions. + /// + /// Only OTLP receivers and exporters support transport header + /// capture/propagation, so every hop in the chain uses OTLP gRPC. + #[test] + fn validation_transport_headers() { + use crate::validation_types::transport_headers::TransportHeaderKeyValue; + + let header_key = "x-tenant-id"; + let header_value = "test-tenant"; + + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/no-processor.yaml") + .expect("failed to read in pipeline yaml") + .with_transport_headers_policy_yaml( + r#" +header_capture: + headers: + - match_names: ["x-tenant-id"] +header_propagation: + default: + selector: + type: all_captured + action: propagate +"#, + ) + .expect("failed to parse transport headers policy"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals() + .with_transport_headers([(header_key, Some(header_value))]), + ) + .add_capture( + "validate", + Capture::default() + .otlp_grpc("exporter") + .with_capture_header_keys([header_key]) + .validate(vec![ + ValidationInstructions::TransportHeaderRequireKey { + keys: 
vec![header_key.into()], }, ValidationInstructions::TransportHeaderRequireKeyValue { pairs: vec![TransportHeaderKeyValue::new(header_key, header_value)], }, ValidationInstructions::TransportHeaderDeny { keys: vec!["x-should-not-exist".into()], }, ]) .control_streams(["traffic_gen"]) .core_range(2, 2), ) .run() .expect("transport headers validation failed"); } } #[cfg(test)] diff --git a/rust/otap-dataflow/crates/validation/src/pipeline.rs b/rust/otap-dataflow/crates/validation/src/pipeline.rs index 1e8dd218b4..022c9f0e51 100644 --- a/rust/otap-dataflow/crates/validation/src/pipeline.rs +++ b/rust/otap-dataflow/crates/validation/src/pipeline.rs @@ -8,6 +8,7 @@ use crate::error::ValidationError; use crate::template::render_jinja; use minijinja::context; +use otap_df_config::transport_headers_policy::TransportHeadersPolicy; use serde_yaml::{Mapping, Value}; use std::fs; @@ -131,6 +132,9 @@ pub struct Pipeline { /// Container connections declared on this pipeline, consumed during /// config wiring. pub(crate) container_connections: Vec<PipelineContainerConnection>, + /// Optional transport headers policy to embed in the SUV pipeline's + /// `policies.transport_headers` section. + pub(crate) transport_headers_policy: Option<TransportHeadersPolicy>, } impl Pipeline { @@ -169,6 +173,7 @@ impl Pipeline { core_start: 0, core_end: 0, container_connections: Vec::new(), + transport_headers_policy: None, }) } @@ -195,6 +200,41 @@ impl Pipeline { self } + /// Set a transport headers policy for the SUV pipeline from a typed + /// [`TransportHeadersPolicy`] struct. + #[must_use] + pub fn with_transport_headers_policy(mut self, policy: TransportHeadersPolicy) -> Self { + self.transport_headers_policy = Some(policy); + self + } + + /// Set a transport headers policy for the SUV pipeline from a YAML + /// string.
pub fn with_transport_headers_policy_yaml( + mut self, + policy_yaml: impl AsRef<str>, + ) -> Result<Self, ValidationError> { + let policy: TransportHeadersPolicy = + serde_yaml::from_str(policy_yaml.as_ref()).map_err(|e| { + ValidationError::Config(format!("invalid transport headers policy yaml: {e}")) + })?; + self.transport_headers_policy = Some(policy); + Ok(self) + } + + /// Serialize the transport headers policy as a YAML string for template + /// injection. Returns an empty string when no policy is configured. + pub(crate) fn transport_headers_policy_yaml(&self) -> Result<String, ValidationError> { + match &self.transport_headers_policy { + Some(policy) => serde_yaml::to_string(policy).map_err(|e| { + ValidationError::Config(format!( + "failed to serialize transport headers policy: {e}" + )) + }), + None => Ok(String::new()), + } + } + /// Rewrite a config value in the pipeline YAML at the given /// dot-separated key path under `nodes..config`. pub(crate) fn set_node_config_value( @@ -315,6 +355,7 @@ fn set_config_by_path( #[cfg(test)] mod tests { use super::*; + use otap_df_config::transport_headers_policy::*; fn sample_yaml() -> &'static str { r#" @@ -525,6 +566,130 @@ nodes: assert!(matches!(err, ValidationError::Config(_))); } + #[test] + fn with_transport_headers_policy_stores_struct() { + let policy = TransportHeadersPolicy { + header_capture: HeaderCapturePolicy::new( + CaptureDefaults::default(), + vec![CaptureRule { + match_names: vec!["x-tenant-id".into()], + store_as: None, + sensitive: false, + value_kind: None, + }], + ), + header_propagation: HeaderPropagationPolicy::new( + PropagationDefault { + selector: PropagationSelector { + selector_type: PropagationSelectorType::AllCaptured, + named: None, + }, + ..Default::default() + }, + vec![], + ), + }; + let pipeline = Pipeline::from_yaml(sample_yaml()) + .unwrap() + .with_transport_headers_policy(policy.clone()); + assert_eq!(pipeline.transport_headers_policy, Some(policy)); + } + + #[test] + fn 
with_transport_headers_policy_yaml_parses_and_stores() { + let policy_yaml = r#" +header_capture: + headers: + - match_names: ["x-tenant-id"] +header_propagation: + default: + selector: + type: all_captured + action: propagate +"#; + let pipeline = Pipeline::from_yaml(sample_yaml()) + .unwrap() + .with_transport_headers_policy_yaml(policy_yaml) + .expect("valid yaml should parse"); + let policy = pipeline.transport_headers_policy.as_ref().unwrap(); + + // Verify the parsed struct matches what we'd build programmatically. + let expected = TransportHeadersPolicy { + header_capture: HeaderCapturePolicy::new( + CaptureDefaults::default(), + vec![CaptureRule { + match_names: vec!["x-tenant-id".into()], + store_as: None, + sensitive: false, + value_kind: None, + }], + ), + header_propagation: HeaderPropagationPolicy::new( + PropagationDefault { + selector: PropagationSelector { + selector_type: PropagationSelectorType::AllCaptured, + named: None, + }, + ..Default::default() + }, + vec![], + ), + }; + assert_eq!(policy, &expected); + } + + #[test] + fn with_transport_headers_policy_yaml_rejects_invalid() { + let result = Pipeline::from_yaml(sample_yaml()) + .unwrap() + .with_transport_headers_policy_yaml("not: {valid: [policy"); + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(matches!(err, ValidationError::Config(_))); + assert!(err.to_string().contains("invalid transport headers policy")); + } + + #[test] + fn transport_headers_policy_none_by_default() { + let pipeline = Pipeline::from_yaml(sample_yaml()).unwrap(); + assert!(pipeline.transport_headers_policy.is_none()); + } + + #[test] + fn transport_headers_policy_yaml_empty_when_none() { + let pipeline = Pipeline::from_yaml(sample_yaml()).unwrap(); + let yaml = pipeline.transport_headers_policy_yaml().unwrap(); + assert!(yaml.is_empty()); + } + + #[test] + fn transport_headers_policy_yaml_roundtrips() { + let policy = TransportHeadersPolicy { + header_capture: HeaderCapturePolicy::new( + 
CaptureDefaults::default(), + vec![CaptureRule { + match_names: vec!["x-tenant-id".into()], + store_as: None, + sensitive: false, + value_kind: None, + }], + ), + ..Default::default() + }; + let pipeline = Pipeline::from_yaml(sample_yaml()) + .unwrap() + .with_transport_headers_policy(policy); + let yaml = pipeline.transport_headers_policy_yaml().unwrap(); + assert!(!yaml.is_empty()); + // The serialized YAML should parse back to the same policy. + let reserialized_policy: TransportHeadersPolicy = + serde_yaml::from_str(&yaml).expect("roundtrip parse"); + assert_eq!( + pipeline.transport_headers_policy.as_ref().unwrap(), + &reserialized_policy + ); + } + #[test] fn connect_container_stores_connection() { let pipeline = Pipeline::from_yaml(sample_yaml()) diff --git a/rust/otap-dataflow/crates/validation/src/scenario.rs b/rust/otap-dataflow/crates/validation/src/scenario.rs index bee631256e..da7aab4c1d 100644 --- a/rust/otap-dataflow/crates/validation/src/scenario.rs +++ b/rust/otap-dataflow/crates/validation/src/scenario.rs @@ -15,11 +15,11 @@ use minijinja::context; use portpicker::pick_unused_port; use std::cell::RefCell; use std::collections::{HashMap, HashSet}; -use std::fs; -use std::path::PathBuf; use std::time::Duration; -const VALIDATION_TEMPLATE_PATH: &str = "templates/validation_template.yaml.j2"; +const VALIDATION_TEMPLATE: &str = include_str!("../templates/validation_template.yaml.j2"); +const CAPTURE_TEMPLATE: &str = include_str!("../templates/capture_template.yaml.j2"); +const GENERATOR_TEMPLATE: &str = include_str!("../templates/generator_template.yaml.j2"); const DEFAULT_ADMIN_ADDR: &str = "127.0.0.1:8085"; const DEFAULT_READY_MAX_ATTEMPTS: usize = 10; const DEFAULT_READY_BACKOFF: Duration = Duration::from_secs(3); @@ -60,7 +60,6 @@ pub struct Scenario { generators: HashMap, captures: HashMap, containers: HashMap, - template_path: PathBuf, admin_addr: String, ready_max_attempts: usize, ready_backoff: Duration, @@ -83,7 +82,6 @@ impl Scenario {
generators: HashMap::new(), captures: HashMap::new(), containers: HashMap::new(), - template_path: PathBuf::from(VALIDATION_TEMPLATE_PATH), admin_addr: DEFAULT_ADMIN_ADDR.to_string(), ready_max_attempts: DEFAULT_READY_MAX_ATTEMPTS, ready_backoff: DEFAULT_READY_BACKOFF, @@ -197,17 +195,11 @@ impl Scenario { .ok_or_else(|| ValidationError::Config("pipeline missing".into()))?; let pipeline_yaml = pipeline.to_yaml_string()?; let (suv_core_start, suv_core_end) = (pipeline.core_start, pipeline.core_end); + let suv_transport_headers_policy = pipeline.transport_headers_policy_yaml()?; let capture_pipeline = self.render_captures()?; let generator_pipeline = self.render_generators()?; - let template = fs::read_to_string(&self.template_path).map_err(|e| { - ValidationError::Io(format!( - "failed to read {}: {e}", - self.template_path.display() - )) - })?; - render_jinja( - &template, + VALIDATION_TEMPLATE, context! { suv_pipeline => pipeline_yaml, admin_bind_address => &self.admin_addr, @@ -215,6 +207,7 @@ impl Scenario { generator_pipeline => generator_pipeline, suv_core_start => suv_core_start, suv_core_end => suv_core_end, + suv_transport_headers_policy => suv_transport_headers_policy, }, ) } @@ -371,8 +364,6 @@ impl Scenario { /// Render the capture pipelines. fn render_captures(&self) -> Result { - let template = fs::read_to_string("templates/capture_template.yaml.j2") - .map_err(|e| ValidationError::Io(format!("failed to read capture template: {e}")))?; let mut captures_rendered: Vec = vec![]; for (label, capture) in self.captures.iter() { @@ -385,7 +376,7 @@ impl Scenario { }; captures_rendered.push(render_jinja( - &template, + CAPTURE_TEMPLATE, context! 
{ suv_receiver_type => &capture.suv_receiver_type, suv_port => capture.suv_port, @@ -396,6 +387,7 @@ impl Scenario { capture_label => label, custom_suv_receiver => &custom_suv_receiver, idle_timeout_secs => capture.idle_timeout, + capture_header_keys => &capture.capture_header_keys, }, )?); } @@ -404,8 +396,6 @@ impl Scenario { /// Render the generator pipelines. fn render_generators(&self) -> Result { - let template = fs::read_to_string("templates/generator_template.yaml.j2") - .map_err(|e| ValidationError::Io(format!("failed to read generator template: {e}")))?; let mut generators_rendered: Vec = vec![]; for (label, generator) in self.generators.iter() { @@ -442,8 +432,16 @@ impl Scenario { .as_ref() .map_or("localhost", |t| t.server_name.as_str()); + // Only pass transport_headers when non-empty; the template checks + // for truthiness so an empty map would be falsy. + let transport_headers = if generator.transport_headers.is_empty() { + None + } else { + Some(&generator.transport_headers) + }; + generators_rendered.push(render_jinja( - &template, + GENERATOR_TEMPLATE, context! 
{ suv_exporter_type => &generator.suv_exporter_type, control_ports => generator.control_ports, @@ -465,6 +463,7 @@ mtls_enabled => mtls_enabled, tls_server_name => tls_server_name, custom_suv_exporter => &custom_suv_exporter, + transport_headers => transport_headers, }, )?); } diff --git a/rust/otap-dataflow/crates/validation/src/traffic.rs b/rust/otap-dataflow/crates/validation/src/traffic.rs index c301b0dc86..cc87cc9c10 100644 --- a/rust/otap-dataflow/crates/validation/src/traffic.rs +++ b/rust/otap-dataflow/crates/validation/src/traffic.rs @@ -12,6 +12,7 @@ use minijinja::context; use otap_df_core_nodes::receivers::fake_data_generator::config::DataSource; use serde::{Deserialize, Serialize}; use serde_yaml; +use std::collections::HashMap; use std::path::PathBuf; const DEFAULT_MAX_SIGNAL_COUNT: usize = 2000; const DEFAULT_MAX_BATCH_SIZE: usize = 100; @@ -223,6 +224,13 @@ pub struct Generator { pub(crate) tls: Option<TlsConfig>, + /// Optional transport headers to attach to each generated pdata message. + /// + /// Keys are header names. Values are optional fixed strings; when a + /// value is `None`, the fake data generator assigns a random value + /// once at startup. + pub(crate) transport_headers: HashMap<String, Option<String>>, + /// Optional connection to a test container. When set, the generator's /// exporter node is rendered from the connection's Jinja2 template instead /// of the built-in OTLP/OTAP exporter, and SUV pipeline wiring is skipped. @@ -252,6 +260,11 @@ pub struct Capture { /// stream settled and performing the final validation check. pub(crate) idle_timeout: u8, + /// Transport header keys the capture pipeline's receiver should extract + /// from inbound gRPC metadata. Each key generates a `match_names` rule + /// in the pipeline's `header_capture` policy. + pub(crate) capture_header_keys: Vec<String>, + /// Optional connection to a test container.
When set, the capture's /// receiver node is rendered from the connection's Jinja2 template instead /// of the built-in OTLP/OTAP receiver, and SUV pipeline wiring is skipped. @@ -351,6 +364,23 @@ impl Generator { self } + /// Configure transport headers to inject into generated traffic. + /// + /// Each key is a header name; the value is an optional fixed string. + /// When the value is `None`, the fake data generator assigns a random + /// value at startup. + #[must_use] + pub fn with_transport_headers<K: Into<String>, V: Into<String>>( + mut self, + headers: impl IntoIterator<Item = (K, Option<V>)>, + ) -> Self { + self.transport_headers = headers + .into_iter() + .map(|(k, v)| (k.into(), v.map(Into::into))) + .collect(); + self + } + /// Connect this generator to a test container using a custom exporter. /// /// The generator's `suv_exporter` node will be rendered from the @@ -385,6 +415,7 @@ impl Default for Generator { log_weight: DEFAULT_LOG_WEIGHT, data_source: DataSource::Static, tls: None, + transport_headers: HashMap::new(), container_connection: None, } } @@ -439,6 +470,21 @@ impl Capture { self.idle_timeout = timeout_secs; self } + /// Configure transport header keys to capture from inbound signals. + /// + /// Each key becomes a `match_names` rule in the capture pipeline's + /// `header_capture` policy, enabling the receiver to extract those + /// headers from gRPC metadata so they can be validated by transport + /// header validation instructions. + #[must_use] + pub fn with_capture_header_keys<S: Into<String>>( + mut self, + keys: impl IntoIterator<Item = S>, + ) -> Self { + self.capture_header_keys = keys.into_iter().map(Into::into).collect(); + self + } + /// Connect this capture to a test container using a custom receiver.
/// /// The capture's `suv_receiver` node will be rendered from the @@ -477,6 +523,7 @@ impl Default for Capture { control_streams: vec![], validate: vec![], idle_timeout: DEFAULT_IDLE_TIMEOUT_SECS, + capture_header_keys: Vec::new(), container_connection: None, } } @@ -501,6 +548,7 @@ mod tests { assert_eq!(g.data_source, DataSource::Static); assert_eq!(g.core_start, 2); assert_eq!(g.core_end, 2); + assert!(g.transport_headers.is_empty()); } #[test] @@ -513,6 +561,7 @@ assert_eq!(c.validate, vec![]); assert_eq!(c.core_start, 1); assert_eq!(c.core_end, 1); + assert!(c.capture_header_keys.is_empty()); } #[test] @@ -555,4 +604,30 @@ let c = Capture::default().otap_grpc("receiver"); assert_eq!(c.suv_receiver_type, MessageType::Otap); } + + #[test] + fn capture_with_header_keys() { + let c = Capture::default().with_capture_header_keys(["x-tenant-id", "x-request-id"]); + assert_eq!( + c.capture_header_keys, + vec!["x-tenant-id".to_string(), "x-request-id".to_string()] + ); + } + + #[test] + fn generator_with_transport_headers() { + let g = Generator::logs().with_transport_headers([ + ("x-tenant-id", Some("acme")), + ("x-request-id", None::<&str>), + ]); + assert_eq!(g.transport_headers.len(), 2); + assert_eq!( + g.transport_headers.get("x-tenant-id"), + Some(&Some("acme".to_string())) + ); + assert_eq!( + g.transport_headers.get("x-request-id"), + Some(&None::<String>) + ); + } } diff --git a/rust/otap-dataflow/crates/validation/src/validation_exporter.rs b/rust/otap-dataflow/crates/validation/src/validation_exporter.rs index 8b227645cf..c6b91a013c 100644 --- a/rust/otap-dataflow/crates/validation/src/validation_exporter.rs +++ b/rust/otap-dataflow/crates/validation/src/validation_exporter.rs @@ -10,6 +10,7 @@ use linkme::distributed_slice; use otap_df_config::NodeId as NodeName; use otap_df_config::error::Error as ConfigError; use otap_df_config::node::NodeUserConfig; +use otap_df_config::transport_headers::TransportHeaders; use
otap_df_engine::ExporterFactory; use otap_df_engine::config::ExporterConfig; use otap_df_engine::context::PipelineContext; @@ -85,6 +86,10 @@ pub struct ValidationExporter { validations: Vec<ValidationInstructions>, control_msgs: Vec<OtlpProtoMessage>, suv_msgs: Vec<(OtlpProtoMessage, Duration)>, + /// Transport headers extracted from each SUV message's pipeline context. + /// Stored separately from signal data since header validation is + /// independent of the OTLP payload. + suv_transport_headers: Vec<Option<TransportHeaders>>, metrics: MetricSet, /// Duration to wait with no incoming messages before declaring the stream /// settled and performing the final validation. @@ -113,10 +118,22 @@ pub static VALIDATION_EXPORTER_FACTORY: ExporterFactory = ExporterFac impl ValidationExporter { /// Run the configured validations and update metrics. + /// + /// The `OtlpProtoMessage` projection is built once here so that + /// multiple [`ValidationInstructions`] can share it without + /// redundant cloning. fn validate_and_record(&mut self) { + let suv_msgs: Vec<OtlpProtoMessage> = + self.suv_msgs.iter().map(|(msg, _)| msg.clone()).collect(); + let mut valid = true; - for validate in &self.validations { - valid &= validate.validate(&self.control_msgs, &self.suv_msgs); + for instruction in &self.validations { + valid &= instruction.validate( + &self.control_msgs, + &suv_msgs, + &self.suv_msgs, + &self.suv_transport_headers, + ); } if valid { @@ -161,6 +178,7 @@ metrics, control_msgs: Vec::new(), suv_msgs: Vec::new(), + suv_transport_headers: Vec::new(), idle_timeout: Duration::from_secs(config.idle_timeout_secs), }) } @@ -205,6 +223,7 @@ impl Exporter for ValidationExporter { let time_elapsed = time.elapsed(); let (context, payload) = pdata.into_parts(); let source_node = context.source_node(); + let transport_headers = context.transport_headers().cloned(); let msg = OtlpProtoBytes::try_from(payload) .ok() .and_then(|bytes| OtlpProtoMessage::try_from(bytes).ok()); @@ -213,15 +232,14 @@ if let
Some(node_index) = source_node { if node_index == self.suv_index { self.suv_msgs.push((msg, time_elapsed)); - self.validate_and_record(); + self.suv_transport_headers.push(transport_headers); time = Instant::now(); } else if self.control_indices.contains(&node_index) { self.control_msgs.push(msg); - self.validate_and_record(); } } else if self.control_indices.is_empty() { self.suv_msgs.push((msg, time_elapsed)); - self.validate_and_record(); + self.suv_transport_headers.push(transport_headers); time = Instant::now(); } else { otel_error!("validation.missing.source"); diff --git a/rust/otap-dataflow/crates/validation/src/validation_types/mod.rs b/rust/otap-dataflow/crates/validation/src/validation_types/mod.rs index ad8955f4e7..99e84e6575 100644 --- a/rust/otap-dataflow/crates/validation/src/validation_types/mod.rs +++ b/rust/otap-dataflow/crates/validation/src/validation_types/mod.rs @@ -6,17 +6,23 @@ pub mod attributes; mod batch; mod signal_dropped; +pub mod transport_headers; use attributes::{ AttributeDomain, KeyValue, validate_deny_keys, validate_no_duplicate_keys, validate_require_key_values, validate_require_keys, }; use batch::{validate_batch_bytes, validate_batch_items}; +use otap_df_config::transport_headers::TransportHeaders; use otap_df_pdata::proto::OtlpProtoMessage; use otap_df_pdata::testing::equiv::validate_equivalent; use serde::{Deserialize, Serialize}; use signal_dropped::validate_signal_drop; use std::time::Duration; +use transport_headers::{ + TransportHeaderKeyValue, validate_transport_header_deny_keys, + validate_transport_header_require_key_values, validate_transport_header_require_keys, +}; /// Supported validation instructions executed by the validation exporter. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] @@ -79,6 +85,21 @@ pub enum ValidationInstructions { }, /// Ensure no duplicate attribute keys within all attribute lists. 
AttributeNoDuplicate, + /// Require specific transport header keys to be present on SUV messages. + TransportHeaderRequireKey { + /// Header keys (stored/logical names) that must be present. + keys: Vec<String>, + }, + /// Require specific transport header key/value pairs on SUV messages. + TransportHeaderRequireKeyValue { + /// Key/value pairs that must be present (values compared as UTF-8 text). + pairs: Vec<TransportHeaderKeyValue>, + }, + /// Forbid specific transport header keys on SUV messages. + TransportHeaderDeny { + /// Header keys (stored/logical names) that must NOT be present. + keys: Vec<String>, + }, } impl ValidationInstructions { /// Evaluate this validation against control and system-under-validation messages. @@ -86,36 +107,45 @@ pub(crate) fn validate( &self, control: &[OtlpProtoMessage], - suv: &[(OtlpProtoMessage, Duration)], + suv_msgs: &[OtlpProtoMessage], + suv_with_duration: &[(OtlpProtoMessage, Duration)], + transport_headers: &[Option<TransportHeaders>], ) -> bool { - let suv_msgs: Vec<OtlpProtoMessage> = suv.iter().map(|(msg, _)| msg.clone()).collect(); - match self { - ValidationInstructions::Equivalence => validate_equivalent(control, &suv_msgs), + ValidationInstructions::Equivalence => validate_equivalent(control, suv_msgs), ValidationInstructions::SignalDrop { min_drop_ratio, max_drop_ratio, - } => validate_signal_drop(control, &suv_msgs, *min_drop_ratio, *max_drop_ratio), + } => validate_signal_drop(control, suv_msgs, *min_drop_ratio, *max_drop_ratio), ValidationInstructions::BatchItems { min_batch_size, max_batch_size, timeout, - } => validate_batch_items(suv, min_batch_size, max_batch_size, timeout), + } => validate_batch_items(suv_with_duration, min_batch_size, max_batch_size, timeout), ValidationInstructions::BatchBytes { min_bytes, max_bytes, timeout, - } => validate_batch_bytes(suv, min_bytes, max_bytes, timeout), + } => validate_batch_bytes(suv_with_duration, min_bytes, max_bytes, timeout), ValidationInstructions::AttributeDeny { domains, keys } => { -
validate_deny_keys(&suv_msgs, domains, keys) + validate_deny_keys(suv_msgs, domains, keys) } ValidationInstructions::AttributeRequireKey { domains, keys } => { - validate_require_keys(&suv_msgs, domains, keys) + validate_require_keys(suv_msgs, domains, keys) } ValidationInstructions::AttributeRequireKeyValue { domains, pairs } => { - validate_require_key_values(&suv_msgs, domains, pairs) + validate_require_key_values(suv_msgs, domains, pairs) + } + ValidationInstructions::AttributeNoDuplicate => validate_no_duplicate_keys(suv_msgs), + ValidationInstructions::TransportHeaderRequireKey { keys } => { + validate_transport_header_require_keys(transport_headers, keys) + } + ValidationInstructions::TransportHeaderRequireKeyValue { pairs } => { + validate_transport_header_require_key_values(transport_headers, pairs) + } + ValidationInstructions::TransportHeaderDeny { keys } => { + validate_transport_header_deny_keys(transport_headers, keys) } - ValidationInstructions::AttributeNoDuplicate => validate_no_duplicate_keys(&suv_msgs), } } } @@ -123,6 +153,7 @@ mod tests { use super::*; use crate::validation_types::attributes::{AnyValue, KeyValue}; + use crate::validation_types::transport_headers::TransportHeaderKeyValue; use otap_df_pdata::proto::opentelemetry::common::v1::{ AnyValue as ProtoAny, KeyValue as ProtoKV, any_value::Value as ProtoVal, }; @@ -151,11 +182,15 @@ .collect() } + fn no_headers(count: usize) -> Vec<Option<TransportHeaders>> { + vec![None; count] + } + #[test] fn equivalence_true_on_matching() { let msgs = vec![logs_with_records(2)]; let suv = with_duration(&msgs); - assert!(ValidationInstructions::Equivalence.validate(&msgs, &suv)); + assert!(ValidationInstructions::Equivalence.validate(&msgs, &msgs, &suv, &no_headers(1))); } #[test] @@ -206,30 +241,37 @@ }], })]; let right_suv = with_duration(&right); - assert!(!ValidationInstructions::Equivalence.validate(&left, &right_suv)); +
assert!(!ValidationInstructions::Equivalence.validate( + &left, + &right, + &right_suv, + &no_headers(1) + )); } #[test] fn batch_respects_bounds() { let msgs = vec![logs_with_records(3)]; let suv = with_duration(&msgs); + let headers = no_headers(1); let instruction = ValidationInstructions::BatchItems { min_batch_size: Some(2), max_batch_size: Some(5), timeout: None, }; - assert!(instruction.validate(&msgs, &suv)); + assert!(instruction.validate(&msgs, &msgs, &suv, &headers)); let failing = ValidationInstructions::BatchItems { min_batch_size: Some(4), max_batch_size: Some(5), timeout: None, }; - assert!(!failing.validate(&msgs, &suv)); + assert!(!failing.validate(&msgs, &msgs, &suv, &headers)); } #[test] fn batch_bytes_respects_bounds() { let msgs = vec![logs_with_records(1)]; let suv = with_duration(&msgs); + let headers = no_headers(1); let mut buf = Vec::new(); // compute encoded size of the latest SUV message let latest = msgs.last().unwrap(); @@ -254,9 +296,9 @@ mod tests { timeout: None, }; - assert!(pass.validate(&msgs, &suv)); - assert!(!fail_small.validate(&msgs, &suv)); - assert!(!fail_large.validate(&msgs, &suv)); + assert!(pass.validate(&msgs, &msgs, &suv, &headers)); + assert!(!fail_small.validate(&msgs, &msgs, &suv, &headers)); + assert!(!fail_large.validate(&msgs, &msgs, &suv, &headers)); } #[test] @@ -280,12 +322,15 @@ mod tests { ..Default::default() }], }; - let suv = vec![(OtlpProtoMessage::Logs(logs), Duration::from_secs(0))]; + let msg = OtlpProtoMessage::Logs(logs); + let suv_msgs = vec![msg.clone()]; + let suv = vec![(msg, Duration::from_secs(0))]; + let headers = no_headers(1); let check = ValidationInstructions::AttributeRequireKeyValue { domains: vec![AttributeDomain::Signal], pairs: vec![KeyValue::new("foo".into(), AnyValue::String("bar".into()))], }; - assert!(check.validate(&[], &suv)); + assert!(check.validate(&[], &suv_msgs, &suv, &headers)); } #[test] fn attribute_deny_blocks_key() { @@ -308,12 +353,15 @@ mod tests { 
                 ..Default::default()
             }],
         };
-        let suv = vec![(OtlpProtoMessage::Logs(logs), Duration::from_secs(0))];
+        let msg = OtlpProtoMessage::Logs(logs);
+        let suv_msgs = vec![msg.clone()];
+        let suv = vec![(msg, Duration::from_secs(0))];
+        let headers = no_headers(1);
         let check = ValidationInstructions::AttributeDeny {
             domains: vec![AttributeDomain::Signal],
             keys: vec!["deny".into()],
         };
-        assert!(!check.validate(&[], &suv));
+        assert!(!check.validate(&[], &suv_msgs, &suv, &headers));
     }

     #[test]
     fn attribute_no_duplicate_detects_duplicates() {
@@ -344,14 +392,19 @@ mod tests {
                 ..Default::default()
             }],
         };
-        let suv = vec![(OtlpProtoMessage::Logs(logs), Duration::from_secs(0))];
+        let msg = OtlpProtoMessage::Logs(logs);
+        let suv_msgs = vec![msg.clone()];
+        let suv = vec![(msg, Duration::from_secs(0))];
+        let headers = no_headers(1);
         let check = ValidationInstructions::AttributeNoDuplicate;
-        assert!(!check.validate(&[], &suv));
+        assert!(!check.validate(&[], &suv_msgs, &suv, &headers));
     }

     #[test]
     fn signal_drop_with_ratio_bounds() {
         let before = vec![logs_with_records(10)];
-        let after = with_duration(&[logs_with_records(4)]);
+        let after_msgs = vec![logs_with_records(4)];
+        let after = with_duration(&after_msgs);
+        let headers = no_headers(1);
         // drop ratio = 0.6
         let pass = ValidationInstructions::SignalDrop {
             min_drop_ratio: Some(0.5),
@@ -365,8 +418,38 @@ mod tests {
             min_drop_ratio: None,
             max_drop_ratio: Some(0.4),
         };
-        assert!(pass.validate(&before, &after));
-        assert!(!fail_min.validate(&before, &after));
-        assert!(!fail_max.validate(&before, &after));
+        assert!(pass.validate(&before, &after_msgs, &after, &headers));
+        assert!(!fail_min.validate(&before, &after_msgs, &after, &headers));
+        assert!(!fail_max.validate(&before, &after_msgs, &after, &headers));
+    }
+
+    #[test]
+    fn transport_header_require_key_serialization_check() {
+        let instruction = ValidationInstructions::TransportHeaderRequireKey {
+            keys: vec!["x-tenant-id".into()],
+        };
+        let yaml = serde_yaml::to_string(&instruction).expect("serialize");
+        let back: ValidationInstructions = serde_yaml::from_str(&yaml).expect("deserialize");
+        assert_eq!(back, instruction);
+    }
+
+    #[test]
+    fn transport_header_require_key_value_serialization_check() {
+        let instruction = ValidationInstructions::TransportHeaderRequireKeyValue {
+            pairs: vec![TransportHeaderKeyValue::new("x-tenant-id", "acme")],
+        };
+        let yaml = serde_yaml::to_string(&instruction).expect("serialize");
+        let back: ValidationInstructions = serde_yaml::from_str(&yaml).expect("deserialize");
+        assert_eq!(back, instruction);
+    }
+
+    #[test]
+    fn transport_header_deny_serialization_check() {
+        let instruction = ValidationInstructions::TransportHeaderDeny {
+            keys: vec!["x-secret".into()],
+        };
+        let yaml = serde_yaml::to_string(&instruction).expect("serialize");
+        let back: ValidationInstructions = serde_yaml::from_str(&yaml).expect("deserialize");
+        assert_eq!(back, instruction);
     }
 }
diff --git a/rust/otap-dataflow/crates/validation/src/validation_types/transport_headers.rs b/rust/otap-dataflow/crates/validation/src/validation_types/transport_headers.rs
new file mode 100644
index 0000000000..9d585b3720
--- /dev/null
+++ b/rust/otap-dataflow/crates/validation/src/validation_types/transport_headers.rs
@@ -0,0 +1,343 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+//! Transport header validation helpers.
+//!
+//! Given SUV messages with optional [`TransportHeaders`], verify that certain
+//! header keys or key/value pairs are present (or absent) on every message.
+//!
+//! For **require** checks (`require_keys`, `require_key_values`), every
+//! message must carry transport headers (`Some`). A single `None` entry
+//! (a signal that arrived without headers) causes immediate failure.
+//!
+//! For **deny** checks, `None` entries are acceptable — a signal without
+//! headers cannot contain a forbidden key.
+
+use otap_df_config::transport_headers::TransportHeaders;
+use serde::{Deserialize, Serialize};
+
+/// A key/value pair for transport header assertions.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct TransportHeaderKeyValue {
+    /// Header key (stored/logical name).
+    pub key: String,
+    /// Expected header value (UTF-8 text).
+    pub value: String,
+}
+
+impl TransportHeaderKeyValue {
+    /// Create a new transport header key/value pair.
+    #[must_use]
+    pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
+        Self {
+            key: key.into(),
+            value: value.into(),
+        }
+    }
+}
+
+/// Validate that **every** SUV message has transport headers and that each
+/// set of headers contains all specified keys.
+///
+/// Returns `false` when:
+/// - `suv` is empty (no messages to validate),
+/// - any entry is `None` (a signal arrived without transport headers), or
+/// - any `Some(TransportHeaders)` is missing a required key.
+///
+/// Returns `true` when `keys` is empty (nothing to check).
+#[must_use]
+pub fn validate_transport_header_require_keys(
+    suv: &[Option<TransportHeaders>],
+    keys: &[String],
+) -> bool {
+    if keys.is_empty() {
+        return true;
+    }
+    if suv.is_empty() {
+        return false;
+    }
+
+    for entry in suv {
+        let headers = match entry {
+            Some(h) => h,
+            None => return false,
+        };
+        for key in keys {
+            // check that key exists
+            if headers.find_by_name(key).next().is_none() {
+                return false;
+            }
+        }
+    }
+
+    true
+}
+
+/// Validate that **every** SUV message has transport headers and that each
+/// set of headers contains all specified key/value pairs.
+///
+/// Values are compared as UTF-8 text (case-sensitive).
+///
+/// Returns `false` when:
+/// - `suv` is empty (no messages to validate),
+/// - any entry is `None` (a signal arrived without transport headers), or
+/// - any `Some(TransportHeaders)` is missing a required key or has a
+///   mismatched value.
+///
+/// Returns `true` when `pairs` is empty (nothing to check).
+#[must_use]
+pub fn validate_transport_header_require_key_values(
+    suv: &[Option<TransportHeaders>],
+    pairs: &[TransportHeaderKeyValue],
+) -> bool {
+    if pairs.is_empty() {
+        return true;
+    }
+    if suv.is_empty() {
+        return false;
+    }
+
+    for entry in suv {
+        let headers = match entry {
+            Some(h) => h,
+            None => return false,
+        };
+        'pairs: for pair in pairs {
+            for header in headers.find_by_name(&pair.key) {
+                match std::str::from_utf8(&header.value) {
+                    Ok(value_str) if value_str == pair.value => continue 'pairs,
+                    _ => continue,
+                }
+            }
+            return false;
+        }
+    }
+
+    true
+}
+
+/// Validate that no SUV message carrying transport headers contains any of
+/// the specified keys.
+///
+/// `None` entries (signals without transport headers) are acceptable — a
+/// signal that never received headers cannot contain a forbidden key.
+#[must_use]
+pub fn validate_transport_header_deny_keys(
+    suv: &[Option<TransportHeaders>],
+    keys: &[String],
+) -> bool {
+    if keys.is_empty() {
+        return true;
+    }
+
+    for headers in suv.iter().flatten() {
+        for key in keys {
+            if headers.find_by_name(key).next().is_some() {
+                return false;
+            }
+        }
+    }
+
+    true
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use otap_df_config::transport_headers::{TransportHeader, TransportHeaders};
+
+    fn make_headers(entries: &[(&str, &str)]) -> TransportHeaders {
+        let mut headers = TransportHeaders::default();
+        for (name, value) in entries {
+            headers.push(TransportHeader::text(*name, *name, value.as_bytes()));
+        }
+        headers
+    }
+
+    #[test]
+    fn require_keys_passes_when_all_present() {
+        let headers = make_headers(&[("x-tenant-id", "acme"), ("x-request-id", "abc")]);
+        let suv = vec![Some(headers)];
+        assert!(validate_transport_header_require_keys(
+            &suv,
+            &["x-tenant-id".into(), "x-request-id".into()],
+        ));
+    }
+
+    #[test]
+    fn require_keys_fails_when_key_missing() {
+        let headers = make_headers(&[("x-tenant-id", "acme")]);
+        let suv = vec![Some(headers)];
+        assert!(!validate_transport_header_require_keys(
+            &suv,
+            &["x-tenant-id".into(), "x-missing".into()],
+        ));
+    }
+
+    #[test]
+    fn require_keys_fails_when_no_messages_have_headers() {
+        let suv: Vec<Option<TransportHeaders>> = vec![None, None];
+        assert!(!validate_transport_header_require_keys(
+            &suv,
+            &["x-tenant-id".into()],
+        ));
+    }
+
+    #[test]
+    fn require_key_values_passes_on_match() {
+        let headers = make_headers(&[("x-tenant-id", "acme")]);
+        let suv = vec![Some(headers)];
+        assert!(validate_transport_header_require_key_values(
+            &suv,
+            &[TransportHeaderKeyValue::new("x-tenant-id", "acme")],
+        ));
+    }
+
+    #[test]
+    fn require_key_values_fails_on_value_mismatch() {
+        let headers = make_headers(&[("x-tenant-id", "acme")]);
+        let suv = vec![Some(headers)];
+        assert!(!validate_transport_header_require_key_values(
+            &suv,
+            &[TransportHeaderKeyValue::new("x-tenant-id", "other")],
+        ));
+    }
+
+    #[test]
+    fn require_key_values_fails_on_missing_key() {
+        let headers = make_headers(&[("x-tenant-id", "acme")]);
+        let suv = vec![Some(headers)];
+        assert!(!validate_transport_header_require_key_values(
+            &suv,
+            &[TransportHeaderKeyValue::new("x-missing", "value")],
+        ));
+    }
+
+    #[test]
+    fn deny_keys_passes_when_key_absent() {
+        let headers = make_headers(&[("x-tenant-id", "acme")]);
+        let suv = vec![Some(headers)];
+        assert!(validate_transport_header_deny_keys(
+            &suv,
+            &["x-secret".into()],
+        ));
+    }
+
+    #[test]
+    fn deny_keys_fails_when_key_present() {
+        let headers = make_headers(&[("x-tenant-id", "acme")]);
+        let suv = vec![Some(headers)];
+        assert!(!validate_transport_header_deny_keys(
+            &suv,
+            &["x-tenant-id".into()],
+        ));
+    }
+
+    #[test]
+    fn deny_keys_passes_on_no_headers() {
+        let suv: Vec<Option<TransportHeaders>> = vec![None];
+        assert!(validate_transport_header_deny_keys(
+            &suv,
+            &["x-tenant-id".into()],
+        ));
+    }
+
+    #[test]
+    fn empty_keys_always_passes() {
+        let suv: Vec<Option<TransportHeaders>> = vec![None];
+        assert!(validate_transport_header_require_keys(&suv, &[]));
+        assert!(validate_transport_header_require_key_values(&suv, &[]));
+        assert!(validate_transport_header_deny_keys(&suv, &[]));
+    }
+
+    #[test]
+    fn multiple_messages_all_must_pass() {
+        let h1 = make_headers(&[("x-tenant-id", "acme"), ("x-request-id", "1")]);
+        let h2 = make_headers(&[("x-tenant-id", "acme")]); // missing x-request-id
+        let suv = vec![Some(h1), Some(h2)];
+        assert!(!validate_transport_header_require_keys(
+            &suv,
+            &["x-tenant-id".into(), "x-request-id".into()],
+        ));
+    }
+
+    #[test]
+    fn require_keys_fails_when_one_message_has_no_headers() {
+        let headers = make_headers(&[("x-tenant-id", "acme")]);
+        let suv = vec![Some(headers), None];
+        assert!(!validate_transport_header_require_keys(
+            &suv,
+            &["x-tenant-id".into()],
+        ));
+    }
+
+    #[test]
+    fn require_key_values_fails_when_one_message_has_no_headers() {
+        let headers = make_headers(&[("x-tenant-id", "acme")]);
+        let suv = vec![Some(headers), None];
+        assert!(!validate_transport_header_require_key_values(
+            &suv,
+            &[TransportHeaderKeyValue::new("x-tenant-id", "acme")],
+        ));
+    }
+
+    #[test]
+    fn require_keys_fails_on_empty_suv() {
+        let suv: Vec<Option<TransportHeaders>> = vec![];
+        assert!(!validate_transport_header_require_keys(
+            &suv,
+            &["x-tenant-id".into()],
+        ));
+    }
+
+    #[test]
+    fn require_key_values_fails_on_empty_suv() {
+        let suv: Vec<Option<TransportHeaders>> = vec![];
+        assert!(!validate_transport_header_require_key_values(
+            &suv,
+            &[TransportHeaderKeyValue::new("x-tenant-id", "acme")],
+        ));
+    }
+
+    #[test]
+    fn deny_keys_passes_when_mixed_some_and_none() {
+        let headers = make_headers(&[("x-tenant-id", "acme")]);
+        let suv = vec![Some(headers), None];
+        assert!(validate_transport_header_deny_keys(
+            &suv,
+            &["x-secret".into()],
+        ));
+    }
+
+    #[test]
+    fn require_key_values_matches_duplicate_key_second_value() {
+        // Two headers with the same key but different values.
+        // The required value matches the second entry.
+        let headers = make_headers(&[("x-env", "staging"), ("x-env", "production")]);
+        let suv = vec![Some(headers)];
+        assert!(validate_transport_header_require_key_values(
+            &suv,
+            &[TransportHeaderKeyValue::new("x-env", "production")],
+        ));
+    }
+
+    #[test]
+    fn require_key_values_matches_duplicate_key_first_value() {
+        let headers = make_headers(&[("x-env", "staging"), ("x-env", "production")]);
+        let suv = vec![Some(headers)];
+        assert!(validate_transport_header_require_key_values(
+            &suv,
+            &[TransportHeaderKeyValue::new("x-env", "staging")],
+        ));
+    }
+
+    #[test]
+    fn require_key_values_fails_when_no_duplicate_matches() {
+        let headers = make_headers(&[("x-env", "staging"), ("x-env", "production")]);
+        let suv = vec![Some(headers)];
+        assert!(!validate_transport_header_require_key_values(
+            &suv,
+            &[TransportHeaderKeyValue::new("x-env", "development")],
+        ));
+    }
+}
diff --git a/rust/otap-dataflow/crates/validation/templates/capture_template.yaml.j2 b/rust/otap-dataflow/crates/validation/templates/capture_template.yaml.j2
index 5ce7c613eb..20e8cc4e35 100644
--- a/rust/otap-dataflow/crates/validation/templates/capture_template.yaml.j2
+++ b/rust/otap-dataflow/crates/validation/templates/capture_template.yaml.j2
@@ -10,6 +10,7 @@
 {% set capture_core_start = capture_core_start | default(1) -%}
 {% set capture_core_end = capture_core_end | default(1) -%}
 {% set capture_label = capture_label | default("validation_exporter") -%}
+{% set capture_header_keys = capture_header_keys | default([]) -%}
 {{ capture_label }}:
   policies:
     resources:
@@ -18,6 +19,14 @@
       set:
         - start: {{ capture_core_start }}
          end: {{ capture_core_end }}
+{% if capture_header_keys | length > 0 %}
+    transport_headers:
+      header_capture:
+        headers:
+{% for key in capture_header_keys %}
+          - match_names: ["{{ key }}"]
+{% endfor %}
+{% endif %}
   nodes:
{% if custom_suv_receiver %}
     suv_receiver:
diff --git a/rust/otap-dataflow/crates/validation/templates/generator_template.yaml.j2 b/rust/otap-dataflow/crates/validation/templates/generator_template.yaml.j2
index 5a57c23244..e33999ab76 100644
--- a/rust/otap-dataflow/crates/validation/templates/generator_template.yaml.j2
+++ b/rust/otap-dataflow/crates/validation/templates/generator_template.yaml.j2
@@ -16,6 +16,7 @@
 {% set generator_core_start = generator_core_start | default(2) -%}
 {% set generator_core_end = generator_core_end | default(2) -%}
 {% set data_source = data_source | default(static) %}
+{% set transport_headers = transport_headers | default(false) -%}

 {{ generator_label }}:
   policies:
@@ -25,6 +26,14 @@
       set:
        - start: {{ generator_core_start }}
          end: {{ generator_core_end }}
+{% if transport_headers %}
+    transport_headers:
+      header_propagation:
+        default:
+          selector:
+            type: all_captured
+          action: propagate
+{% endif %}
   nodes:
     {{ generator_label | indent(4, true)}}:
       type: "urn:otel:receiver:traffic_generator"
@@ -37,6 +46,16 @@
           metric_weight: {{ metric_weight }}
          trace_weight: {{ trace_weight }}
          log_weight: {{ log_weight }}
+{% if transport_headers %}
+          transport_headers:
+{% for key, value in transport_headers | items %}
+{% if value %}
+            {{ key }}: "{{ value }}"
+{% else %}
+            {{ key }}:
+{% endif %}
+{% endfor %}
+{% endif %}
           registry_path: https://github.com/open-telemetry/semantic-conventions.git[model]
     fanout:
       type: "urn:otel:processor:fanout_temp"
diff --git a/rust/otap-dataflow/crates/validation/templates/validation_template.yaml.j2 b/rust/otap-dataflow/crates/validation/templates/validation_template.yaml.j2
index 66b7315e42..56687706e0 100644
--- a/rust/otap-dataflow/crates/validation/templates/validation_template.yaml.j2
+++ b/rust/otap-dataflow/crates/validation/templates/validation_template.yaml.j2
@@ -1,5 +1,6 @@
 # Engine-wide settings
 {% set admin_bind_address = admin_bind_address | default("127.0.0.1:8085") -%}
+{% set suv_transport_headers_policy = suv_transport_headers_policy | default("") -%}
 version: otel_dataflow/v1
 engine:
   http_admin:
@@ -23,6 +24,10 @@ groups:
       set:
         - start: {{ suv_core_start | default(0) }}
          end: {{ suv_core_end | default(0) }}
+{% if suv_transport_headers_policy | length > 0 %}
+    transport_headers:
+{{ suv_transport_headers_policy | indent(12, true) }}
+{% endif %}
 {{ suv_pipeline | indent(8, true)}}
 # ======================================================================
 # Pipeline generating traffic
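
Reviewer note: the require/deny semantics implemented by the three validators in `transport_headers.rs` can be modeled independently of the crate's types. This is a minimal standalone sketch, not the crate's API: it uses plain `(String, String)` vectors in place of `TransportHeaders`, and the helper names `require_keys` and `deny_keys` are illustrative only.

```rust
// Standalone model of the require/deny header-check semantics:
// a message's headers are Option<Vec<(name, value)>>; None means the
// signal arrived without transport headers.
type Headers = Vec<(String, String)>;

/// Require semantics: every message must carry headers (Some) and each
/// required key must be present. Empty `keys` passes; empty `suv` fails.
fn require_keys(suv: &[Option<Headers>], keys: &[&str]) -> bool {
    if keys.is_empty() {
        return true; // nothing to check
    }
    if suv.is_empty() {
        return false; // no messages to validate
    }
    suv.iter().all(|entry| match entry {
        None => false, // a signal without headers fails a require check
        Some(headers) => keys
            .iter()
            .all(|k| headers.iter().any(|(name, _)| name.as_str() == *k)),
    })
}

/// Deny semantics: messages without headers pass; any message whose
/// headers contain a forbidden key fails.
fn deny_keys(suv: &[Option<Headers>], keys: &[&str]) -> bool {
    suv.iter().flatten().all(|headers| {
        keys.iter()
            .all(|k| !headers.iter().any(|(name, _)| name.as_str() == *k))
    })
}

fn main() {
    let with_tenant = Some(vec![("x-tenant-id".to_string(), "acme".to_string())]);
    // require: present key passes
    assert!(require_keys(&[with_tenant.clone()], &["x-tenant-id"]));
    // a single None entry fails a require check
    assert!(!require_keys(&[with_tenant.clone(), None], &["x-tenant-id"]));
    // deny: None entries are fine; only a present forbidden key fails
    assert!(deny_keys(&[with_tenant, None], &["x-secret"]));
}
```

The asymmetry is deliberate and matches the module docs: a message that never received headers cannot satisfy a require check, but it also cannot contain a forbidden key, so it passes a deny check.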