Skip to content

Commit 6138689

Browse files
authored
Introduce data_points_transmitted / second in OTel metrics HTTP, GRPC (#1357)
### What does this PR do? This commit implements a new piece of telemetry: data points transmitted per second. This is the equivalent of bytes per second but, when applicable, in terms of the metric (or other) "data points" transmitted per second. This approach should be immediately applicable to DogStatsD. This further opens the door to other block-based metadata. ### Motivation REF SMPTNG-659
1 parent 0fa26e7 commit 6138689

File tree

6 files changed

+98
-8
lines changed

6 files changed

+98
-8
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
- Block cache generation now logs its number of rejected, success blocks once
1414
the cache is created.
1515
- Added configurable metric kind weights in OpenTelemetry metrics payload generator.
16+
- Added new telemetry to OpenTelemetry metrics generation: data points per
17+
second. This opens the door for doing the same for other payloads.
1618

1719
## [0.25.9]
1820
## Added

lading/src/generator/grpc.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
//! `bytes_written`: Total bytes written
99
//! `response_bytes`: Total bytes received
1010
//! `bytes_per_second`: Configured rate to send data
11+
//! `data_points_transmitted`: Total data points transmitted (for OpenTelemetry metrics)
1112
//!
1213
//! Additional metrics may be emitted by this generator's [throttle].
1314
//!
@@ -290,6 +291,9 @@ impl Grpc {
290291
match res {
291292
Ok(res) => {
292293
counter!("bytes_written", &self.metric_labels).increment(block_length as u64);
294+
if let Some(data_points) = blk.metadata.data_points {
295+
counter!("data_points_transmitted", &self.metric_labels).increment(data_points);
296+
}
293297
counter!("request_ok", &self.metric_labels).increment(1);
294298
counter!("response_bytes", &self.metric_labels).increment(res.into_inner() as u64);
295299
}

lading/src/generator/http.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
//! `request_failure`: Failed requests
88
//! `bytes_written`: Total bytes written
99
//! `bytes_per_second`: Configured rate to send data
10+
//! `data_points_transmitted`: Total data points transmitted (for OpenTelemetry metrics)
1011
//!
1112
//! Additional metrics may be emitted by this generator's [throttle].
1213
//!
@@ -233,12 +234,18 @@ impl Http {
233234
let client = client.clone();
234235
let labels = labels.clone();
235236

237+
let data_points = blk.metadata.data_points;
236238
let permit = CONNECTION_SEMAPHORE.get().expect("Connection Semaphore is being initialized or cell is empty").acquire().await.expect("Connection Semaphore has already closed");
237239
tokio::spawn(async move {
238240
counter!("requests_sent", &labels).increment(1);
239241
match client.request(request).await {
240242
Ok(response) => {
241243
counter!("bytes_written", &labels).increment(block_length as u64);
244+
245+
if let Some(dp) = data_points {
246+
counter!("data_points_transmitted", &labels).increment(dp);
247+
}
248+
242249
let status = response.status();
243250
let mut status_labels = labels.clone();
244251
status_labels

lading_payload/src/block.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,15 @@ pub struct Block {
9090
pub total_bytes: NonZeroU32,
9191
/// The bytes of this block.
9292
pub bytes: Bytes,
93+
/// Optional metadata for the block
94+
pub metadata: BlockMetadata,
95+
}
96+
97+
/// Metadata associated with a Block
98+
#[derive(Debug, Clone, Default, Copy)]
99+
pub struct BlockMetadata {
100+
/// Number of data points in this block
101+
pub data_points: Option<u64>,
93102
}
94103

95104
/// Errors for the construction of the block cache
@@ -108,6 +117,7 @@ impl<'a> arbitrary::Arbitrary<'a> for Block {
108117
Ok(Self {
109118
total_bytes: NonZeroU32::new(total_bytes).expect("total_bytes must be non-zero"),
110119
bytes,
120+
metadata: BlockMetadata::default(),
111121
})
112122
}
113123
}
@@ -332,6 +342,7 @@ impl Cache {
332342
let mut pyld = crate::OpentelemetryMetrics::new(*config, &mut rng)?;
333343
let span = span!(Level::INFO, "fixed", payload = "otel-metrics");
334344
let _guard = span.enter();
345+
335346
construct_block_cache_inner(rng, &mut pyld, maximum_block_bytes, total_bytes.get())?
336347
}
337348
};
@@ -585,9 +596,21 @@ where
585596
let min_actual_block_str = Byte::from_u64(min_actual_block_size.into())
586597
.get_appropriate_unit(byte_unit::UnitType::Binary)
587598
.to_string();
588-
info!(
589-
"Filled {filled_sum_str} of requested {capacity_sum_str}. Discovered minimum block size of {min_actual_block_str}, maximum: {max_actual_block_str}. Total success blocks: {success_block_sizes}. Total rejected blocks: {rejected_block_sizes}."
590-
);
599+
600+
let total_data_points: u64 = block_cache
601+
.iter()
602+
.filter_map(|b| b.metadata.data_points)
603+
.sum();
604+
605+
if total_data_points > 0 {
606+
info!(
607+
"Filled {filled_sum_str} of requested {capacity_sum_str}. Discovered minimum block size of {min_actual_block_str}, maximum: {max_actual_block_str}. Total success blocks: {success_block_sizes}. Total rejected blocks: {rejected_block_sizes}. Total data points: {total_data_points}."
608+
);
609+
} else {
610+
info!(
611+
"Filled {filled_sum_str} of requested {capacity_sum_str}. Discovered minimum block size of {min_actual_block_str}, maximum: {max_actual_block_str}. Total success blocks: {success_block_sizes}. Total rejected blocks: {rejected_block_sizes}."
612+
);
613+
}
591614

592615
Ok(block_cache)
593616
}
@@ -626,6 +649,16 @@ where
626649
.expect("failed to get length of bytes"),
627650
)
628651
.ok_or(SpinError::Zero)?;
629-
Ok(Block { total_bytes, bytes })
652+
653+
let mut metadata = BlockMetadata::default();
654+
if let Some(data_points) = serializer.data_points_generated() {
655+
metadata.data_points = Some(data_points);
656+
}
657+
658+
Ok(Block {
659+
total_bytes,
660+
bytes,
661+
metadata,
662+
})
630663
}
631664
}

lading_payload/src/lib.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,18 @@ pub trait Serialize {
104104
where
105105
R: Rng + Sized,
106106
W: Write;
107+
108+
/// Reports data points count for the most recently generated content.
109+
///
110+
/// IMPORTANT: This method should be called immediately after `to_bytes` to
111+
/// get accurate counts for the most recently generated block. The
112+
/// information WILL be overwritten by subsequent calls to `to_bytes`.
113+
///
114+
/// If this function returns None the serialize does not support tracking
115+
/// data points.
116+
fn data_points_generated(&self) -> Option<u64> {
117+
None
118+
}
107119
}
108120

109121
/// Sub-configuration for `TraceAgent` format
@@ -201,6 +213,14 @@ impl Serialize for Payload {
201213
Payload::TraceAgent(ser) => ser.to_bytes(rng, max_bytes, writer),
202214
}
203215
}
216+
217+
fn data_points_generated(&self) -> Option<u64> {
218+
match self {
219+
Payload::OtelMetrics(ser) => ser.data_points_generated(),
220+
// Other implementations use the default None
221+
_ => None,
222+
}
223+
}
204224
}
205225

206226
/// Generate instances of `Self::Output` from source of randomness.

lading_payload/src/opentelemetry_metric.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,16 +260,19 @@ impl Config {
260260
#[derive(Debug, Clone)]
261261
/// OTLP metric payload
262262
pub struct OpentelemetryMetrics {
263-
/// Template pool for metric generation
264263
pool: Pool,
265-
/// Scratch buffer for serialization
266264
scratch: RefCell<BytesMut>,
267265
/// Current tick count for monotonic timing (starts at 0)
268266
tick: u64,
269267
/// Accumulating sum increment, floating point
270268
incr_f: f64,
271269
/// Accumulating sum increment, integer
272270
incr_i: i64,
271+
/// Number of data points in the most recent `ResourceMetrics` (set by
272+
/// `generate`).
273+
data_points_per_resource: u64,
274+
/// Number of data points in the most recent block (set by `to_bytes`).
275+
data_points_per_block: u64,
273276
}
274277

275278
impl OpentelemetryMetrics {
@@ -293,6 +296,8 @@ impl OpentelemetryMetrics {
293296
tick: 0,
294297
incr_f: 0.0,
295298
incr_i: 0,
299+
data_points_per_resource: 0,
300+
data_points_per_block: 0,
296301
})
297302
}
298303
}
@@ -303,8 +308,10 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics {
303308

304309
/// Generate OTLP metrics with the following enhancements:
305310
///
306-
/// * Monotonic sums are truly monotonic, incrementing by a random amount each tick
307-
/// * Timestamps advance monotonically based on internal tick counter starting at epoch
311+
/// * Monotonic sums are truly monotonic, incrementing by a random amount
312+
/// each tick
313+
/// * Timestamps advance monotonically based on internal tick counter
314+
/// starting at epoch
308315
/// * Each call advances the tick counter by a random amount (1-60)
309316
fn generate<R>(&'a mut self, rng: &mut R, budget: &mut usize) -> Result<Self::Output, Error>
310317
where
@@ -326,11 +333,14 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics {
326333
// Update data points in each metric. For gauges we use random values,
327334
// for accumulating sums we increment by a fixed amount per tick.
328335
// All timestamps are updated based on the current tick.
336+
let mut data_points_count = 0;
337+
329338
for scope_metrics in &mut tpl.scope_metrics {
330339
for metric in &mut scope_metrics.metrics {
331340
if let Some(data) = &mut metric.data {
332341
match data {
333342
Data::Gauge(gauge) => {
343+
data_points_count += gauge.data_points.len() as u64;
334344
for point in &mut gauge.data_points {
335345
point.time_unix_nano = self.tick * TIME_INCREMENT_NANOS;
336346
if let Some(value) = &mut point.value {
@@ -346,6 +356,7 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics {
346356
}
347357
}
348358
Data::Sum(sum) => {
359+
data_points_count += sum.data_points.len() as u64;
349360
let is_accumulating = sum.is_monotonic;
350361
for point in &mut sum.data_points {
351362
point.time_unix_nano = self.tick * TIME_INCREMENT_NANOS;
@@ -385,6 +396,8 @@ impl<'a> SizedGenerator<'a> for OpentelemetryMetrics {
385396
}
386397
}
387398

399+
self.data_points_per_resource = data_points_count;
400+
388401
Ok(tpl)
389402
}
390403
}
@@ -403,10 +416,14 @@ impl crate::Serialize for OpentelemetryMetrics {
403416
resource_metrics: Vec::with_capacity(8),
404417
};
405418

419+
let mut total_data_points = 0;
406420
let loop_id: u32 = rng.random();
421+
407422
while bytes_remaining >= SMALLEST_PROTOBUF {
408423
if let Ok(rm) = self.generate(&mut rng, &mut bytes_remaining) {
424+
total_data_points += self.data_points_per_resource;
409425
request.resource_metrics.push(rm);
426+
410427
let required_bytes = request.encoded_len();
411428
debug!(
412429
?loop_id,
@@ -417,6 +434,7 @@ impl crate::Serialize for OpentelemetryMetrics {
417434
"to_bytes inner loop"
418435
);
419436
if required_bytes > max_bytes {
437+
total_data_points -= self.data_points_per_resource;
420438
drop(request.resource_metrics.pop());
421439
break;
422440
}
@@ -443,8 +461,14 @@ impl crate::Serialize for OpentelemetryMetrics {
443461
writer.write_all(&buf)?;
444462
}
445463

464+
self.data_points_per_block = total_data_points;
465+
446466
Ok(())
447467
}
468+
469+
fn data_points_generated(&self) -> Option<u64> {
470+
Some(self.data_points_per_block)
471+
}
448472
}
449473

450474
#[cfg(test)]

0 commit comments

Comments
 (0)