@@ -6,7 +6,7 @@ use vector_lib::{
66} ;
77use vrl:: value:: Kind ;
88
9- use super :: transform:: { Sample , SampleMode } ;
9+ use super :: transform:: { DynamicSampleFields , Sample , SampleMode } ;
1010use crate :: {
1111 conditions:: AnyCondition ,
1212 config:: {
@@ -29,10 +29,23 @@ pub enum SampleError {
2929 #[ snafu( display( "Only non-zero numbers are allowed values for `rate`" ) ) ]
3030 InvalidRate ,
3131
32+ #[ snafu( display( "Only one value can be provided for either 'rate' or 'ratio', but not both" ) ) ]
33+ InvalidStaticConfiguration ,
34+
35+ #[ snafu( display(
36+ "Only one value can be provided for either 'ratio_field' or 'rate_field', but not both"
37+ ) ) ]
38+ InvalidDynamicConfiguration ,
39+
40+ #[ snafu( display(
41+ "Exactly one value must be provided for either 'rate' or 'ratio' to configure static sampling"
42+ ) ) ]
43+ MissingStaticConfiguration ,
44+
3245 #[ snafu( display(
33- "Exactly one value must be provided for either 'rate ' or 'ratio', but not both "
46+ "'key_field' cannot be combined with 'ratio_field ' or 'rate_field' because dynamic values can vary per event and break key-based coherence "
3447 ) ) ]
35- InvalidConfiguration ,
48+ InvalidKeyFieldDynamicCombination ,
3649}
3750
3851/// Configuration for the `sample` transform.
@@ -61,6 +74,24 @@ pub struct SampleConfig {
6174 #[ configurable( validation( range( min = 0.0 , max = 1.0 ) ) ) ]
6275 pub ratio : Option < f64 > ,
6376
77+ /// The event field whose numeric value is used as the sampling ratio on a per-event basis.
78+ ///
79+ /// Accepts integer, floating point, or string values that parse as a number. The value must be
80+ /// in `(0, 1]` to be considered valid (for example, `0.25` keeps 25%). If the field is missing
81+ /// or invalid, static sampling settings (`rate` or `ratio`) are used as a fallback.
82+ /// This option cannot be used together with `rate_field`.
83+ #[ configurable( metadata( docs:: examples = "sample_rate" ) ) ]
84+ pub ratio_field : Option < String > ,
85+
86+ /// The event field whose integer value is used as the sampling rate on a per-event basis, expressed as `1/N`.
87+ ///
88+ /// Accepts an integer, or a string that parses as a positive integer; floating point values
89+ /// are rejected. The value must be a positive integer to be considered valid. If the field is
90+ /// missing or invalid, static sampling settings (`rate` or `ratio`) are used as a fallback.
91+ /// This option cannot be used together with `ratio_field`.
92+ #[ configurable( metadata( docs:: examples = "sample_rate_n" ) ) ]
93+ pub rate_field : Option < String > ,
94+
6495 /// The name of the field whose value is hashed to determine if the event should be
6596 /// sampled.
6697 ///
@@ -72,6 +103,8 @@ pub struct SampleConfig {
72103 ///
73104 /// This can be useful to, for example, ensure that all logs for a given transaction are
74105 /// sampled together, but that overall `1/N` transactions are sampled.
106+ ///
107+ /// This option cannot be combined with `ratio_field` or `rate_field`.
75108 #[ configurable( metadata( docs:: examples = "message" ) ) ]
76109 pub key_field : Option < String > ,
77110
@@ -84,6 +117,9 @@ pub struct SampleConfig {
84117 ///
85118 /// If left unspecified, or if the event doesn't have `group_by`, then the event is not
86119 /// sampled separately.
120+ ///
121+ /// This can also be used with `ratio_field` or `rate_field` to apply dynamic sampling
122+ /// independently per rendered group value.
87123 #[ configurable( metadata(
88124 docs:: examples = "{{ service }}" ,
89125 docs:: examples = "{{ hostname }}-{{ service }}"
@@ -96,6 +132,18 @@ pub struct SampleConfig {
96132
97133impl SampleConfig {
98134 fn sample_rate ( & self ) -> Result < SampleMode , SampleError > {
135+ if self . ratio_field . is_some ( ) && self . rate_field . is_some ( ) {
136+ return Err ( SampleError :: InvalidDynamicConfiguration ) ;
137+ }
138+
139+ if self . key_field . is_some ( ) && ( self . ratio_field . is_some ( ) || self . rate_field . is_some ( ) ) {
140+ return Err ( SampleError :: InvalidKeyFieldDynamicCombination ) ;
141+ }
142+
143+ if self . rate . is_some ( ) && self . ratio . is_some ( ) {
144+ return Err ( SampleError :: InvalidStaticConfiguration ) ;
145+ }
146+
99147 match ( self . rate , self . ratio ) {
100148 ( None , Some ( ratio) ) => {
101149 if ratio <= 0.0 {
@@ -111,7 +159,8 @@ impl SampleConfig {
111159 Ok ( SampleMode :: new_rate ( rate) )
112160 }
113161 }
114- _ => Err ( SampleError :: InvalidConfiguration ) ,
162+ ( None , None ) => Err ( SampleError :: MissingStaticConfiguration ) ,
163+ _ => Err ( SampleError :: InvalidStaticConfiguration ) ,
115164 }
116165 }
117166}
@@ -121,6 +170,8 @@ impl GenerateConfig for SampleConfig {
121170 toml:: Value :: try_from ( Self {
122171 rate : None ,
123172 ratio : Some ( 0.1 ) ,
173+ ratio_field : None ,
174+ rate_field : None ,
124175 key_field : None ,
125176 group_by : None ,
126177 exclude : None :: < AnyCondition > ,
@@ -134,19 +185,37 @@ impl GenerateConfig for SampleConfig {
134185#[ typetag:: serde( name = "sample" ) ]
135186impl TransformConfig for SampleConfig {
136187 async fn build ( & self , context : & TransformContext ) -> crate :: Result < Transform > {
137- Ok ( Transform :: function ( Sample :: new (
138- Self :: NAME . to_string ( ) ,
139- self . sample_rate ( ) ?,
140- self . key_field . clone ( ) ,
141- self . group_by . clone ( ) ,
142- self . exclude
143- . as_ref ( )
144- . map ( |condition| {
145- condition. build ( & context. enrichment_tables , & context. metrics_storage )
146- } )
147- . transpose ( ) ?,
148- self . sample_rate_key . clone ( ) ,
149- ) ) )
188+ let sample_mode = self . sample_rate ( ) ?;
189+ let exclude = self
190+ . exclude
191+ . as_ref ( )
192+ . map ( |condition| condition. build ( & context. enrichment_tables , & context. metrics_storage ) )
193+ . transpose ( ) ?;
194+
195+ let sample = if self . ratio_field . is_some ( ) || self . rate_field . is_some ( ) {
196+ Sample :: new_with_dynamic (
197+ Self :: NAME . to_string ( ) ,
198+ sample_mode,
199+ DynamicSampleFields {
200+ ratio_field : self . ratio_field . clone ( ) ,
201+ rate_field : self . rate_field . clone ( ) ,
202+ } ,
203+ self . group_by . clone ( ) ,
204+ exclude,
205+ self . sample_rate_key . clone ( ) ,
206+ )
207+ } else {
208+ Sample :: new (
209+ Self :: NAME . to_string ( ) ,
210+ sample_mode,
211+ self . key_field . clone ( ) ,
212+ self . group_by . clone ( ) ,
213+ exclude,
214+ self . sample_rate_key . clone ( ) ,
215+ )
216+ } ;
217+
218+ Ok ( Transform :: function ( sample) )
150219 }
151220
152221 fn input ( & self ) -> Input {
@@ -191,10 +260,100 @@ pub fn default_sample_rate_key() -> OptionalValuePath {
191260
192261#[ cfg( test) ]
193262mod tests {
194- use crate :: transforms:: sample:: config:: SampleConfig ;
263+ use crate :: {
264+ config:: TransformConfig ,
265+ transforms:: sample:: config:: { SampleConfig , SampleError } ,
266+ } ;
195267
196268 #[ test]
197269 fn generate_config ( ) {
198270 crate :: test_util:: test_generate_config :: < SampleConfig > ( ) ;
199271 }
272+
273+ #[ test]
274+ fn rejects_dynamic_ratio_only_configuration ( ) {
275+ let config = SampleConfig {
276+ rate : None ,
277+ ratio : None ,
278+ ratio_field : Some ( "sample_rate" . to_string ( ) ) ,
279+ rate_field : None ,
280+ key_field : None ,
281+ sample_rate_key : super :: default_sample_rate_key ( ) ,
282+ group_by : None ,
283+ exclude : None ,
284+ } ;
285+
286+ let err = config. sample_rate ( ) . unwrap_err ( ) ;
287+ assert ! ( matches!( err, SampleError :: MissingStaticConfiguration ) ) ;
288+ }
289+
290+ #[ test]
291+ fn rejects_dynamic_rate_only_configuration ( ) {
292+ let config = SampleConfig {
293+ rate : None ,
294+ ratio : None ,
295+ ratio_field : None ,
296+ rate_field : Some ( "sample_rate_n" . to_string ( ) ) ,
297+ key_field : None ,
298+ sample_rate_key : super :: default_sample_rate_key ( ) ,
299+ group_by : None ,
300+ exclude : None ,
301+ } ;
302+
303+ let err = config. sample_rate ( ) . unwrap_err ( ) ;
304+ assert ! ( matches!( err, SampleError :: MissingStaticConfiguration ) ) ;
305+ }
306+
307+ #[ test]
308+ fn validates_static_with_dynamic_configuration ( ) {
309+ let config = SampleConfig {
310+ rate : Some ( 10 ) ,
311+ ratio : None ,
312+ ratio_field : None ,
313+ rate_field : Some ( "sample_rate_n" . to_string ( ) ) ,
314+ key_field : None ,
315+ sample_rate_key : super :: default_sample_rate_key ( ) ,
316+ group_by : None ,
317+ exclude : None ,
318+ } ;
319+
320+ assert ! ( config. validate( & crate :: schema:: Definition :: any( ) ) . is_ok( ) ) ;
321+ }
322+
323+ #[ test]
324+ fn rejects_both_dynamic_fields_configuration ( ) {
325+ let config = SampleConfig {
326+ rate : Some ( 10 ) ,
327+ ratio : None ,
328+ ratio_field : Some ( "sample_rate" . to_string ( ) ) ,
329+ rate_field : Some ( "sample_rate_n" . to_string ( ) ) ,
330+ key_field : None ,
331+ sample_rate_key : super :: default_sample_rate_key ( ) ,
332+ group_by : None ,
333+ exclude : None ,
334+ } ;
335+
336+ let err = config. sample_rate ( ) . unwrap_err ( ) ;
337+ assert ! ( matches!( err, SampleError :: InvalidDynamicConfiguration ) ) ;
338+ }
339+
340+ #[ test]
341+ fn rejects_key_field_with_dynamic_configuration ( ) {
342+ let config = SampleConfig {
343+ rate : Some ( 10 ) ,
344+ ratio : None ,
345+ ratio_field : Some ( "sample_ratio" . to_string ( ) ) ,
346+ rate_field : None ,
347+ key_field : Some ( "trace_id" . to_string ( ) ) ,
348+ sample_rate_key : super :: default_sample_rate_key ( ) ,
349+ group_by : None ,
350+ exclude : None ,
351+ } ;
352+
353+ let err = config. sample_rate ( ) . unwrap_err ( ) ;
354+ assert ! ( matches!(
355+ err,
356+ SampleError :: InvalidKeyFieldDynamicCombination
357+ ) ) ;
358+ }
200359}
0 commit comments