128 changes: 126 additions & 2 deletions rust/otap-dataflow/crates/validation/README.md
@@ -142,6 +142,14 @@ e.g. `receiver`, `exporter`.
  - each `${KEY}` in the YAML is replaced with the corresponding value
  - returns an error if any `${...}` placeholders remain unresolved
  - useful for injecting TLS cert/key paths at test time (see [TLS / mTLS](#tls--mtls))
- `with_transport_headers_policy(TransportHeadersPolicy)` - set a transport
  headers policy on the SUV pipeline from a typed struct
  - controls header capture at receivers and header propagation at exporters
  - optional; see [Transport Headers](#transport-headers) for details
- `with_transport_headers_policy_yaml(yaml_str)` - set a transport headers
  policy from a YAML string
  - returns `Result<Self, ValidationError>` (YAML parsing can fail)
  - alternative to the typed struct method
- `connect_container(PipelineContainerConnection)` - declare a connection between
  a node in the SUV pipeline and a test container
  - the framework rewrites the specified config key with the container's allocated
@@ -181,6 +189,12 @@ e.g. `receiver`, `exporter`.
- `with_tls(TlsConfig)` - enable TLS on the generator's exporter
  - optional; see [TLS / mTLS](#tls--mtls) for details
  - uses the built-in TLS support
- `with_transport_headers(headers)` - configure transport headers to inject
  into generated traffic
  - each key is a header name; the value is an optional fixed string
  - when the value is `None`, the fake data generator assigns a random value
    at startup
  - only meaningful when the pipeline uses OTLP receivers/exporters
- `to_container(ContainerConnection)` - use a custom exporter that sends to a
  test container instead of directly to the SUV pipeline receiver
  - mutually exclusive with `otlp_grpc()` / `otap_grpc()`
@@ -376,6 +390,11 @@ should receive control signals from
  waits without receiving any messages before declaring the data stream
  settled and performing the final validation
  - default: 3 seconds
- `with_capture_header_keys(keys)` - configure transport header keys to capture
  from inbound signals
  - each key becomes a `match_names` rule in the capture pipeline's
    `header_capture` policy
  - required when using transport header validation instructions
- `from_container(ContainerConnection)` - use a custom receiver that reads from
  a test container instead of directly from the SUV pipeline exporter
  - mutually exclusive with `otlp_grpc()` / `otap_grpc()`
@@ -417,8 +436,22 @@ count per message; `min/max` optional; `timeout` optional
  - `domains` accepts `AttributeDomain::Resource`, `Scope`, or `Signal`
  - `pairs` accepts `Vec<KeyValue>`
- `AttributeNoDuplicate`: check that there are no duplicate attributes

(see `validation_types::attributes` and `validation_types`)
- `TransportHeaderRequireKey { keys }`: specified transport header keys must be
  present on every SUV message. Fails if any message is missing transport
  headers entirely.
- `TransportHeaderRequireKeyValue { pairs }`: specified transport header
  key/value pairs must be present on every SUV message. Values are compared as
  UTF-8 text (case-sensitive). When duplicate header keys exist, the check
  passes if any entry with that key matches the expected value.
  - `pairs` accepts `Vec<TransportHeaderKeyValue>`
- `TransportHeaderDeny { keys }`: specified transport header keys must NOT
  appear on any SUV message. Messages without transport headers are acceptable
  for this check.

> See [Transport Headers](#transport-headers) for setup and a full example.

(see `validation_types::attributes`, `validation_types::transport_headers`,
and `validation_types`)

> NOTE: Some ValidationInstructions require control signals. Use
`control_streams` on the Capture to declare which generator(s) should
@@ -446,6 +479,97 @@ it should receive control signals from
to receive from two generators
- each generator label must match a label passed to `add_generator`

### Transport Headers

Validate that transport headers survive the full pipeline chain from generator
through the system-under-validation to the capture pipeline.

> **NOTE:** Only OTLP (`otlp_grpc`) receivers and exporters currently support
> transport header capture and propagation. OTAP receivers/exporters do **not**
> support transport headers.

For transport headers to flow through the whole pipeline chain, each stage
must be configured explicitly:

1. **Generator pipeline** -- automatically adds a `header_propagation` policy
   (propagate all headers) when `with_transport_headers` is configured.
2. **SUV pipeline** -- requires a `transport_headers` policy with both
   `header_capture` (to extract headers from inbound gRPC metadata) and
   `header_propagation` (to re-emit headers on the outbound gRPC connection).
   Configure via `Pipeline::with_transport_headers_policy_yaml()` or
   `Pipeline::with_transport_headers_policy()`.
3. **Capture pipeline** -- requires `with_capture_header_keys` to specify which
   headers to extract from inbound gRPC metadata for validation.
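
The three stages above can be pictured with a toy model. This is only a sketch under stated assumptions: `Metadata`, `capture`, and `run_chain` are hypothetical names that do not exist in the crate, and exact (case-sensitive) name matching is assumed rather than taken from the implementation. It illustrates why a header must be named at both the SUV and the capture stage to reach validation.

```rust
/// Hypothetical stand-in for gRPC metadata: a list of (name, value) pairs.
type Metadata = Vec<(String, String)>;

/// Toy capture step: keep only the entries whose name is listed in
/// `match_names`. Exact name matching is an assumption of this sketch.
fn capture(inbound: &[(String, String)], match_names: &[&str]) -> Metadata {
    inbound
        .iter()
        .filter(|(name, _)| match_names.contains(&name.as_str()))
        .cloned()
        .collect()
}

/// Toy chain: the generator propagates everything it was given, the SUV
/// captures by name and re-emits all captured headers, and the capture
/// pipeline extracts the keys it was configured with.
fn run_chain(
    generated: &[(String, String)],
    suv_match_names: &[&str],
    capture_keys: &[&str],
) -> Metadata {
    // Stage 1: the generator propagates all configured headers.
    let sent_by_generator: Metadata = generated.to_vec();
    // Stage 2: the SUV captures matching headers and propagates all of them.
    let sent_by_suv = capture(&sent_by_generator, suv_match_names);
    // Stage 3: the capture pipeline extracts the configured keys.
    capture(&sent_by_suv, capture_keys)
}
```

In this model, a header that the SUV policy does not capture never reaches the capture pipeline, regardless of which keys the capture pipeline asks for.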

#### Transport header validation instructions

| Instruction | Behavior |
| ----------- | -------- |
| `TransportHeaderRequireKey { keys }` | Assert specified header keys exist on every SUV message |
| `TransportHeaderRequireKeyValue { pairs }` | Assert specified key/value pairs exist on every SUV message |
| `TransportHeaderDeny { keys }` | Assert specified header keys do NOT exist on any SUV message |

For `RequireKey` and `RequireKeyValue`, every SUV message must carry transport
headers (`Some`). A single message without headers causes immediate failure.

For `Deny`, messages without transport headers are acceptable -- a signal that
never received headers cannot contain a forbidden key.
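
The pass/fail rules above can be sketched as three small predicates. This is an illustrative model, not the crate's implementation: the `Headers` alias and the function names are hypothetical, and header names are assumed to be compared exactly. `None` stands for a message that carried no transport headers.

```rust
/// Hypothetical model of one captured message's transport headers.
/// `None` means the message carried no transport headers at all.
type Headers = Option<Vec<(String, Vec<u8>)>>;

/// RequireKey: every message must carry headers and contain every key.
fn require_keys(messages: &[Headers], keys: &[&str]) -> bool {
    messages.iter().all(|msg| match msg {
        Some(entries) => keys
            .iter()
            .all(|k| entries.iter().any(|(name, _)| name == k)),
        None => false, // a message without headers fails the check
    })
}

/// RequireKeyValue: values are compared as UTF-8 text, case-sensitively.
/// With duplicate keys, one matching entry is enough.
fn require_key_values(messages: &[Headers], pairs: &[(&str, &str)]) -> bool {
    messages.iter().all(|msg| match msg {
        Some(entries) => pairs.iter().all(|(k, v)| {
            entries.iter().any(|(name, value)| {
                name == k && std::str::from_utf8(value).map_or(false, |text| text == *v)
            })
        }),
        None => false,
    })
}

/// Deny: no message may contain any of the keys. A message without
/// headers cannot contain a forbidden key, so it passes.
fn deny_keys(messages: &[Headers], keys: &[&str]) -> bool {
    messages.iter().all(|msg| match msg {
        Some(entries) => !entries
            .iter()
            .any(|(name, _)| keys.contains(&name.as_str())),
        None => true,
    })
}
```

The asymmetry is the point of the model: the two `require_*` predicates treat `None` as a failure, while `deny_keys` treats it as a pass.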

#### Example: transport headers validation

```rust
use otap_df_validation::ValidationInstructions;
use otap_df_validation::pipeline::Pipeline;
use otap_df_validation::scenario::Scenario;
use otap_df_validation::traffic::{Capture, Generator};
use otap_df_validation::validation_types::transport_headers::TransportHeaderKeyValue;

Scenario::new()
    .pipeline(
        Pipeline::from_file("./validation_pipelines/no-processor.yaml")
            .expect("load pipeline")
            .with_transport_headers_policy_yaml(r#"
header_capture:
  headers:
    - match_names: ["x-tenant-id"]
header_propagation:
  default:
    selector:
      type: all_captured
    action: propagate
"#)
            .expect("parse policy"),
    )
    .add_generator(
        "traffic_gen",
        Generator::logs()
            .fixed_count(500)
            .otlp_grpc("receiver")
            .static_signals()
            .with_transport_headers([("x-tenant-id", Some("test-tenant"))]),
    )
    .add_capture(
        "validate",
        Capture::default()
            .otlp_grpc("exporter")
            .with_capture_header_keys(["x-tenant-id"])
            .validate(vec![
                ValidationInstructions::TransportHeaderRequireKey {
                    keys: vec!["x-tenant-id".into()],
                },
                ValidationInstructions::TransportHeaderRequireKeyValue {
                    pairs: vec![TransportHeaderKeyValue::new("x-tenant-id", "test-tenant")],
                },
                ValidationInstructions::TransportHeaderDeny {
                    keys: vec!["x-should-not-exist".into()],
                },
            ])
            .control_streams(["traffic_gen"]),
    )
    .run()
    .expect("transport headers validation failed");
```

### Test Containers

Run Docker containers alongside your validation scenario using the
63 changes: 63 additions & 0 deletions rust/otap-dataflow/crates/validation/src/lib.rs
@@ -201,6 +201,69 @@ mod tests {
            .run()
            .expect("validation scenario failed");
    }

    /// End-to-end validation: transport headers injected by the fake data
    /// generator survive the full pipeline chain (generator → SUV → capture)
    /// and can be asserted via transport header validation instructions.
    ///
    /// Only OTLP receivers and exporters support transport header
    /// capture/propagation, so every hop in the chain uses OTLP gRPC.
    #[test]
    fn validation_transport_headers() {
        use crate::validation_types::transport_headers::TransportHeaderKeyValue;

        let header_key = "x-tenant-id";
        let header_value = "test-tenant";

        Scenario::new()
            .pipeline(
                Pipeline::from_file("./validation_pipelines/no-processor.yaml")
                    .expect("failed to read in pipeline yaml")
                    .with_transport_headers_policy_yaml(
                        r#"
header_capture:
  headers:
    - match_names: ["x-tenant-id"]
header_propagation:
  default:
    selector:
      type: all_captured
    action: propagate
"#,
                    )
                    .expect("failed to parse transport headers policy"),
            )
            .add_generator(
                "traffic_gen",
                Generator::logs()
                    .fixed_count(500)
                    .otlp_grpc("receiver")
                    .core_range(1, 1)
                    .static_signals()
                    .with_transport_headers([(header_key, Some(header_value))]),
            )
            .add_capture(
                "validate",
                Capture::default()
                    .otlp_grpc("exporter")
                    .with_capture_header_keys([header_key])
                    .validate(vec![
                        ValidationInstructions::TransportHeaderRequireKey {
                            keys: vec![header_key.into()],
                        },
                        ValidationInstructions::TransportHeaderRequireKeyValue {
                            pairs: vec![TransportHeaderKeyValue::new(header_key, header_value)],
                        },
                        ValidationInstructions::TransportHeaderDeny {
                            keys: vec!["x-should-not-exist".into()],
                        },
                    ])
                    .control_streams(["traffic_gen"])
                    .core_range(2, 2),
            )
            .run()
            .expect("transport headers validation failed");
    }
}

#[cfg(test)]