diff --git a/.github/workflows/rust-validation-tests.yml b/.github/workflows/rust-validation-tests.yml index 06e89150af..15e5446c64 100644 --- a/.github/workflows/rust-validation-tests.yml +++ b/.github/workflows/rust-validation-tests.yml @@ -15,6 +15,16 @@ permissions: on: workflow_dispatch: + inputs: + pr_number: + description: "Pull request number to run validation tests against" + required: true + type: number + +# Cancel in-progress runs for the same PR if a new manual run is started +concurrency: + group: ${{ github.workflow }}-${{ inputs.pr_number }} + cancel-in-progress: true env: CARGO_TERM_COLOR: always @@ -23,9 +33,12 @@ jobs: validation_tests: runs-on: ubuntu-latest steps: - - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + - name: Checkout PR merge ref + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: + ref: refs/pull/${{ inputs.pr_number }}/merge submodules: true + fetch-depth: 0 - uses: arduino/setup-protoc@c65c819552d16ad3c9b72d9dfd5ba5237b9c906b # v3.0.0 with: repo-token: ${{ secrets.GITHUB_TOKEN }} diff --git a/rust/otap-dataflow/crates/validation/src/lib.rs b/rust/otap-dataflow/crates/validation/src/lib.rs index 6365148332..39be8fe264 100644 --- a/rust/otap-dataflow/crates/validation/src/lib.rs +++ b/rust/otap-dataflow/crates/validation/src/lib.rs @@ -170,6 +170,854 @@ mod tests { .expect("filter processor validation failed"); } + /// Validates the log sampling processor with a ratio policy (emit 1 out of + /// 10) using 10 signals. With a small input the sampler should still drop + /// approximately 90% of log records. + #[test] + fn validation_log_sampling_ratio_pipeline_1() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/log-sampling-ratio-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(10) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::SignalDrop { + min_drop_ratio: Some(0.80), + max_drop_ratio: Some(0.99), + }, + ValidationInstructions::AttributeNoDuplicate, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("log sampling ratio validation failed for signal_count=10"); + } + + /// Validates the log sampling processor with a ratio policy (emit 1 out of + /// 10) using 100 signals, matching the default batch size boundary. + #[test] + fn validation_log_sampling_ratio_pipeline_2() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/log-sampling-ratio-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(100) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::SignalDrop { + min_drop_ratio: Some(0.80), + max_drop_ratio: Some(0.99), + }, + ValidationInstructions::AttributeNoDuplicate, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("log sampling ratio validation failed for signal_count=100"); + } + + /// Validates the log sampling processor with a ratio policy (emit 1 out of + /// 10) using 500 signals spanning multiple batches. + #[test] + fn validation_log_sampling_ratio_pipeline_3() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/log-sampling-ratio-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::SignalDrop { + min_drop_ratio: Some(0.80), + max_drop_ratio: Some(0.99), + }, + ValidationInstructions::AttributeNoDuplicate, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("log sampling ratio validation failed for signal_count=500"); + } + + /// Tests the transform processor KQL `where ==` operation. Filters logs + /// keeping only ERROR-severity records. Static logs are ~5% ERROR, ~15% + /// WARN, ~80% INFO, so the vast majority should be dropped. + /// Query: `logs | where severity_text == "ERROR"` + #[test] + fn validation_transform_kql_where_eq() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/transform-kql-where-eq-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::SignalDrop { + min_drop_ratio: Some(0.80), + max_drop_ratio: Some(0.99), + }, + ValidationInstructions::AttributeNoDuplicate, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("transform kql where-eq validation failed"); + } + + /// Validates the log sampling processor with a zip policy (max 50 items per + /// 60-second window) using 10 signals. All signals are within the budget + /// so everything should pass through unchanged. + #[test] + fn validation_log_sampling_zip_pipeline_1() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/log-sampling-zip-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(10) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::Equivalence]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("log sampling zip validation failed for signal_count=10"); + } + + /// Validates the log sampling processor with a zip policy (max 50 items per + /// 60-second window) using 100 signals. The budget is exceeded so + /// approximately 50% of signals should be dropped. + #[test] + fn validation_log_sampling_zip_pipeline_2() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/log-sampling-zip-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(100) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::SignalDrop { + min_drop_ratio: Some(0.45), + max_drop_ratio: Some(0.55), + }, + ValidationInstructions::AttributeNoDuplicate, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("log sampling zip validation failed for signal_count=100"); + } + + /// Validates the log sampling processor with a zip policy (max 50 items per + /// 60-second window) using 500 signals spanning multiple batches. The budget + /// is far exceeded so approximately 90% of signals should be dropped. + #[test] + fn validation_log_sampling_zip_pipeline_3() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/log-sampling-zip-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::SignalDrop { + min_drop_ratio: Some(0.85), + max_drop_ratio: Some(0.95), + }, + ValidationInstructions::AttributeNoDuplicate, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("log sampling zip validation failed for signal_count=500"); + } + + /// Validates that the log sampling processor (ratio policy) passes non-log + /// signals through unchanged. Sends trace signals through a ratio sampler + /// and asserts semantic equivalence -- the sampler should not alter traces + /// at all. + #[test] + fn validation_log_sampling_ratio_passthrough_traces() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/log-sampling-ratio-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::traces() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::Equivalence]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("log sampling ratio passthrough traces validation failed"); + } + + /// Validates that the log sampling processor (ratio policy) passes metric + /// signals through unchanged. The ratio sampler only operates on logs; + /// metrics should be forwarded with no modifications. + #[test] + fn validation_log_sampling_ratio_passthrough_metrics() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/log-sampling-ratio-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::metrics() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::Equivalence]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("log sampling ratio passthrough metrics validation failed"); + } + + /// Validates the log sampling processor with a full passthrough ratio policy + /// (emit 1 out of 1 = 100% sampling). All log signals should pass through + /// unchanged. + #[test] + fn validation_log_sampling_ratio_full_passthrough() { + Scenario::new() + .pipeline( + Pipeline::from_file( + "./validation_pipelines/log-sampling-full-passthrough-processor.yaml", + ) + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::Equivalence]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("log sampling ratio full passthrough validation failed"); + } + + /// Tests the transform processor KQL `where !=` operation. Filters logs + /// dropping INFO-severity records and keeping WARN + ERROR. Static logs + /// are ~80% INFO so roughly 80% should be dropped. + /// Query: `logs | where severity_text != "INFO"` + #[test] + fn validation_transform_kql_where_neq() { + Scenario::new() + .pipeline( + Pipeline::from_file( + "./validation_pipelines/transform-kql-where-neq-processor.yaml", + ) + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::SignalDrop { + min_drop_ratio: Some(0.70), + max_drop_ratio: Some(0.90), + }, + ValidationInstructions::AttributeNoDuplicate, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("transform kql where-neq validation failed"); + } + + /// Tests the transform processor OPL `set` operation on a signal attribute. + /// Adds a new `processed_by` attribute to every log record. + /// Query: `logs | set attributes["processed_by"] = "transform"` + #[test] + fn validation_transform_opl_set_attribute() { + Scenario::new() + .pipeline( + Pipeline::from_file( + "./validation_pipelines/transform-opl-set-attribute-processor.yaml", + ) + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::AttributeRequireKey { + domains: vec![AttributeDomain::Signal], + keys: vec!["processed_by".into()], + }]) + .core_range(2, 2), + ) + .run() + .expect("transform opl set-attribute validation failed"); + } + + /// Tests the transform processor KQL signal scoping passthrough. The query + /// is scoped to `logs |`, so trace signals should be forwarded unchanged. + /// Query: `logs | where severity_text == "ERROR"` (KQL, but traces sent) + #[test] + fn validation_transform_kql_passthrough_traces() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/transform-kql-where-eq-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::traces() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::Equivalence]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("transform kql passthrough traces validation failed"); + } + + /// Tests the transform processor OPL `exclude` operation. Removes the + /// `thread.id` attribute from every log record while preserving `thread.name`. + /// Query: `logs | exclude attributes["thread.id"]` + #[test] + fn validation_transform_opl_exclude_attribute() { + Scenario::new() + .pipeline( + Pipeline::from_file( + "./validation_pipelines/transform-opl-exclude-attribute-processor.yaml", + ) + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::AttributeDeny { + domains: vec![AttributeDomain::Signal], + keys: vec!["thread.id".into()], + }, + ValidationInstructions::AttributeRequireKey { + domains: vec![AttributeDomain::Signal], + keys: vec!["thread.name".into()], + }, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("transform opl exclude-attribute validation failed"); + } + + /// Tests the transform processor OPL `rename` operation. Renames the + /// `thread.id` attribute to `new_thread_id`. The old key must be absent + /// and the new key must be present on every log record. + /// Query: `logs | rename attributes["new_thread_id"] = attributes["thread.id"]` + #[test] + fn validation_transform_opl_rename_attribute() { + Scenario::new() + .pipeline( + Pipeline::from_file( + "./validation_pipelines/transform-opl-rename-attribute-processor.yaml", + ) + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::AttributeDeny { + domains: vec![AttributeDomain::Signal], + keys: vec!["thread.id".into()], + }, + ValidationInstructions::AttributeRequireKey { + domains: vec![AttributeDomain::Signal], + keys: vec!["new_thread_id".into()], + }, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("transform opl rename-attribute validation failed"); + } + + /// Tests the transform processor OPL `if/else` conditional with `set`. + /// Sets `is_error` to `"true"` for ERROR logs and `"false"` for all others. + /// Every log record should have the `is_error` attribute regardless of + /// severity. + /// Query: `logs | if (severity_text == "ERROR") { set attributes["is_error"] + /// = "true" } else { set attributes["is_error"] = "false" }` + #[test] + fn validation_transform_opl_conditional_set() { + Scenario::new() + .pipeline( + Pipeline::from_file( + "./validation_pipelines/transform-opl-conditional-set-processor.yaml", + ) + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::AttributeRequireKey { + domains: vec![AttributeDomain::Signal], + keys: vec!["is_error".into()], + }]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("transform opl conditional-set validation failed"); + } + + /// Tests the transform processor with chained OPL `set` + `exclude` + /// operations. First adds a `processed` attribute, then removes + /// `thread.id`. Both transformations must be reflected in the output. + /// Query: `logs | set attributes["processed"] = "yes" | exclude + /// attributes["thread.id"]` + #[test] + fn validation_transform_opl_chained_set_exclude() { + Scenario::new() + .pipeline( + Pipeline::from_file( + "./validation_pipelines/transform-opl-chained-set-exclude-processor.yaml", + ) + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::AttributeRequireKey { + domains: vec![AttributeDomain::Signal], + keys: vec!["processed".into()], + }, + ValidationInstructions::AttributeDeny { + domains: vec![AttributeDomain::Signal], + keys: vec!["thread.id".into()], + }, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("transform opl chained set+exclude validation failed"); + } + + /// Tests the transform processor OPL `set` operation on a resource + /// attribute. Adds `env` = `"test"` to every resource. + /// Query: `logs | set resource.attributes["env"] = "test"` + #[test] + fn validation_transform_opl_set_resource_attribute() { + Scenario::new() + .pipeline( + Pipeline::from_file( + "./validation_pipelines/transform-opl-set-resource-attribute-processor.yaml", + ) + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::AttributeRequireKeyValue { + domains: vec![AttributeDomain::Resource], + pairs: vec![KeyValue::new("env".into(), AnyValue::String("test".into()))], + }]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("transform opl set-resource-attribute validation failed"); + } + + /// Validates that the temporal reaggregation processor passes log signals + /// through unchanged. The processor only operates on metrics; logs should + /// be forwarded with no modifications. + #[test] + fn validation_temporal_reaggregation_passthrough_logs() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/temporal-reaggregation-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::Equivalence]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("temporal reaggregation passthrough logs validation failed"); + } + + /// Validates that the temporal reaggregation processor passes trace signals + /// through unchanged. The processor only operates on metrics; traces should + /// be forwarded with no modifications. + #[test] + fn validation_temporal_reaggregation_passthrough_traces() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/temporal-reaggregation-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::traces() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::Equivalence]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("temporal reaggregation passthrough traces validation failed"); + } + + /// Validates the temporal reaggregation processor with metric signals. + /// All data points share the same stream identity and collapse to a small + /// number of outputs per flush window, yielding at least 95% signal reduction. + #[test] + fn validation_temporal_reaggregation_metrics() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/temporal-reaggregation-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::metrics() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ + ValidationInstructions::SignalDrop { + min_drop_ratio: Some(0.95), + max_drop_ratio: Some(1.0), + }, + ValidationInstructions::AttributeNoDuplicate, + ]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("temporal reaggregation metrics validation failed"); + } + + /// Validates that the temporal reaggregation processor preserves resource + /// attributes when passing log signals through. The static signal generator + /// sets `service.name` to `"load-generator"` on all resources, and this + /// must be present in the output. + #[test] + fn validation_temporal_reaggregation_resource_preservation() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/temporal-reaggregation-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::logs() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::AttributeRequireKeyValue { + domains: vec![AttributeDomain::Resource], + pairs: vec![KeyValue::new( + "service.name".into(), + AnyValue::String("load-generator".into()), + )], + }]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("temporal reaggregation resource preservation validation failed"); + } + + /// Validates that the temporal reaggregation processor preserves data point + /// attributes through the aggregation and flush cycle. Static metrics have + /// `http.method` and `http.route` on every data point, and these must be + /// present in the flushed output after deduplication. + #[test] + fn validation_temporal_reaggregation_metrics_attribute_preservation() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/temporal-reaggregation-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::metrics() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::AttributeRequireKeyValue { + domains: vec![AttributeDomain::Signal], + pairs: vec![ + KeyValue::new("http.method".into(), AnyValue::String("GET".into())), + KeyValue::new("http.route".into(), AnyValue::String("/api".into())), + ], + }]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("temporal reaggregation metrics attribute preservation validation failed"); + } + + /// Validates that the temporal reaggregation processor preserves resource + /// attributes through the aggregation and flush cycle for metric signals. + /// Unlike `resource_preservation` which tests the passthrough path (logs), + /// this test exercises the aggregation path where metrics are buffered, + /// rebuilt by the builder, and flushed. + #[test] + fn validation_temporal_reaggregation_metrics_resource_attributes() { + Scenario::new() + .pipeline( + Pipeline::from_file("./validation_pipelines/temporal-reaggregation-processor.yaml") + .expect("failed to read pipeline yaml"), + ) + .add_generator( + "traffic_gen", + Generator::metrics() + .fixed_count(500) + .otlp_grpc("receiver") + .core_range(1, 1) + .static_signals(), + ) + .add_capture( + "validate", + Capture::default() + .otap_grpc("exporter") + .validate(vec![ValidationInstructions::AttributeRequireKeyValue { + domains: vec![AttributeDomain::Resource], + pairs: vec![KeyValue::new( + "service.name".into(), + AnyValue::String("load-generator".into()), + )], + }]) + .control_streams(["traffic_gen"]) + .core_range(2, 2), + ) + .run() + .expect("temporal reaggregation metrics resource attributes validation failed"); + } + #[test] fn validation_multiple_input_output() { Scenario::new() diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/log-sampling-full-passthrough-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/log-sampling-full-passthrough-processor.yaml new file mode 100644 index 0000000000..33a5ad1716 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/log-sampling-full-passthrough-processor.yaml @@ -0,0 +1,26 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + ratio_sampler: + type: "urn:otel:processor:log_sampling" + config: + policy: + ratio: + emit: 1 + out_of: 1 + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: ratio_sampler + - from: ratio_sampler + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/log-sampling-ratio-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/log-sampling-ratio-processor.yaml new file mode 100644 index 0000000000..eeaf6d0697 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/log-sampling-ratio-processor.yaml @@ -0,0 +1,26 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + ratio_sampler: + type: "urn:otel:processor:log_sampling" + config: + policy: + ratio: + emit: 1 + out_of: 10 + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: ratio_sampler + - from: ratio_sampler + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/log-sampling-zip-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/log-sampling-zip-processor.yaml new file mode 100644 index 0000000000..00a6c09113 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/log-sampling-zip-processor.yaml @@ -0,0 +1,26 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + zip_sampler: + type: "urn:otel:processor:log_sampling" + config: + policy: + zip: + interval: 60s + max_items: 50 + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: zip_sampler + - from: zip_sampler + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/temporal-reaggregation-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/temporal-reaggregation-processor.yaml new file mode 100644 index 0000000000..43a09d42e7 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/temporal-reaggregation-processor.yaml @@ -0,0 +1,23 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + reaggregator: + type: "urn:otel:processor:temporal_reaggregation" + config: + period: 1s + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: reaggregator + - from: reaggregator + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/transform-kql-where-eq-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-kql-where-eq-processor.yaml new file mode 100644 index 0000000000..5f2da407c6 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-kql-where-eq-processor.yaml @@ -0,0 +1,23 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + transform: + type: "urn:otel:processor:transform" + config: + kql_query: 'logs | where severity_text == "ERROR"' + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: transform + - from: transform + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/transform-kql-where-neq-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-kql-where-neq-processor.yaml new file mode 100644 index 0000000000..60881bef50 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-kql-where-neq-processor.yaml @@ -0,0 +1,23 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + transform: + type: "urn:otel:processor:transform" + config: + kql_query: 'logs | where severity_text != "INFO"' + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: transform + - from: transform + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-chained-set-exclude-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-chained-set-exclude-processor.yaml new file mode 100644 index 0000000000..40f7dada10 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-chained-set-exclude-processor.yaml @@ -0,0 +1,23 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + transform: + type: "urn:otel:processor:transform" + config: + opl_query: 'logs | set attributes["processed"] = "yes" | exclude attributes["thread.id"]' + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: transform + - from: transform + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-conditional-set-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-conditional-set-processor.yaml new file mode 100644 index 0000000000..f1fb8fedb7 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-conditional-set-processor.yaml @@ -0,0 +1,23 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + transform: + type: "urn:otel:processor:transform" + config: + opl_query: 'logs | if (severity_text == "ERROR") { set attributes["is_error"] = "true" } else { set attributes["is_error"] = "false" }' + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: transform + - from: transform + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-exclude-attribute-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-exclude-attribute-processor.yaml new file mode 100644 index 0000000000..4efa055f1f --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-exclude-attribute-processor.yaml @@ -0,0 +1,23 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + transform: + type: "urn:otel:processor:transform" + config: + opl_query: 'logs | exclude attributes["thread.id"]' + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: transform + - from: transform + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-rename-attribute-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-rename-attribute-processor.yaml new file mode 100644 index 0000000000..65cbc3adf1 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-rename-attribute-processor.yaml @@ -0,0 +1,23 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + transform: + type: "urn:otel:processor:transform" + config: + opl_query: 'logs | rename attributes["new_thread_id"] = attributes["thread.id"]' + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: transform + - from: transform + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-set-attribute-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-set-attribute-processor.yaml new file mode 100644 index 0000000000..e74f9b1800 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-set-attribute-processor.yaml @@ -0,0 +1,23 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + transform: + type: "urn:otel:processor:transform" + config: + opl_query: 'logs | set attributes["processed_by"] = "transform"' + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: transform + - from: transform + to: exporter diff --git a/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-set-resource-attribute-processor.yaml b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-set-resource-attribute-processor.yaml new file mode 100644 index 0000000000..5dbeedcbb7 --- /dev/null +++ b/rust/otap-dataflow/crates/validation/validation_pipelines/transform-opl-set-resource-attribute-processor.yaml @@ -0,0 +1,23 @@ +nodes: + receiver: + type: "urn:otel:receiver:otlp" + config: + protocols: + grpc: + listening_addr: 127.0.0.1:4317 + transform: + type: "urn:otel:processor:transform" + config: + opl_query: 'logs | set resource.attributes["env"] = "test"' + exporter: + type: "urn:otel:exporter:otap" + config: + grpc_endpoint: http://127.0.0.1:4318 + compression_method: none + arrow: + payload_compression: none +connections: + - from: receiver + to: transform + - from: transform + to: exporter