Skip to content

Commit b1a0d30

Browse files
committed
Generate OTel metrics with context limits
This commit allows users to configure an otel metrics payload that has a fixed number of 'contexts' to reuse the term from dogstatsd. This is done by setting a lot of fiddly knobs and the user is responsible for keeping track of how many unique instances are possible. This commit will be chased with some clean-up as I left a lot of internal duplication in the code. I am also unsure if this generates sensible load run against a collector, just that it obeys the protocol by construction. REF SMPTNG-659 Signed-off-by: Brian L. Troutwine <brian.troutwine@datadoghq.com>
1 parent b17ae82 commit b1a0d30

File tree

1 file changed

+188
-50
lines changed

1 file changed

+188
-50
lines changed

lading_payload/src/opentelemetry_metric.rs

Lines changed: 188 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
// Each `Metric` is associated with a `Resource` -- called `ResourceMetrics` in
2121
// the proto -- which defines the origination point of the `Metric`. Interior to
2222
// the `Resource` is the `Scope` -- called `ScopeMetrics` in the proto -- which
23-
// defines the library/smaller-than-resource scope that generated the `Metric`.
23+
// defines the library/smaller-than-resource scope that generated the
24+
// `Metric`. We will not set `Scope` until we find out that it's important to do
25+
// so for modeling purposes.
2426
//
2527
// The data types are as follows:
2628
//
@@ -39,7 +41,11 @@
3941
use std::io::Write;
4042

4143
use crate::{Error, Generator, common::config::ConfRange, common::strings};
42-
use opentelemetry_proto::tonic::metrics::v1;
44+
use opentelemetry_proto::tonic::{
45+
common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value},
46+
metrics::v1,
47+
resource,
48+
};
4349
use prost::Message;
4450
use rand::{
4551
Rng,
@@ -55,19 +61,25 @@ use serde::{Deserialize, Serialize as SerdeSerialize};
5561
/// OpenTelemetry metric contexts
5662
pub struct Contexts {
5763
/// The range of attributes for resources.
58-
pub resources_tags: ConfRange<u32>,
64+
pub attributes_per_resource: ConfRange<u32>,
65+
/// TODO
66+
pub scopes_per_resource: ConfRange<u32>,
67+
/// TODO
68+
pub attributes_per_scope: ConfRange<u32>,
69+
/// The range of attributes for scopes per resource.
70+
pub metrics_per_scope: ConfRange<u32>,
5971
/// The range of attributes for scopes per resource.
60-
pub scopes_tags: ConfRange<u32>,
61-
/// The range of metric names per scope/resource.
62-
pub metric_names: ConfRange<u32>,
72+
pub attributes_per_metric: ConfRange<u32>,
6373
}
6474

6575
impl Default for Contexts {
6676
fn default() -> Self {
6777
Self {
68-
resources_tags: ConfRange::Constant(20),
69-
scopes_tags: ConfRange::Constant(5),
70-
metric_names: ConfRange::Constant(10),
78+
attributes_per_resource: ConfRange::Inclusive { min: 1, max: 20 },
79+
scopes_per_resource: ConfRange::Inclusive { min: 1, max: 20 },
80+
attributes_per_scope: ConfRange::Constant(0),
81+
metrics_per_scope: ConfRange::Inclusive { min: 0, max: 20 },
82+
attributes_per_metric: ConfRange::Inclusive { min: 0, max: 10 },
7183
}
7284
}
7385
}
@@ -115,50 +127,31 @@ impl Config {
115127
}
116128
}
117129

118-
/// A OTLP metric.
119-
#[derive(Debug)]
120-
pub struct Metric(v1::Metric);
121-
122-
struct NumberDataPoint(v1::NumberDataPoint);
123-
struct Gauge(v1::Gauge);
124-
struct Sum(v1::Sum);
125-
126130
// Okay if you think about it OTel Metrics are in a tree. That tree is rooted at
127131
// the Resource, below the resources are Scope which contain Metrics. If we want
128132
// to have a bounded amount of contexts we need some way of counting how many
129133
// we've made where a context is the Resources X Scopes X Metric Names and so
130134
// the Resource is the top generator etc.
131135

132-
/// Wrapper to generate arbitrary OpenTelemetry [`ExportMetricsServiceRequests`](opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest)
133-
struct ExportMetricsServiceRequest(Vec<Metric>);
134-
135-
impl ExportMetricsServiceRequest {
136-
fn into_prost_type(
137-
self,
138-
) -> opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest {
139-
opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest {
140-
resource_metrics: vec![v1::ResourceMetrics {
141-
resource: None,
142-
scope_metrics: vec![v1::ScopeMetrics {
143-
scope: None,
144-
metrics: self.0.into_iter().map(|metric| metric.0).collect(),
145-
schema_url: String::new(),
146-
}],
147-
schema_url: String::new(),
148-
}],
149-
}
150-
}
151-
}
152-
153136
#[derive(Debug, Clone)]
154-
pub(crate) struct ResourceGenerator {}
137+
pub(crate) struct ResourceGenerator {
138+
str_pool: strings::Pool,
139+
attributes_per_resource: ConfRange<u32>,
140+
scopes_per_resource: ConfRange<u32>,
141+
scope_generator: ScopeGenerator,
142+
}
155143

156144
impl ResourceGenerator {
157145
pub(crate) fn new<R>(config: Config, rng: &mut R) -> Result<Self, Error>
158146
where
159147
R: Rng + ?Sized,
160148
{
161-
Ok(Self {})
149+
Ok(Self {
150+
str_pool: strings::Pool::with_size(rng, 1_000_000),
151+
attributes_per_resource: config.contexts.attributes_per_resource,
152+
scopes_per_resource: config.contexts.scopes_per_resource,
153+
scope_generator: ScopeGenerator::new(config, rng)?,
154+
})
162155
}
163156
}
164157

@@ -170,19 +163,77 @@ impl<'a> Generator<'a> for ResourceGenerator {
170163
where
171164
R: rand::Rng + ?Sized,
172165
{
173-
unimplemented!()
166+
let unknown_resource = self.attributes_per_resource.start() == 0;
167+
// If the range of resources admits 0 the field `unknown_resources` will
168+
// be set and we give half-odds on there be no `resource` field set in
169+
// the message.
170+
let resource: Option<resource::v1::Resource> = if !unknown_resource && rng.random_bool(0.5)
171+
{
172+
// We do not make a k/v pair where the v is unset.
173+
let total_kv_pairs = self.attributes_per_resource.sample(rng);
174+
let mut kvs = Vec::with_capacity(total_kv_pairs as usize);
175+
for _ in 0..total_kv_pairs {
176+
let key = self
177+
.str_pool
178+
.of_size_range(&mut rng, 1_u8..16)
179+
.ok_or(Error::StringGenerate)?;
180+
let val = self
181+
.str_pool
182+
.of_size_range(&mut rng, 1_u8..16)
183+
.ok_or(Error::StringGenerate)?;
184+
185+
kvs.push(KeyValue {
186+
key: String::from(key),
187+
value: Some(AnyValue {
188+
value: Some(any_value::Value::StringValue(String::from(val))),
189+
}),
190+
});
191+
}
192+
let res = resource::v1::Resource {
193+
attributes: kvs,
194+
dropped_attributes_count: 0,
195+
};
196+
Some(res)
197+
} else {
198+
None
199+
};
200+
201+
let total_scopes = self.scopes_per_resource.sample(rng);
202+
let mut scope_metrics = Vec::with_capacity(total_scopes as usize);
203+
for _ in 0..total_scopes {
204+
scope_metrics.push(self.scope_generator.generate(rng)?);
205+
}
206+
207+
Ok(v1::ResourceMetrics {
208+
resource,
209+
scope_metrics,
210+
schema_url: String::new(),
211+
})
174212
}
175213
}
176214

215+
// NOTE the scope generator today is a pass-through, existing really only to
216+
// contain the metric generator. Left to leave the machinery in place for if we
217+
// want to start generating scopes.
177218
#[derive(Debug, Clone)]
178-
pub(crate) struct ScopeGenerator {}
219+
pub(crate) struct ScopeGenerator {
220+
str_pool: strings::Pool,
221+
metrics_per_scope: ConfRange<u32>,
222+
attributes_per_scope: ConfRange<u32>,
223+
metric_generator: MetricGenerator,
224+
}
179225

180226
impl ScopeGenerator {
181227
pub(crate) fn new<R>(config: Config, rng: &mut R) -> Result<Self, Error>
182228
where
183229
R: Rng + ?Sized,
184230
{
185-
Ok(Self {})
231+
Ok(Self {
232+
str_pool: strings::Pool::with_size(rng, 1_000_000),
233+
attributes_per_scope: config.contexts.attributes_per_scope,
234+
metrics_per_scope: config.contexts.metrics_per_scope,
235+
metric_generator: MetricGenerator::new(config, rng)?,
236+
})
186237
}
187238
}
188239

@@ -194,14 +245,60 @@ impl<'a> Generator<'a> for ScopeGenerator {
194245
where
195246
R: rand::Rng + ?Sized,
196247
{
197-
unimplemented!()
248+
let total_metrics = self.metrics_per_scope.sample(rng);
249+
let mut metrics = Vec::with_capacity(total_metrics as usize);
250+
for _ in 0..total_metrics {
251+
metrics.push(self.metric_generator.generate(rng)?);
252+
}
253+
254+
let instrumentation_scope: InstrumentationScope = {
255+
// We do not make a k/v pair where the v is unset.
256+
let total_kv_pairs = self.attributes_per_scope.sample(rng);
257+
let mut kvs = Vec::with_capacity(total_kv_pairs as usize);
258+
for _ in 0..total_kv_pairs {
259+
let key = self
260+
.str_pool
261+
.of_size_range(&mut rng, 1_u8..16)
262+
.ok_or(Error::StringGenerate)?;
263+
let val = self
264+
.str_pool
265+
.of_size_range(&mut rng, 1_u8..16)
266+
.ok_or(Error::StringGenerate)?;
267+
268+
kvs.push(KeyValue {
269+
key: String::from(key),
270+
value: Some(AnyValue {
271+
value: Some(any_value::Value::StringValue(String::from(val))),
272+
}),
273+
});
274+
}
275+
276+
let name = self
277+
.str_pool
278+
.of_size_range(&mut rng, 1_u8..16)
279+
.ok_or(Error::StringGenerate)?;
280+
281+
InstrumentationScope {
282+
name: String::from(name),
283+
version: String::new(),
284+
attributes: kvs,
285+
dropped_attributes_count: 0,
286+
}
287+
};
288+
289+
Ok(v1::ScopeMetrics {
290+
scope: Some(instrumentation_scope),
291+
metrics,
292+
schema_url: String::new(), // 0-sized alloc
293+
})
198294
}
199295
}
200296

201297
#[derive(Debug, Clone)]
202298
pub(crate) struct MetricGenerator {
203299
str_pool: strings::Pool,
204300
metric_weights: WeightedIndex<u16>,
301+
attributes_per_metric: ConfRange<u32>,
205302
}
206303

207304
impl MetricGenerator {
@@ -216,6 +313,7 @@ impl MetricGenerator {
216313
Ok(Self {
217314
str_pool: strings::Pool::with_size(rng, 1_000_000),
218315
metric_weights: WeightedIndex::new(member_choices)?,
316+
attributes_per_metric: config.contexts.attributes_per_metric,
219317
})
220318
}
221319
}
@@ -244,21 +342,49 @@ impl<'a> Generator<'a> for MetricGenerator {
244342
.str_pool
245343
.of_size_range(&mut rng, 1_u8..16)
246344
.ok_or(Error::StringGenerate)?;
345+
346+
// TODO this is not correct and must be fixed to accord with http://unitsofmeasure.org/ucum.html.
247347
let unit = self
248348
.str_pool
249349
.of_size_range(&mut rng, 1_u8..16)
250350
.ok_or(Error::StringGenerate)?;
251351

352+
// We do not make a k/v pair where the v is unset.
353+
let total_kv_pairs = self.attributes_per_metric.sample(rng);
354+
let mut kvs = Vec::with_capacity(total_kv_pairs as usize);
355+
for _ in 0..total_kv_pairs {
356+
let key = self
357+
.str_pool
358+
.of_size_range(&mut rng, 1_u8..16)
359+
.ok_or(Error::StringGenerate)?;
360+
let val = self
361+
.str_pool
362+
.of_size_range(&mut rng, 1_u8..16)
363+
.ok_or(Error::StringGenerate)?;
364+
365+
kvs.push(KeyValue {
366+
key: String::from(key),
367+
value: Some(AnyValue {
368+
value: Some(any_value::Value::StringValue(String::from(val))),
369+
}),
370+
});
371+
}
372+
252373
Ok(v1::Metric {
253374
name: String::from(name),
254375
description: String::from(description),
255376
unit: String::from(unit),
256377
data,
257-
metadata: Vec::new(),
378+
metadata: kvs,
258379
})
259380
}
260381
}
261382

383+
// Wrappers allowing us to implement Distribution
384+
struct NumberDataPoint(v1::NumberDataPoint);
385+
struct Gauge(v1::Gauge);
386+
struct Sum(v1::Sum);
387+
262388
impl Distribution<NumberDataPoint> for StandardUniform {
263389
fn sample<R>(&self, rng: &mut R) -> NumberDataPoint
264390
where
@@ -271,10 +397,16 @@ impl Distribution<NumberDataPoint> for StandardUniform {
271397
};
272398

273399
NumberDataPoint(v1::NumberDataPoint {
400+
// NOTE absent a reason to set attributes to not-empty, it's unclear
401+
// that we should.
274402
attributes: Vec::new(),
275-
start_time_unix_nano: rng.random(),
403+
start_time_unix_nano: 0, // epoch instant
276404
time_unix_nano: rng.random(),
405+
// Unclear that this needs to be set.
277406
exemplars: Vec::new(),
407+
// Equivalent to DoNotUse, the flag is ignored. If we ever set
408+
// `value` to None this must be set to
409+
// DATA_POINT_FLAGS_NO_RECORDED_VALUE_MASK
278410
flags: 0,
279411
value: Some(value),
280412
})
@@ -362,15 +494,21 @@ impl crate::Serialize for OpentelemetryMetrics {
362494
R: Rng + Sized,
363495
W: Write,
364496
{
365-
// An Export*ServiceRequest message has 5 bytes of fixed values plus
366-
// a varint-encoded message length field. The worst case for the message
367-
// length field is the max message size divided by 0b0111_1111.
497+
// Alright, what we're making here is the ExportMetricServiceRequest. It
498+
// has 5 bytes of fixed values plus a varint-encoded message length
499+
// field to it. The worst case for the message length field is the max
500+
// message size divided by 0b0111_1111.
501+
//
502+
// The user _does not_ set the number of Resoures per request -- we pack
503+
// those in until max_bytes -- but they do set the scopes per request
504+
// etc. All of that is transparent here, handled by the generators
505+
// above.
368506
let bytes_remaining = max_bytes.checked_sub(5 + max_bytes.div_ceil(0b0111_1111));
369507
let Some(mut bytes_remaining) = bytes_remaining else {
370508
return Ok(());
371509
};
372510

373-
let mut acc = Vec::with_capacity(1_024); // arbitrary constant
511+
let mut acc = Vec::with_capacity(128); // arbitrary constant
374512
loop {
375513
let resource: v1::ResourceMetrics = self.generate(&mut rng)?;
376514
let len = resource.encoded_len() + 2;

0 commit comments

Comments
 (0)