diff --git a/Cargo.lock b/Cargo.lock index b94cd833d..f32c49f26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1629,6 +1629,26 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "jemalloc-sys" +version = "0.5.4+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c1946e1cea1788cbfde01c993b52a10e2da07f4bac608228d1bed20bfebf2" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0de374a9f8e63150e6f5e8a60cc14c668226d7a347d8aee1a45766e3c4dd3bc" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "jobserver" version = "0.1.32" @@ -1669,6 +1689,7 @@ dependencies = [ "hyper 1.6.0", "hyper-util", "is_executable", + "jemallocator", "lading-capture", "lading-payload", "lading-signal", @@ -3107,9 +3128,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.2" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" [[package]] name = "socket2" diff --git a/Cargo.toml b/Cargo.toml index 368c23216..7a1270506 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ lto = true # Optimize our binary at link stage. codegen-units = 1 # Increases compile time but improves optimization alternatives. opt-level = 3 # Optimize with 'all' optimization flipped on. May produce larger binaries than 's' or 'z'. panic = "abort" +debug = false [profile.dev] panic = "abort" diff --git a/examples/lading-otel-metrics.yaml b/examples/lading-otel-metrics.yaml new file mode 100644 index 000000000..7e8031cdf --- /dev/null +++ b/examples/lading-otel-metrics.yaml @@ -0,0 +1,63 @@ +generator: + - http: + seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, + 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] + headers: + content-type: "application/x-protobuf" + target_uri: "http://127.0.0.1:4318/v1/metrics" + bytes_per_second: "50 MiB" + parallel_connections: 5 + method: + post: + maximum_prebuild_cache_size_bytes: "1 GiB" + variant: + # all numbers arbitraray + opentelemetry_metrics: + metric_weights: + gauge: 50 + sum: 50 + contexts: + total_contexts: + constant: 100 + # host., service. etc + attributes_per_resource: + inclusive: + min: 1 + max: 64 + # auto-instrumentation in client libraries, DB connection etc + scopes_per_resource: + inclusive: + min: 1 + max: 32 + # rare? build info possibly + attributes_per_scope: + inclusive: + min: 0 + max: 4 + # exported instruments / telemetry by libraries and custom code + metrics_per_scope: + inclusive: + min: 1 + max: 128 + # stuff like exit code, user id, cgroup id + attributes_per_metric: + inclusive: + min: 0 + max: 255 + +blackhole: + - http: + binding_addr: "127.0.0.1:9091" + - http: + binding_addr: "127.0.0.1:9092" + - http: + binding_addr: "127.0.0.1:4217" # OTLP HTTP endpoint + body_variant: "nothing" + # - grpc: + # binding_addr: "127.0.0.1:4318" # OTLP gRPC endpoint + +target_metrics: + - prometheus: + uri: "http://127.0.0.1:8888/metrics" # OTel collector metrics endpoint + tags: + component: "otel-collector" diff --git a/lading/Cargo.toml b/lading/Cargo.toml index e8779c469..c21a232cc 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -43,6 +43,7 @@ http-serde = "2.1" hyper = { workspace = true, features = ["client", "http1", "http2", "server"] } hyper-util = { workspace = true, features = ["default", "client", "client-legacy"] } is_executable = "1.0.4" +jemallocator = { version = "0.5" } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } metrics-util = { workspace = true } diff --git a/lading/src/bin/captool.rs b/lading/src/bin/captool.rs index aae4e87a9..d6d4f48e9 100644 --- a/lading/src/bin/captool.rs +++ b/lading/src/bin/captool.rs @@ -7,6 +7,7 @@ use async_compression::tokio::bufread::ZstdDecoder; use average::{Estimate, Max, Min, Variance, concatenate}; use clap::Parser; use futures::io; +use jemallocator::Jemalloc; use lading_capture::json::{Line, MetricKind}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_stream::StreamExt; @@ -14,6 +15,9 @@ use tokio_stream::wrappers::LinesStream; use tracing::{error, info}; use tracing_subscriber::{fmt::format::FmtSpan, util::SubscriberInitExt}; +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] struct Args { diff --git a/lading/src/bin/lading.rs b/lading/src/bin/lading.rs index 111d3055d..6e614d54b 100644 --- a/lading/src/bin/lading.rs +++ b/lading/src/bin/lading.rs @@ -8,6 +8,7 @@ use std::{ }; use clap::{ArgGroup, Parser, Subcommand}; +use jemallocator::Jemalloc; use lading::{ blackhole, captures::CaptureManager, @@ -32,6 +33,9 @@ use tokio::{ use tracing::{Instrument, debug, error, info, info_span, warn}; use tracing_subscriber::{EnvFilter, util::SubscriberInitExt}; +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + #[derive(thiserror::Error, Debug)] enum Error { #[error("Target related error: {0}")] diff --git a/lading/src/bin/payloadtool.rs b/lading/src/bin/payloadtool.rs index 78559e78c..590b95424 100644 --- a/lading/src/bin/payloadtool.rs +++ b/lading/src/bin/payloadtool.rs @@ -4,12 +4,16 @@ use std::{io, num::NonZeroU32}; use byte_unit::{Unit, UnitType}; use clap::Parser; +use jemallocator::Jemalloc; use lading::generator::http::Method; use lading_payload::block; use rand::{SeedableRng, rngs::StdRng}; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, trace, warn}; use tracing_subscriber::{fmt::format::FmtSpan, util::SubscriberInitExt}; +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + const UDP_PACKET_LIMIT_BYTES: byte_unit::Byte = byte_unit::Byte::from_u64_with_unit(65_507, Unit::B).expect("valid bytes"); @@ -51,26 +55,28 @@ fn generate_and_check( block::Cache::Fixed { blocks, .. } => blocks, }; info!("Payload generation took {:?}", start.elapsed()); - debug!("Payload: {:#?}", blocks); + trace!("Payload: {:#?}", blocks); let mut total_generated_bytes: u32 = 0; for block in blocks.iter() { total_generated_bytes += block.total_bytes.get(); } + let total_requested_bytes = + byte_unit::Byte::from_u128(total_bytes.get().into()).expect("total_bytes must be non-zero"); + let total_requested_bytes_str = total_requested_bytes + .get_appropriate_unit(UnitType::Binary) + .to_string(); if total_bytes.get().abs_diff(total_generated_bytes) > 1_000_000 { - let total_requested_bytes = byte_unit::Byte::from_u128(total_bytes.get().into()) - .expect("total_bytes must be non-zero"); - let total_requested_bytes_str = total_requested_bytes - .get_appropriate_unit(UnitType::Binary) - .to_string(); let total_generated_bytes = byte_unit::Byte::from_u128(total_generated_bytes.into()) .expect("total_generated_bytes must be non-zero"); let total_generated_bytes_str = total_generated_bytes .get_appropriate_unit(UnitType::Binary) .to_string(); warn!( - "Generator failed to generate {total_requested_bytes_str}, instead only found {total_generated_bytes_str} of data" + "Generator failed to generate {total_requested_bytes_str}, producing {total_generated_bytes_str} of data" ) + } else { + info!("Generator succeeded in generating {total_requested_bytes_str} of data") } Ok(()) diff --git a/lading/src/bin/simple_target.rs b/lading/src/bin/simple_target.rs index 4dde223b2..daed52ff0 100644 --- a/lading/src/bin/simple_target.rs +++ b/lading/src/bin/simple_target.rs @@ -1,6 +1,9 @@ //! A simple target for lading that runs forever +use jemallocator::Jemalloc; use std::{thread, time}; +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; pub fn main() { loop { diff --git a/lading_payload/benches/opentelemetry_metric.rs b/lading_payload/benches/opentelemetry_metric.rs index 9889564af..6f82b2708 100644 --- a/lading_payload/benches/opentelemetry_metric.rs +++ b/lading_payload/benches/opentelemetry_metric.rs @@ -1,6 +1,10 @@ use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; -use lading_payload::{OpentelemetryMetrics, Serialize, opentelemetry_metric::Config}; +use lading_payload::common::config::ConfRange; +use lading_payload::{ + OpentelemetryMetrics, Serialize, + opentelemetry_metric::{Config, Contexts, MetricWeights}, +}; use rand::{SeedableRng, rngs::SmallRng}; use std::time::Duration; @@ -8,7 +12,17 @@ fn opentelemetry_metric_setup(c: &mut Criterion) { c.bench_function("opentelemetry_metric_setup", |b| { b.iter(|| { let mut rng = SmallRng::seed_from_u64(19690716); - let config = Config::default(); + let config = Config { + metric_weights: MetricWeights { gauge: 50, sum: 50 }, + contexts: Contexts { + total_contexts: ConfRange::Constant(100), + attributes_per_resource: ConfRange::Inclusive { min: 1, max: 64 }, + scopes_per_resource: ConfRange::Inclusive { min: 1, max: 32 }, + attributes_per_scope: ConfRange::Inclusive { min: 0, max: 4 }, + metrics_per_scope: ConfRange::Inclusive { min: 1, max: 128 }, + attributes_per_metric: ConfRange::Inclusive { min: 0, max: 255 }, + }, + }; let _ot = OpentelemetryMetrics::new(config, &mut rng) .expect("failed to create metrics generator"); }) @@ -24,7 +38,17 @@ fn opentelemetry_metric_all(c: &mut Criterion) { group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| { b.iter(|| { let mut rng = SmallRng::seed_from_u64(19690716); - let config = Config::default(); + let config = Config { + metric_weights: MetricWeights { gauge: 50, sum: 50 }, + contexts: Contexts { + total_contexts: ConfRange::Constant(100), + attributes_per_resource: ConfRange::Inclusive { min: 1, max: 64 }, + scopes_per_resource: ConfRange::Inclusive { min: 1, max: 32 }, + attributes_per_scope: ConfRange::Inclusive { min: 0, max: 4 }, + metrics_per_scope: ConfRange::Inclusive { min: 1, max: 128 }, + attributes_per_metric: ConfRange::Inclusive { min: 0, max: 255 }, + }, + }; let ot = OpentelemetryMetrics::new(config, &mut rng) .expect("failed to create metrics generator"); let mut writer = Vec::with_capacity(size); diff --git a/lading_payload/src/block.rs b/lading_payload/src/block.rs index ea1d1f45b..e39950bce 100644 --- a/lading_payload/src/block.rs +++ b/lading_payload/src/block.rs @@ -489,17 +489,18 @@ where S: crate::Serialize, R: Rng + ?Sized, { - let mut block_cache: Vec = Vec::with_capacity(128); - let mut bytes_remaining = total_bytes; let mut min_block_size = 0; let mut max_actual_block_size = 0; let mut rejected_block_sizes = 0; let mut success_block_sizes = 0; + info!( ?max_block_size, ?total_bytes, "Constructing requested block cache" ); + let mut block_cache: Vec = Vec::with_capacity(128); + let mut bytes_remaining = total_bytes; let start = Instant::now(); let mut next_minute = 1; diff --git a/lading_payload/src/common.rs b/lading_payload/src/common.rs index 4712a7a3b..9de073edf 100644 --- a/lading_payload/src/common.rs +++ b/lading_payload/src/common.rs @@ -1,3 +1,5 @@ -pub(crate) mod config; +//! Common utilities for all lading payloads + +pub mod config; pub(crate) mod strings; pub(crate) mod tags; diff --git a/lading_payload/src/common/config.rs b/lading_payload/src/common/config.rs index bf215f432..461ace739 100644 --- a/lading_payload/src/common/config.rs +++ b/lading_payload/src/common/config.rs @@ -1,3 +1,5 @@ +//! Common configuration for all lading payloads + use rand::distr::uniform::SampleUniform; use serde::{Deserialize, Serialize as SerdeSerialize}; use std::cmp; diff --git a/lading_payload/src/lib.rs b/lading_payload/src/lib.rs index a2b92335e..6dfc6898b 100644 --- a/lading_payload/src/lib.rs +++ b/lading_payload/src/lib.rs @@ -46,7 +46,7 @@ pub use trace_agent::TraceAgent; pub mod apache_common; pub mod ascii; -pub(crate) mod common; +pub mod common; pub mod datadog_logs; pub mod dogstatsd; pub mod fluent; @@ -84,6 +84,9 @@ pub enum Error { /// See [`unit::Error`] #[error(transparent)] Unit(#[from] opentelemetry_metric::unit::Error), + /// See [`prost::EncodeError`] + #[error(transparent)] + ProstEncode(#[from] prost::EncodeError), } /// To serialize into bytes diff --git a/lading_payload/src/opentelemetry_metric.rs b/lading_payload/src/opentelemetry_metric.rs index 45aba5814..5d6368e41 100644 --- a/lading_payload/src/opentelemetry_metric.rs +++ b/lading_payload/src/opentelemetry_metric.rs @@ -42,17 +42,20 @@ pub(crate) mod tags; mod templates; pub(crate) mod unit; -use std::io::Write; use std::rc::Rc; +use std::{cell::RefCell, io::Write}; use crate::{Error, Generator, common::config::ConfRange, common::strings}; +use bytes::BytesMut; use opentelemetry_proto::tonic::metrics::v1; use prost::Message; -use rand::{Rng, seq::IndexedRandom}; +use rand::seq::IndexedRandom; use serde::{Deserialize, Serialize as SerdeSerialize}; use templates::ResourceTemplateGenerator; use unit::UnitGenerator; +const SMALLEST_PROTOBUF: usize = 8; // safety buffer, smallest useful protobuf + /// Configure the OpenTelemetry metric payload. #[derive(Debug, Deserialize, SerdeSerialize, Clone, PartialEq, Copy)] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] @@ -93,8 +96,10 @@ impl Default for Contexts { #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[serde(deny_unknown_fields, default)] pub struct MetricWeights { - gauge: u8, - sum: u8, + /// The relative probability of generating a gauge metric. + pub gauge: u8, + /// The relative probability of generating a sum metric. + pub sum: u8, } impl Default for MetricWeights { @@ -114,7 +119,7 @@ pub struct Config { /// Defines the relative probability of each kind of OpenTelemetry metric. pub metric_weights: MetricWeights, /// Define the contexts available when generating metrics - contexts: Contexts, + pub contexts: Contexts, } impl Config { @@ -213,6 +218,7 @@ impl Config { /// OTLP metric payload pub struct OpentelemetryMetrics { pool: Vec, + scratch: RefCell, } impl OpentelemetryMetrics { @@ -225,7 +231,9 @@ impl OpentelemetryMetrics { R: rand::Rng + ?Sized, { let context_cap = config.contexts.total_contexts.sample(rng); - let str_pool = Rc::new(strings::Pool::with_size(rng, 1_000_000)); + // Moby Dick is 1.2Mb. 256Kb should be more than enough for metric + // names, descriptions, etc. + let str_pool = Rc::new(strings::Pool::with_size(rng, 256_000)); let resource_template_generator = ResourceTemplateGenerator::new(&config, &str_pool, rng)?; let mut pool = Vec::with_capacity(context_cap as usize); @@ -234,7 +242,10 @@ impl OpentelemetryMetrics { pool.push(r); } - Ok(Self { pool }) + Ok(Self { + pool, + scratch: RefCell::new(BytesMut::with_capacity(4096)), + }) } } @@ -249,13 +260,12 @@ impl<'a> Generator<'a> for OpentelemetryMetrics { let tpl = self .pool .choose(rng) - .expect("template pool cannot be empty") - .clone(); + .expect("template pool cannot be empty"); let mut scopes = Vec::with_capacity(tpl.scopes.len()); - for s_tpl in tpl.scopes { + for s_tpl in &tpl.scopes { let mut metrics = Vec::with_capacity(s_tpl.metrics.len()); - for m_tpl in s_tpl.metrics { + for m_tpl in &s_tpl.metrics { metrics.push(m_tpl.instantiate(rng)); } scopes.push(v1::ScopeMetrics { @@ -266,7 +276,7 @@ impl<'a> Generator<'a> for OpentelemetryMetrics { } Ok(v1::ResourceMetrics { - resource: tpl.resource, + resource: tpl.resource.clone(), scope_metrics: scopes, schema_url: String::new(), }) @@ -276,47 +286,155 @@ impl<'a> Generator<'a> for OpentelemetryMetrics { impl crate::Serialize for OpentelemetryMetrics { fn to_bytes(&mut self, mut rng: R, max_bytes: usize, writer: &mut W) -> Result<(), Error> where - R: Rng + Sized, + R: rand::Rng + Sized, W: Write, { - // What we're making here is the ExportMetricServiceRequest. It has 5 - // bytes of fixed values plus a varint-encoded message length field to - // it. The worst case for the message length field is the max message - // size divided by 0b0111_1111. - // - // The user _does not_ set the number of Resoures per request -- we pack - // those in until max_bytes -- but they do set the scopes per request - // etc. All of that is transparent here, handled by the generators - // above. + // Scott's idea to generate directly into the protobuf + + let header_overhead = 1 /*tag*/ + max_bytes.div_ceil(0x7F); let mut bytes_remaining = max_bytes - .checked_sub(5 + max_bytes.div_ceil(0b0111_1111)) + .checked_sub(header_overhead) .ok_or(Error::Serialize)?; - let mut acc = Vec::with_capacity(128); // arbitrary constant - - // Generate resources as space allows. We will always generate at least - // one, whether it fits or not. - loop { - let resource: v1::ResourceMetrics = self.generate(&mut rng)?; - let len = resource.encoded_len() + 2; - match bytes_remaining.checked_sub(len) { - Some(remainder) => { - acc.push(resource); - bytes_remaining = remainder; + + let mut resources = Vec::new(); + + 'resource: while bytes_remaining >= SMALLEST_PROTOBUF { + let tpl = self + .pool + .choose(&mut rng) + .expect("template pool cannot be empty"); + + let mut res = opentelemetry_proto::tonic::metrics::v1::ResourceMetrics { + resource: tpl.resource.clone(), + scope_metrics: Vec::new(), + schema_url: String::new(), + }; + + // Fill scopes until we exhaust budget + for s_tpl in &tpl.scopes { + let mut scope = opentelemetry_proto::tonic::metrics::v1::ScopeMetrics { + scope: s_tpl.scope.clone(), + metrics: Vec::new(), + schema_url: String::new(), + }; + + for m_tpl in &s_tpl.metrics { + // Space needed by res + scope so far plus field tag/len + let envelope = res.encoded_len() + scope.encoded_len() + 2; + if envelope + SMALLEST_PROTOBUF > bytes_remaining { + break; // out of room already + } + + let budget = bytes_remaining - envelope; + let metric = m_tpl.fit_into(budget, &mut rng); + + let metric_size = metric.encoded_len() + 2; + if metric_size > budget { + break; // trimmed metric is too large + } + + scope.metrics.push(metric); + bytes_remaining -= metric_size; } - None => break, + + if !scope.metrics.is_empty() { + res.scope_metrics.push(scope); + } + + if bytes_remaining < SMALLEST_PROTOBUF { + break; + } + } + + if res.scope_metrics.is_empty() { + break 'resource; } + + resources.push(res); } - let proto = + let request = opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest { - resource_metrics: acc, + resource_metrics: resources, }; - let buf = proto.encode_to_vec(); - writer.write_all(&buf)?; + + let needed = request.encoded_len(); + { + let mut buf = self.scratch.borrow_mut(); + buf.clear(); // keep the allocation, drop the contents + let capacity = buf.capacity(); + let diff = capacity.saturating_sub(needed); + if buf.capacity() < needed { + buf.reserve(diff); // at most one malloc here + } + request.encode(&mut *buf)?; + writer.write_all(&buf)?; + } + Ok(()) } } +// impl crate::Serialize for OpentelemetryMetrics { +// fn to_bytes(&self, mut rng: R, max_bytes: usize, writer: &mut W) -> Result<(), Error> +// where +// R: Rng + Sized, +// W: Write, +// { +// // What we're making here is the ExportMetricServiceRequest. It has 5 +// // bytes of fixed values plus a varint-encoded message length field to +// // it. The worst case for the message length field is the max message +// // size divided by 0b0111_1111. +// // +// // The user _does not_ set the number of Resoures per request -- we pack +// // those in until max_bytes -- but they do set the scopes per request +// // etc. All of that is transparent here, handled by the generators +// // above. +// let mut bytes_remaining = max_bytes +// .checked_sub(5 + max_bytes.div_ceil(0b0111_1111)) +// .ok_or(Error::Serialize)?; +// // Each ResourceMetrics contains: +// // +// // - Resource with attributes +// // - ScopeMetrics array +// // - Each scope has metrics array +// // +// // Give or take, figure maybe 64 bytes a pop? +// // let estimated_resources = bytes_remaining / 64; +// let mut acc = Vec::with_capacity(estimated_resources.min(128)); + +// // Generate resources as space allows. We will always generate at least +// // one, whether it fits or not. +// loop { +// let resource: v1::ResourceMetrics = self.generate(&mut rng)?; +// let len = resource.encoded_len() + 2; +// match bytes_remaining.checked_sub(len) { +// Some(remainder) => { +// acc.push(resource); +// bytes_remaining = remainder; +// } +// None => break, +// } +// } + +// let proto = +// opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest { +// resource_metrics: acc, +// }; + +// let mut buf = self.scratch.borrow_mut(); +// let needed = proto.encoded_len(); +// let capacity = buf.capacity(); +// buf.clear(); // keep the allocation, drop the contents +// if capacity < needed { +// buf.reserve(needed - capacity); // at most one malloc here +// } +// proto.encode(&mut *buf)?; +// writer.write_all(&buf)?; +// Ok(()) +// } +// } + #[cfg(test)] mod test { use super::{Config, Contexts, OpentelemetryMetrics}; @@ -363,9 +481,9 @@ mod test { }; let mut rng1 = SmallRng::seed_from_u64(seed); - let otel_metrics1 = OpentelemetryMetrics::new(config.clone(), &mut rng1)?; + let otel_metrics1 = OpentelemetryMetrics::new(config, &mut rng1)?; let mut rng2 = SmallRng::seed_from_u64(seed); - let otel_metrics2 = OpentelemetryMetrics::new(config.clone(), &mut rng2)?; + let otel_metrics2 = OpentelemetryMetrics::new(config, &mut rng2)?; for _ in 0..steps { let gen_1 = otel_metrics1.generate(&mut rng1)?; @@ -375,33 +493,6 @@ mod test { } } - // We want to be sure that the serialized size of the payload does not - // exceed `max_bytes`. - proptest! { - #[test] - fn payload_not_exceed_max_bytes(seed: u64, max_bytes in 128u16..u16::MAX) { - let config = Config { - contexts: Contexts { - total_contexts: ConfRange::Constant(1), - attributes_per_resource: ConfRange::Constant(0), - scopes_per_resource: ConfRange::Constant(1), - attributes_per_scope: ConfRange::Constant(0), - metrics_per_scope: ConfRange::Constant(1), - attributes_per_metric: ConfRange::Constant(0), - }, - ..Default::default() - }; - - let max_bytes = max_bytes as usize; - let mut rng = SmallRng::seed_from_u64(seed); - let mut metrics = OpentelemetryMetrics::new(config, &mut rng).expect("failed to create metrics generator"); - - let mut bytes = Vec::with_capacity(max_bytes); - metrics.to_bytes(rng, max_bytes, &mut bytes).expect("failed to convert to bytes"); - prop_assert!(bytes.len() <= max_bytes, "max len: {max_bytes}, actual: {}", bytes.len()); - } - } - // Generation of metrics must be context bounded. If `generate` is called // more than total_context times only total_context contexts should be // produced. @@ -433,7 +524,7 @@ mod test { let mut ids = HashSet::new(); let mut rng = SmallRng::seed_from_u64(seed); - let otel_metrics = OpentelemetryMetrics::new(config.clone(), &mut rng)?; + let otel_metrics = OpentelemetryMetrics::new(config, &mut rng)?; let total_generations = total_contexts_max + (total_contexts_max / 2); for _ in 0..total_generations { @@ -516,7 +607,7 @@ mod test { }; let mut rng = SmallRng::seed_from_u64(seed); - let otel_metrics = OpentelemetryMetrics::new(config.clone(), &mut rng)?; + let otel_metrics = OpentelemetryMetrics::new(config, &mut rng)?; for _ in 0..steps { let metric = otel_metrics.generate(&mut rng)?; @@ -562,7 +653,7 @@ mod test { } } - /// Extracts and hashes the context from a ResourceMetrics. + /// Extracts and hashes the context from a `ResourceMetrics`. /// /// A context is defined by the unique combination of: /// - Resource attributes @@ -658,12 +749,12 @@ mod test { }; let mut rng = SmallRng::seed_from_u64(seed); - let otel_metrics = OpentelemetryMetrics::new(config.clone(), &mut rng)?; + let otel_metrics = OpentelemetryMetrics::new(config, &mut rng)?; // Generate two identical metrics let metric1 = otel_metrics.generate(&mut rng)?; let mut rng = SmallRng::seed_from_u64(seed); - let otel_metrics = OpentelemetryMetrics::new(config.clone(), &mut rng)?; + let otel_metrics = OpentelemetryMetrics::new(config, &mut rng)?; let metric2 = otel_metrics.generate(&mut rng)?; // Ensure that the metrics are equal. diff --git a/lading_payload/src/opentelemetry_metric/attributes.rs b/lading_payload/src/opentelemetry_metric/attributes.rs deleted file mode 100644 index 8d6ec2787..000000000 --- a/lading_payload/src/opentelemetry_metric/attributes.rs +++ /dev/null @@ -1,290 +0,0 @@ -//! Attribute generator for OpenTelemetry metrics payloads -use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; - -use crate::common::{config::ConfRange, strings::Pool}; -use std::rc::Rc; - -#[derive(Debug, Clone)] -pub(crate) struct Generator { - inner: crate::common::tags::Generator, -} - -/// Error type for `TagGenerator` -#[derive(thiserror::Error, Debug)] -pub(crate) enum Error { - /// Invalid construction - #[error("Invalid construction: {0}")] - InvalidConstruction(#[from] crate::common::tags::Error), -} - -impl Generator { - /// Creates a new attribute generator - /// - /// # Errors - /// - If `tags_per_msg` is invalid or exceeds the maximum - /// - If `tag_length` is invalid or has minimum value less than 3 - /// - If `unique_tag_probability` is not between 0.10 and 1.0 - pub(crate) fn new( - seed: u64, - tags_per_msg: ConfRange, - tag_length: ConfRange, - num_tagsets: usize, - str_pool: Rc, - unique_tag_probability: f32, - ) -> Result { - // Adjust tag_length range to account for the colon separator - let adjusted_tag_length = ConfRange::Inclusive { - min: tag_length.start(), - max: tag_length.end().saturating_sub(1), - }; - - let inner = crate::common::tags::Generator::new( - seed, - tags_per_msg, - adjusted_tag_length, - num_tagsets, - str_pool, - unique_tag_probability, - )?; - Ok(Generator { inner }) - } -} - -// https://docs.datadoghq.com/getting_started/tagging/#define-tags -impl<'a> crate::Generator<'a> for Generator { - type Output = Vec; - type Error = crate::Error; - - /// Return a tagset -- a list of tags, each tag having the format `key:value` - /// Note that after `num_tagsets` have been produced, the tagsets will loop and produce - /// identical tagsets. - /// Each tagset is randomly chosen. There is a very high probability that each tagset - /// will be unique, however see the note in the component documentation. - fn generate(&'a self, rng: &mut R) -> Result - where - R: rand::Rng + ?Sized, - { - let mut tag_handles = self.inner.generate(rng)?; - let mut kvs = Vec::with_capacity(tag_handles.len()); - for crate::common::tags::Tag { key, value } in tag_handles.drain(..) { - let key_s = self - .inner - .using_handle(key) - .expect("invalid handle, catastrophic bug"); - let val_s = self - .inner - .using_handle(value) - .expect("invalid handle, catastrophic bug"); - - kvs.push(KeyValue { - key: String::from(key_s), - value: Some(AnyValue { - value: Some(any_value::Value::StringValue(String::from(val_s))), - }), - }); - } - Ok(kvs) - } -} - -#[cfg(test)] -mod test { - use std::collections::{BTreeSet, HashMap, hash_map::RandomState}; - use std::hash::BuildHasher; - use std::hash::Hasher; - use std::rc::Rc; - - use proptest::prelude::*; - use rand::{SeedableRng, rngs::SmallRng}; - - use crate::Generator; - use crate::common::strings::Pool; - use crate::common::tags::{MAX_UNIQUE_TAG_RATIO, WARN_UNIQUE_TAG_RATIO}; - use crate::dogstatsd::{ConfRange, tags}; - - /// given a list of tagsets, this returns the number of unique timeseries that they represent - /// in dogstatsd terms, if we assume a constant metric name, - fn count_num_contexts(tagsets: &Vec) -> usize { - let mut context_map: HashMap = HashMap::new(); - let hash_builder = RandomState::new(); - - for tagset in tagsets { - let mut sorted_tags: BTreeSet = BTreeSet::new(); - for tag in tagset { - sorted_tags.insert(tag.to_string()); - } - - let mut context_key = hash_builder.build_hasher(); - for tag in &sorted_tags { - context_key.write_usize(tag.len()); - context_key.write(tag.as_bytes()); - } - let entry = context_map.entry(context_key.finish()).or_default(); - *entry += 1; - } - context_map.len() - } - - #[test] - fn count_contexts_works() { - let tagsets = vec![ - vec!["a:1".to_string(), "b:2".to_string()], - vec!["a:1".to_string(), "b:2".to_string()], - vec!["a:1".to_string(), "b:2".to_string()], - vec!["a:1".to_string(), "b:2".to_string()], - ]; - let num_contexts = count_num_contexts(&tagsets); - assert_eq!(num_contexts, 1); - - let tagsets = vec![ - vec!["a:3".to_string(), "b:2".to_string()], - vec!["a:4".to_string(), "b:2".to_string()], - vec!["a:5".to_string(), "b:2".to_string()], - vec!["a:1".to_string(), "b:2".to_string()], - ]; - let num_contexts = count_num_contexts(&tagsets); - assert_eq!(num_contexts, 4); - } - - proptest! { - #[test] - fn tagsets_repeat_after_reaching_tagset_max(seed: u64, num_tagsets in 1..10_000_usize) { - let mut rng = SmallRng::seed_from_u64(seed); - - let str_pool = Rc::new(Pool::with_size(&mut rng, 1_000_000)); - let tags_per_msg_range = ConfRange::Inclusive { min: 0, max: 25 }; - let tag_size_range = ConfRange::Inclusive { min: 3, max: 128 }; - let generator = - tags::Generator::new(seed, tags_per_msg_range, tag_size_range, num_tagsets, str_pool, 1.0) - .expect("Tag generator to be valid"); - - let first_batch = (0..num_tagsets) - .map(|_| { - generator - .generate(&mut rng) - .expect("failed to generate tagset") - }) - .collect::>(); - - let second_batch = (0..num_tagsets) - .map(|_| { - generator - .generate(&mut rng) - .expect("failed to generate tagset") - }) - .collect::>(); - - assert_eq!(first_batch.len(), second_batch.len()); - for i in 0..first_batch.len() { - let first = &first_batch[i]; - let second = &second_batch[i]; - assert_eq!(first, second); - } - } - } - - proptest! { - #[test] - fn generator_yields_valid_tagsets(seed: u64, num_tagsets in 1..10_000_usize, tags_per_msg_max in 1..u8::MAX) { - let mut rng = SmallRng::seed_from_u64(seed); - - let str_pool = Rc::new(Pool::with_size(&mut rng, 1_000_000)); - let tags_per_msg_range = ConfRange::Inclusive{min: 0, max: tags_per_msg_max}; - let tag_size_range = ConfRange::Inclusive{min: 3, max: 128}; - let generator = tags::Generator::new( - seed, - tags_per_msg_range, - tag_size_range, - num_tagsets, - str_pool, - 1.0 - ).expect("Tag generator to be valid"); - - for _ in 0..num_tagsets { - let tagset = generator.generate(&mut rng)?; - for tag in &tagset { - let start = tag_size_range.start().into(); - let end = tag_size_range.end().into(); - debug_assert!(tag.len() <= end, "tag len: {}, tag_size_range end: {end}", tag.len()); - debug_assert!(tag.len() >= start, "tag len: {}, tag_size_range start: {start}", tag.len()); - let num_delimiters = tag.chars().filter(|c| *c == ':').count(); - assert_eq!(num_delimiters, 1); - } - assert!(tagset.len() <= tags_per_msg_range.end() as usize); - assert!(tagset.len() >= tags_per_msg_range.start() as usize); - } - } - } - - proptest! { - /// This test asserts that when the is 1.0, we always are able to hit - /// the desired number of unique tagsets no matter what. - #[test] - fn unique_tagsets_respected_always_unique_tags(seed: u64, desired_num_tagsets in 1..10_000_usize) { - let tags_per_msg_range = ConfRange::Inclusive { min: 2, max: 25 }; - let tag_size_range = ConfRange::Inclusive { min: 3, max: 128 }; - let mut rng = SmallRng::seed_from_u64(seed); - - let str_pool = Rc::new(Pool::with_size(&mut rng, 1_000_000)); - let generator = tags::Generator::new( - seed, - tags_per_msg_range, - tag_size_range, - desired_num_tagsets, - str_pool, - 1.0, - ) - .expect("Tag generator to be valid"); - - // need guarantee that calling generate N times will generate N unique tagsets - let tagsets = (0..desired_num_tagsets) - .map(|_| { - generator - .generate(&mut rng) - .expect("failed to generate tagset") - }) - .collect::>(); - - let num_contexts = count_num_contexts(&tagsets); - assert_eq!(num_contexts, desired_num_tagsets); - } - } - - proptest! { - /// This test varies the unique_tag_probability. This config option makes it possible - /// to specify inputs that will force the tagsets to repeat - /// A concern of the dogstatsd consumer is that the tagsets yielded have a cardinality of - /// `num_tagsets` - /// The goal of this test is to vary the unique_tag_probability between the WARN and MAX - /// levels and ensure that we are always able to generate the desired number of unique tagsets - #[test] - fn unique_tagsets_respected_with_varying_ratio(seed: u64, desired_num_tagsets in 5..10_000_usize, unique_tag_ratio in WARN_UNIQUE_TAG_RATIO..MAX_UNIQUE_TAG_RATIO) { - let tags_per_msg_range = ConfRange::Inclusive { min: 2, max: 25 }; - let tag_size_range = ConfRange::Inclusive { min: 3, max: 128 }; - let mut rng = SmallRng::seed_from_u64(seed); - - let str_pool = Rc::new(Pool::with_size(&mut rng, 1_000_000)); - let generator = tags::Generator::new( - seed, - tags_per_msg_range, - tag_size_range, - desired_num_tagsets, - str_pool, - unique_tag_ratio - ) - .expect("Tag generator to be valid"); - - let tagsets = (0..desired_num_tagsets) - .map(|_| { - generator - .generate(&mut rng) - .expect("failed to generate tagset") - }) - .collect::>(); - - let margin_of_error = 3; - let num_contexts = count_num_contexts(&tagsets); - assert!(num_contexts >= desired_num_tagsets - margin_of_error || num_contexts <= desired_num_tagsets + margin_of_error); - } - } -} diff --git a/lading_payload/src/opentelemetry_metric/tags.rs b/lading_payload/src/opentelemetry_metric/tags.rs index 81cb35026..a5a5eafa0 100644 --- a/lading_payload/src/opentelemetry_metric/tags.rs +++ b/lading_payload/src/opentelemetry_metric/tags.rs @@ -1,9 +1,8 @@ //! Tag generation for OpenTelemetry metric payloads use std::rc::Rc; -use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; - use crate::{Error, common::config::ConfRange, common::strings::Pool}; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; #[derive(Debug, Clone)] pub(crate) struct TagGenerator { @@ -39,7 +38,7 @@ impl TagGenerator { } impl<'a> crate::Generator<'a> for TagGenerator { - type Output = Vec; + type Output = Rc>; type Error = Error; fn generate(&'a self, rng: &mut R) -> Result @@ -56,7 +55,7 @@ impl<'a> crate::Generator<'a> for TagGenerator { .inner .generate(rng) .map_err(|_| Error::StringGenerate)?; - let mut attributes = Vec::with_capacity(tagset.len()); + let mut attributes = Vec::::with_capacity(tagset.len()); for tag in tagset { let key = self @@ -76,6 +75,6 @@ impl<'a> crate::Generator<'a> for TagGenerator { }); } - Ok(attributes) + Ok(Rc::new(attributes)) } } diff --git a/lading_payload/src/opentelemetry_metric/templates.rs b/lading_payload/src/opentelemetry_metric/templates.rs index e529f95d6..062172174 100644 --- a/lading_payload/src/opentelemetry_metric/templates.rs +++ b/lading_payload/src/opentelemetry_metric/templates.rs @@ -8,6 +8,7 @@ use opentelemetry_proto::tonic::{ }, resource, }; +use prost::Message; use rand::{ Rng, distr::{Distribution, StandardUniform, weighted::WeightedIndex}, @@ -54,7 +55,7 @@ pub(crate) struct MetricTemplate { pub name: String, pub description: String, pub unit: String, - pub metadata: Vec, + pub metadata: Rc>, pub kind: Kind, } @@ -176,9 +177,28 @@ impl MetricTemplate { description: self.description.clone(), unit: self.unit.clone(), data: Some(data), - metadata: self.metadata.clone(), + metadata: self.metadata.as_slice().to_owned(), } } + + /// Shrink self by removing data-points until `encoded_len() <= limit`. + pub(crate) fn fit_into(&self, limit: usize, rng: &mut impl Rng) -> metrics::v1::Metric { + let mut data = self.instantiate(rng); + + while data.encoded_len() + 2 > limit { + let res = match data.data.as_mut() { + Some(Data::Gauge(g)) => g.data_points.pop(), + Some(Data::Sum(s)) => s.data_points.pop(), + None => None, + _ => unreachable!(), + }; + + if res.is_none() { + break; // cannot shrink further + } + } + data + } } /// Static description of a Scope and its metrics. @@ -244,7 +264,7 @@ impl<'a> crate::Generator<'a> for ScopeTemplateGenerator { .ok_or(Error::StringGenerate)? .to_owned(), version: String::new(), - attributes, + attributes: attributes.as_slice().to_owned(), dropped_attributes_count: 0, }) }; @@ -315,7 +335,7 @@ impl<'a> crate::Generator<'a> for ResourceTemplateGenerator { } else { let attributes = self.tags.generate(rng)?; let res = resource::v1::Resource { - attributes, + attributes: attributes.as_slice().to_owned(), dropped_attributes_count: 0, }; Some(res)