Skip to content

Commit 26239f0

Browse files
committed
Implement per tag control in tag cardinality processor
1 parent 8073e93 commit 26239f0

7 files changed

Lines changed: 774 additions & 80 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
The `tag_cardinality_limit` transform now supports an `exclude: true` option that opts a metric or
2+
tag entirely out of cardinality control — all tag values pass through and nothing is tracked.
3+
`exclude` works at the global, per-metric, and per-tag levels, and takes precedence over `mode`,
4+
`value_limit`, `limit_exceeded_action`, and `internal_metrics` when also set. At least one of
5+
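`mode` or `exclude: true` must be present on every entry (global, per-metric, and per-tag); otherwise the transform fails to build with a configuration error.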
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
The `tag_cardinality_limit` transform now supports per-tag-key configuration overrides nested
2+
under each entry in `per_metric_limits`. Each per-tag entry has the same fields as a per-metric
3+
configuration (`value_limit`, `limit_exceeded_action`, `mode`, `internal_metrics`, `exclude`), and replaces
4+
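the per-metric configuration for that tag as a whole: optional fields omitted from a per-tag entry fall back to their built-in defaults, not to the per-metric values. Resolution order is per-tag → per-metric → global.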

lib/vector-core/src/event/metric/tags.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,4 +777,34 @@ mod tests {
777777
}
778778
}
779779
}
780+
781+
fn make_tags(pairs: &[(&str, &str)]) -> MetricTags {
782+
pairs
783+
.iter()
784+
.map(|(k, v)| (k.to_string(), v.to_string()))
785+
.collect()
786+
}
787+
788+
#[test]
789+
fn rename_with_replacement_basic() {
790+
let mut tags = make_tags(&[("old", "v1")]);
791+
assert!(tags.rename_with_replacement("old", "new".to_string()));
792+
assert_eq!(tags.get("new"), Some("v1"));
793+
assert!(!tags.contains_key("old"));
794+
}
795+
796+
#[test]
797+
fn rename_with_replacement_missing_old_returns_false() {
798+
let mut tags = make_tags(&[("a", "1")]);
799+
assert!(!tags.rename_with_replacement("missing", "b".to_string()));
800+
assert!(tags.contains_key("a"));
801+
}
802+
803+
#[test]
804+
fn rename_with_replacement_overwrites_existing_destination() {
805+
let mut tags = make_tags(&[("old", "v1"), ("new", "v2")]);
806+
assert!(tags.rename_with_replacement("old", "new".to_string()));
807+
assert_eq!(tags.get("new"), Some("v1"));
808+
assert!(!tags.contains_key("old"));
809+
}
780810
}

src/transforms/tag_cardinality_limit/config.rs

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,20 +48,53 @@ pub struct Config {
4848
#[configurable_component]
4949
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
5050
pub struct Inner {
51-
/// How many distinct values to accept for any given key.
51+
/// How many distinct values to accept for any given key. Ignored when `exclude` is `true`.
5252
#[serde(default = "default_value_limit")]
5353
pub value_limit: usize,
5454

5555
#[configurable(derived)]
5656
#[serde(default = "default_limit_exceeded_action")]
5757
pub limit_exceeded_action: LimitExceededAction,
5858

59-
#[serde(flatten)]
60-
pub mode: Mode,
59+
/// Cardinality tracking mode. Required unless `exclude` is `true`. When both `exclude` and
60+
/// `mode` are set, `exclude` takes precedence and `mode` is ignored.
61+
#[serde(flatten, default)]
62+
pub mode: Option<Mode>,
6163

6264
#[configurable(derived)]
6365
#[serde(default)]
6466
pub internal_metrics: InternalMetricsConfig,
67+
68+
/// Exclude this metric or tag from cardinality control entirely. When `true`, all tag values
69+
/// pass through and nothing is tracked, regardless of any other fields on this entry.
70+
#[serde(default)]
71+
pub exclude: bool,
72+
}
73+
74+
impl Inner {
75+
/// Validate that at least one of `exclude` or `mode` is set.
76+
pub(super) fn validate(&self, location: &str) -> crate::Result<()> {
77+
if !self.exclude && self.mode.is_none() {
78+
return Err(
79+
format!("{location}: `mode` is required unless `exclude: true` is set").into(),
80+
);
81+
}
82+
Ok(())
83+
}
84+
}
85+
86+
impl Config {
87+
pub(super) fn validate(&self) -> crate::Result<()> {
88+
self.global.validate("global")?;
89+
for (name, pmc) in &self.per_metric_limits {
90+
pmc.config.validate(&format!("per_metric_limits[{name}]"))?;
91+
for (tag, ptc) in &pmc.per_tag_limits {
92+
ptc.config
93+
.validate(&format!("per_metric_limits[{name}].per_tag_limits[{tag}]"))?;
94+
}
95+
}
96+
Ok(())
97+
}
6598
}
6699

67100
/// Controls the approach taken for tracking tag cardinality.
@@ -121,6 +154,26 @@ pub struct PerMetricConfig {
121154
#[serde(default)]
122155
pub namespace: Option<String>,
123156

157+
/// Per-tag-key overrides scoped to this metric.
158+
///
159+
/// Each entry has the same fields as a per-metric configuration. When a tag has an entry here,
160+
/// that entry replaces the per-metric configuration for that tag. Tags not listed here use this
161+
/// per-metric configuration.
162+
#[configurable(
163+
derived,
164+
metadata(docs::additional_props_description = "An individual tag configuration.")
165+
)]
166+
#[serde(default)]
167+
pub per_tag_limits: HashMap<String, PerTagConfig>,
168+
169+
#[serde(flatten)]
170+
pub config: Inner,
171+
}
172+
173+
/// Tag cardinality limit configuration for a specific tag key, scoped under a per-metric override.
174+
#[configurable_component]
175+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
176+
pub struct PerTagConfig {
124177
#[serde(flatten)]
125178
pub config: Inner,
126179
}
@@ -145,10 +198,11 @@ impl GenerateConfig for Config {
145198
fn generate_config() -> toml::Value {
146199
toml::Value::try_from(Self {
147200
global: Inner {
148-
mode: Mode::Exact,
201+
mode: Some(Mode::Exact),
149202
value_limit: default_value_limit(),
150203
limit_exceeded_action: default_limit_exceeded_action(),
151204
internal_metrics: InternalMetricsConfig::default(),
205+
exclude: false,
152206
},
153207
per_metric_limits: HashMap::default(),
154208
})
@@ -160,6 +214,7 @@ impl GenerateConfig for Config {
160214
#[typetag::serde(name = "tag_cardinality_limit")]
161215
impl TransformConfig for Config {
162216
async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
217+
self.validate()?;
163218
Ok(Transform::event_task(TagCardinalityLimit::new(
164219
self.clone(),
165220
)))

src/transforms/tag_cardinality_limit/mod.rs

Lines changed: 63 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ mod tag_value_set;
1515
#[cfg(test)]
1616
mod tests;
1717

18-
pub use config::{BloomFilterConfig, Config, Inner, LimitExceededAction, Mode, PerMetricConfig};
18+
pub use config::{
19+
BloomFilterConfig, Config, Inner, LimitExceededAction, Mode, PerMetricConfig, PerTagConfig,
20+
};
1921
use tag_value_set::AcceptedTagValueSet;
2022

2123
use crate::event::metric::TagValueSet;
@@ -36,19 +38,22 @@ impl TagCardinalityLimit {
3638
}
3739
}
3840

39-
fn get_config_for_metric(&self, metric_key: Option<&MetricId>) -> &Inner {
40-
match metric_key {
41-
Some(id) => self
42-
.config
43-
.per_metric_limits
44-
.iter()
45-
.find(|(name, config)| {
46-
**name == id.1 && (config.namespace.is_none() || config.namespace == id.0)
41+
/// Resolve the configuration that applies to a specific (metric, tag) pair.
42+
///
43+
/// Lookup chain: per-tag override → per-metric override → global.
44+
fn resolve_tag(&self, metric_key: Option<&MetricId>, tag_key: &str) -> &Inner {
45+
if let Some(id) = metric_key
46+
&& let Some((_, pmc)) =
47+
self.config.per_metric_limits.iter().find(|(name, c)| {
48+
**name == id.1 && (c.namespace.is_none() || c.namespace == id.0)
4749
})
48-
.map(|(_, c)| &c.config)
49-
.unwrap_or(&self.config.global),
50-
None => &self.config.global,
50+
{
51+
if let Some(t) = pmc.per_tag_limits.get(tag_key) {
52+
return &t.config;
53+
}
54+
return &pmc.config;
5155
}
56+
&self.config.global
5257
}
5358

5459
/// Takes in key and a value corresponding to a tag on an incoming Metric
@@ -65,11 +70,15 @@ impl TagCardinalityLimit {
6570
key: &str,
6671
value: &TagValueSet,
6772
) -> bool {
68-
let config = *self.get_config_for_metric(metric_key);
73+
let config = *self.resolve_tag(metric_key, key);
74+
if config.exclude {
75+
return true;
76+
}
77+
let mode = config.mode.expect("validated at build time");
6978
let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default();
7079
let tag_value_set = metric_accepted_tags
7180
.entry_ref(key)
72-
.or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode));
81+
.or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &mode));
7382

7483
if tag_value_set.contains(value) {
7584
// Tag value has already been accepted, nothing more to do.
@@ -100,27 +109,20 @@ impl TagCardinalityLimit {
100109
key: &str,
101110
value: &TagValueSet,
102111
) -> bool {
112+
let resolved = self.resolve_tag(metric_key, key);
113+
if resolved.exclude {
114+
return false;
115+
}
103116
self.accepted_tags
104117
.get(&metric_key.cloned())
105118
.and_then(|metric_accepted_tags| {
106119
metric_accepted_tags.get(key).map(|value_set| {
107-
!value_set.contains(value)
108-
&& value_set.len() >= self.get_config_for_metric(metric_key).value_limit
120+
!value_set.contains(value) && value_set.len() >= resolved.value_limit
109121
})
110122
})
111123
.unwrap_or(false)
112124
}
113125

114-
/// Record a key and value corresponding to a tag on an incoming Metric.
115-
fn record_tag_value(&mut self, metric_key: Option<&MetricId>, key: &str, value: &TagValueSet) {
116-
let config = *self.get_config_for_metric(metric_key);
117-
let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default();
118-
metric_accepted_tags
119-
.entry_ref(key)
120-
.or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode))
121-
.insert(value.clone());
122-
}
123-
124126
pub fn transform_one(&mut self, mut event: Event) -> Option<Event> {
125127
let metric = event.as_mut_metric();
126128
let metric_name = metric.name().to_string();
@@ -135,50 +137,44 @@ impl TagCardinalityLimit {
135137
None
136138
};
137139
if let Some(tags_map) = metric.tags_mut() {
138-
match self
139-
.get_config_for_metric(metric_key.as_ref())
140-
.limit_exceeded_action
141-
{
142-
LimitExceededAction::DropEvent => {
143-
// This needs to check all the tags, to ensure that the ordering of tag names
144-
// doesn't change the behavior of the check.
145-
146-
for (key, value) in tags_map.iter_sets() {
147-
if self.tag_limit_exceeded(metric_key.as_ref(), key, value) {
148-
let config = self.get_config_for_metric(metric_key.as_ref());
149-
emit!(TagCardinalityLimitRejectingEvent {
150-
metric_name: &metric_name,
151-
tag_key: key,
152-
tag_value: &value.to_string(),
153-
include_extended_tags: config
154-
.internal_metrics
155-
.include_extended_tags,
156-
});
157-
return None;
158-
}
159-
}
160-
for (key, value) in tags_map.iter_sets() {
161-
self.record_tag_value(metric_key.as_ref(), key, value);
162-
}
163-
}
164-
LimitExceededAction::DropTag => {
165-
let config = self.get_config_for_metric(metric_key.as_ref());
166-
let include_extended_tags = config.internal_metrics.include_extended_tags;
167-
tags_map.retain(|key, value| {
168-
if self.try_accept_tag(metric_key.as_ref(), key, value) {
169-
true
170-
} else {
171-
emit!(TagCardinalityLimitRejectingTag {
172-
metric_name: &metric_name,
173-
tag_key: key,
174-
tag_value: &value.to_string(),
175-
include_extended_tags,
176-
});
177-
false
178-
}
140+
// Pre-check pass: tags whose resolved action is `DropEvent` must be checked before
141+
// any mutation, so the drop decision is order-independent across tag iteration.
142+
for (key, value) in tags_map.iter_sets() {
143+
let resolved = self.resolve_tag(metric_key.as_ref(), key);
144+
if resolved.limit_exceeded_action == LimitExceededAction::DropEvent
145+
&& self.tag_limit_exceeded(metric_key.as_ref(), key, value)
146+
{
147+
let include_extended_tags = resolved.internal_metrics.include_extended_tags;
148+
emit!(TagCardinalityLimitRejectingEvent {
149+
metric_name: &metric_name,
150+
tag_key: key,
151+
tag_value: &value.to_string(),
152+
include_extended_tags,
179153
});
154+
return None;
180155
}
181156
}
157+
158+
// Apply pass: accept each tag. Tags whose resolved action is `DropEvent` are
159+
// guaranteed to pass after the pre-check; a rejection here means the resolved
160+
// action is `DropTag`.
161+
tags_map.retain(|key, value| {
162+
if self.try_accept_tag(metric_key.as_ref(), key, value) {
163+
true
164+
} else {
165+
let include_extended_tags = self
166+
.resolve_tag(metric_key.as_ref(), key)
167+
.internal_metrics
168+
.include_extended_tags;
169+
emit!(TagCardinalityLimitRejectingTag {
170+
metric_name: &metric_name,
171+
tag_key: key,
172+
tag_value: &value.to_string(),
173+
include_extended_tags,
174+
});
175+
false
176+
}
177+
});
182178
}
183179
Some(event)
184180
}

0 commit comments

Comments
 (0)