Skip to content

Commit f913bbf

Browse files
authored
[otap-dataflow] Add validation process for verifying otlp requests after encoding and decoding (open-telemetry#1696)
Add the validation process based on [https://github.com/open-telemetry/otel-arrow/blob/main/docs/validation_process.md](https://github.com/open-telemetry/otel-arrow/blob/main/docs/validation_process.md) to verify that otlp requests are equivalent after encoding and decoding.
1 parent eff6c47 commit f913bbf

2 files changed

Lines changed: 146 additions & 0 deletions

File tree

rust/otap-dataflow/crates/otap/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ mod fixtures;
7171
#[cfg(test)]
7272
pub mod testing;
7373

74+
/// validation process to verify that encoding/decoding works properly with otlp request
75+
#[cfg(test)]
76+
pub mod validation;
77+
7478
/// Signal-type router processor (OTAP-based)
7579
pub mod signal_type_router;
7680

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Validation test module to validate the encoding/decoding process for otlp messages
5+
6+
// ToDo: Add support to simulate a pipeline with various processors
7+
// ToDo: Move the validation process to it's own CICD job (outside of the tests)
8+
use otap_df_pdata::otap::{OtapArrowRecords, from_record_messages};
9+
use otap_df_pdata::proto::OtlpProtoMessage;
10+
use otap_df_pdata::testing::round_trip::{otap_to_otlp, otlp_to_otap};
11+
use otap_df_pdata::{Consumer, Producer};
12+
use weaver_common::result::WResult;
13+
use weaver_common::vdir::VirtualDirectoryPath;
14+
use weaver_forge::registry::ResolvedRegistry;
15+
use weaver_resolver::SchemaResolver;
16+
use weaver_semconv::registry::SemConvRegistry;
17+
use weaver_semconv::registry_repo::RegistryRepo;
18+
19+
/// struct to simulate the otel arrow protocol, uses a producer and consumer to encode and decode a otlp request
20+
pub struct OtelProtoSimulator {
21+
producer: Producer,
22+
consumer: Consumer,
23+
}
24+
25+
impl OtelProtoSimulator {
26+
/// Takes the Otlp request message and encodes it and decodes it via producer -> consumer
27+
pub fn simulate_proto(&mut self, proto_message: &OtlpProtoMessage) -> OtlpProtoMessage {
28+
// take otlp proto message
29+
// convert to otap arrow records which we can pass to the producer
30+
let mut otap_message = otlp_to_otap(proto_message);
31+
// convert to batch arrow records
32+
// converg batch arrow records
33+
// convert msg to proto bytes?
34+
let mut bar = self.producer.produce_bar(&mut otap_message).unwrap();
35+
let records = self.consumer.consume_bar(&mut bar).unwrap();
36+
let otap_message = match proto_message {
37+
OtlpProtoMessage::Logs(_) => OtapArrowRecords::Logs(from_record_messages(records)),
38+
OtlpProtoMessage::Metrics(_) => {
39+
OtapArrowRecords::Metrics(from_record_messages(records))
40+
}
41+
OtlpProtoMessage::Traces(_) => OtapArrowRecords::Traces(from_record_messages(records)),
42+
};
43+
otap_to_otlp(&otap_message)
44+
}
45+
46+
// ToDo: add function to simulate pipeline
47+
// if pipeline alters the data via a processor that performs some transofmration we should expect the equivalent assert to fail
48+
// otherwise the assert should succeed
49+
// pub fn simulate_pipeline(proto_message: OtlpProtoMessage) -> OtlpProtoMessage {
50+
// // todo: run a pipeline
51+
// }
52+
}
53+
54+
impl Default for OtelProtoSimulator {
55+
fn default() -> Self {
56+
Self {
57+
producer: Producer::new(),
58+
consumer: Consumer::default(),
59+
}
60+
}
61+
}
62+
63+
#[cfg(test)]
64+
mod test {
65+
use super::*;
66+
use crate::fake_data_generator::fake_signal::{
67+
fake_otlp_logs, fake_otlp_metrics, fake_otlp_traces,
68+
};
69+
use otap_df_pdata::testing::equiv::assert_equivalent;
70+
71+
const LOG_SIGNAL_COUNT: usize = 100;
72+
const METRIC_SIGNAL_COUNT: usize = 100;
73+
const TRACE_SIGNAL_COUNT: usize = 100;
74+
const ITERATIONS: usize = 10;
75+
76+
fn get_registry() -> ResolvedRegistry {
77+
let registry_repo = RegistryRepo::try_new(
78+
"main",
79+
&VirtualDirectoryPath::GitRepo {
80+
url: "https://github.com/open-telemetry/semantic-conventions.git".to_owned(),
81+
sub_folder: Some("model".to_owned()),
82+
refspec: None,
83+
},
84+
)
85+
.expect("all registries are definied under the model folder in semantic convention repo");
86+
87+
// Load the semantic convention specs
88+
let semconv_specs = match SchemaResolver::load_semconv_specs(&registry_repo, true, false) {
89+
WResult::Ok(semconv_specs) => semconv_specs,
90+
WResult::OkWithNFEs(semconv_specs, _) => semconv_specs,
91+
WResult::FatalErr(_err) => {
92+
panic!("Failed to load semantic convention specs");
93+
}
94+
};
95+
96+
// Resolve the main registry
97+
let mut registry = SemConvRegistry::from_semconv_specs(&registry_repo, semconv_specs)
98+
.expect("Can resolve the registries defined in semantic convention repo");
99+
// Resolve the semantic convention specifications.
100+
// If there are any resolution errors, they should be captured into the ongoing list of
101+
// diagnostic messages and returned immediately because there is no point in continuing
102+
// as the resolution is a prerequisite for the next stages.
103+
let resolved_schema =
104+
match SchemaResolver::resolve_semantic_convention_registry(&mut registry, true) {
105+
WResult::Ok(resolved_schema) => resolved_schema,
106+
WResult::OkWithNFEs(resolved_schema, _) => resolved_schema,
107+
WResult::FatalErr(_err) => {
108+
panic!("Failed to resolve semantic convetion schema");
109+
}
110+
};
111+
112+
ResolvedRegistry::try_from_resolved_registry(
113+
&resolved_schema.registry,
114+
resolved_schema.catalog(),
115+
)
116+
.expect("can get resolved registry from official semantic convention repo")
117+
}
118+
119+
// validate the encoding and decoding
120+
#[test]
121+
fn validate_encode_decode() {
122+
let mut otel_proto_simulator = OtelProtoSimulator::default();
123+
124+
let registry = get_registry();
125+
126+
for _ in 0..ITERATIONS {
127+
// generate data and simulate the protocol and compare result
128+
let logs = OtlpProtoMessage::Logs(fake_otlp_logs(LOG_SIGNAL_COUNT, &registry));
129+
let logs_output = otel_proto_simulator.simulate_proto(&logs);
130+
assert_equivalent(&[logs], &[logs_output]);
131+
132+
let metrics =
133+
OtlpProtoMessage::Metrics(fake_otlp_metrics(METRIC_SIGNAL_COUNT, &registry));
134+
let metrics_output = otel_proto_simulator.simulate_proto(&metrics);
135+
assert_equivalent(&[metrics], &[metrics_output]);
136+
137+
let traces = OtlpProtoMessage::Traces(fake_otlp_traces(TRACE_SIGNAL_COUNT, &registry));
138+
let traces_output = otel_proto_simulator.simulate_proto(&traces);
139+
assert_equivalent(&[traces], &[traces_output]);
140+
}
141+
}
142+
}

0 commit comments

Comments
 (0)