Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
The `tag_cardinality_limit` transform now supports `mode: exact_fingerprint`, a new storage
mode that can reduce memory usage for high-cardinality tag values compared to
`mode: exact`. Instead of storing the full tag-value strings, only a 64 bit fingerprint hash of
each value is kept. The trade-off is that throughput is slightly impacted due to extra hashing
operations, and there is technically a (unlikely) chance of collisions at very high cardinalities

authors: ArunPiduguDD
11 changes: 11 additions & 0 deletions src/transforms/tag_cardinality_limit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ pub enum Mode {
/// metrics with new tags after the limit has been hit.
Exact,

/// This mode operates similarly to `exact` mode except it tracks cardinality using 64-bit hash fingerprints
/// of tag values instead of the original strings. This leads to lower memory requirements in most
/// scenarios (assuming average tag value size is greater than 8 bytes) at the cost of slightly
/// reduced throughput due to extra hashing operations and a very small chance of collisions at
/// very high cardinalities
ExactFingerprint,

/// Tracks cardinality probabilistically.
///
/// This mode has lower memory requirements than `exact`, but may occasionally allow metric
Expand Down Expand Up @@ -183,6 +190,9 @@ pub enum OverrideMode {
/// Tracks cardinality exactly. See `Mode::Exact` for details.
Exact,

/// Tracks cardinality using 64-bit hash fingerprints. See `Mode::ExactFingerprint` for details.
ExactFingerprint,

/// Tracks cardinality probabilistically. See `Mode::Probabilistic` for details.
Probabilistic(BloomFilterConfig),

Expand All @@ -196,6 +206,7 @@ impl OverrideMode {
pub const fn as_mode(&self) -> Option<Mode> {
match self {
OverrideMode::Exact => Some(Mode::Exact),
OverrideMode::ExactFingerprint => Some(Mode::ExactFingerprint),
OverrideMode::Probabilistic(b) => Some(Mode::Probabilistic(*b)),
OverrideMode::Excluded => None,
}
Expand Down
109 changes: 106 additions & 3 deletions src/transforms/tag_cardinality_limit/tag_value_set.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
use std::{collections::HashSet, fmt};
use std::{
collections::HashSet,
fmt,
hash::{BuildHasher, BuildHasherDefault},
};

use bloomy::BloomFilter;
use hash_hasher::HashedSet;
use seahash::SeaHasher;

use crate::{event::metric::TagValueSet, transforms::tag_cardinality_limit::config::Mode};

/// Container for storing the set of accepted values for a given tag key.
///
/// # Storage backend selection
///
/// | `Mode` | Storage |
/// |----------------------|---------------------------------|
/// | `Exact` | `HashSet<TagValueSet>` |
/// | `ExactFingerprint` | `HashSet<u64>` (fingerprints) |
/// | `Probabilistic` | `BloomFilter |

#[derive(Debug)]
pub struct AcceptedTagValueSet {
storage: TagValueSetStorage,
Expand All @@ -13,6 +28,8 @@ pub struct AcceptedTagValueSet {
enum TagValueSetStorage {
Set(HashSet<TagValueSet>),
Bloom(BloomFilterStorage),
/// Stores 64-bit hash fingerprints of accepted tag values
Fingerprint(FingerprintStorage),
}

/// A bloom filter that tracks the number of items inserted into it.
Expand Down Expand Up @@ -49,19 +66,48 @@ impl BloomFilterStorage {
}
}

#[derive(Default)]
struct FingerprintStorage {
fingerprints: HashedSet<u64>,
}

impl FingerprintStorage {
/// Compute a 64-bit fingerprint of a tag value
fn fingerprint(value: &TagValueSet) -> u64 {
BuildHasherDefault::<SeaHasher>::default().hash_one(value)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be any value in pre-computing BuildHasherDefault::<SeaHasher>::default() when FingerprintStorage is created? You might want to seed it differently for each storage unit to avoid known-seed collision attacks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, what is a scenario where we would need to guard against known-seed collision attacks in Vector?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path would be being able to synthetically cause a collision with attacker-controlled data. In this case that would cause an undercount, allowing through a higher cardinality than otherwise IIUC, which amounts to a DoS attack due to service costs. Having an unknown seed makes that much harder, particularly if Vector is used in a cluster where every node has different seeds. The question then is if this is concerning.

FWIW microbenchmarks confirm that the setup of the hasher is free for this use.

}

fn insert(&mut self, value: &TagValueSet) {
self.fingerprints.insert(Self::fingerprint(value));
}

fn contains(&self, value: &TagValueSet) -> bool {
self.fingerprints.contains(&Self::fingerprint(value))
}

fn len(&self) -> usize {
self.fingerprints.len()
}
}

impl fmt::Debug for TagValueSetStorage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TagValueSetStorage::Set(set) => write!(f, "Set({set:?})"),
TagValueSetStorage::Bloom(_) => write!(f, "Bloom"),
TagValueSetStorage::Fingerprint(_) => write!(f, "Fingerprint"),
}
}
}

impl AcceptedTagValueSet {
/// Create a new `AcceptedTagValueSet` for the given mode.
pub fn new(mode: &Mode) -> Self {
let storage = match &mode {
Mode::Exact => TagValueSetStorage::Set(HashSet::new()),
Mode::ExactFingerprint => {
TagValueSetStorage::Fingerprint(FingerprintStorage::default())
}
Mode::Probabilistic(config) => {
TagValueSetStorage::Bloom(BloomFilterStorage::new(config.cache_size_per_key))
}
Expand All @@ -73,13 +119,15 @@ impl AcceptedTagValueSet {
match &self.storage {
TagValueSetStorage::Set(set) => set.contains(value),
TagValueSetStorage::Bloom(bloom) => bloom.contains(value),
TagValueSetStorage::Fingerprint(fp) => fp.contains(value),
}
}

pub fn len(&self) -> usize {
match &self.storage {
TagValueSetStorage::Set(set) => set.len(),
TagValueSetStorage::Bloom(bloom) => bloom.count(),
TagValueSetStorage::Fingerprint(fp) => fp.len(),
}
}

Expand All @@ -89,14 +137,18 @@ impl AcceptedTagValueSet {
set.insert(value);
}
TagValueSetStorage::Bloom(bloom) => bloom.insert(&value),
TagValueSetStorage::Fingerprint(fp) => fp.insert(&value),
};
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{event::metric::TagValueSet, transforms::tag_cardinality_limit::config::Mode};
use crate::{
event::metric::TagValueSet,
transforms::tag_cardinality_limit::config::{BloomFilterConfig, Mode},
};

#[test]
fn test_accepted_tag_value_set_exact() {
Expand All @@ -116,7 +168,11 @@ mod tests {

#[test]
fn test_accepted_tag_value_set_probabilistic() {
let mut accepted_tag_value_set = AcceptedTagValueSet::new(&Mode::Exact);
// Previously this test mistakenly constructed Mode::Exact; fixed to use Probabilistic.
let mut accepted_tag_value_set =
AcceptedTagValueSet::new(&Mode::Probabilistic(BloomFilterConfig {
cache_size_per_key: 5 * 1024,
}));

assert!(!accepted_tag_value_set.contains(&TagValueSet::from(["value1".to_string()])));
assert_eq!(accepted_tag_value_set.len(), 0);
Expand All @@ -134,4 +190,51 @@ mod tests {
assert_eq!(accepted_tag_value_set.len(), 2);
assert!(accepted_tag_value_set.contains(&TagValueSet::from(["value2".to_string()])));
}

#[test]
fn test_accepted_tag_value_set_fingerprint() {
let mut set = AcceptedTagValueSet::new(&Mode::ExactFingerprint);

assert!(!set.contains(&TagValueSet::from(["value1".to_string()])));
assert_eq!(set.len(), 0);

set.insert(TagValueSet::from(["value1".to_string()]));
assert_eq!(set.len(), 1);
assert!(set.contains(&TagValueSet::from(["value1".to_string()])));

// Inserting the same value again must not increase the count.
set.insert(TagValueSet::from(["value1".to_string()]));
assert_eq!(set.len(), 1);

set.insert(TagValueSet::from(["value2".to_string()]));
assert_eq!(set.len(), 2);
assert!(set.contains(&TagValueSet::from(["value2".to_string()])));

// An un-inserted value must not appear to be contained.
assert!(!set.contains(&TagValueSet::from(["value3".to_string()])));

// Fingerprinting is deterministic, so a separate set must agree on membership.
let mut set2 = AcceptedTagValueSet::new(&Mode::ExactFingerprint);
set2.insert(TagValueSet::from(["value1".to_string()]));
assert!(set2.contains(&TagValueSet::from(["value1".to_string()])));
assert!(!set2.contains(&TagValueSet::from(["value3".to_string()])));
}

#[test]
fn test_fingerprint_distribution_no_collisions() {
// Empirically guards the "good distribution" claim: inserting many distinct values
// must yield an equal number of distinct fingerprints. At 64 bits the birthday
// collision probability for 100k values is ~2.7e-10, so any collision here would
// indicate a badly-distributed hash rather than bad luck.
let mut set = AcceptedTagValueSet::new(&Mode::ExactFingerprint);
let n = 100_000;
for i in 0..n {
set.insert(TagValueSet::from([format!("tag-value-{i}")]));
}
assert_eq!(
set.len(),
n,
"distinct values must produce distinct fingerprints"
);
}
}
110 changes: 110 additions & 0 deletions src/transforms/tag_cardinality_limit/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ fn make_transform_bloom_with_per_metric_limits(
}
}

fn make_transform_fingerprint(
value_limit: usize,
limit_exceeded_action: LimitExceededAction,
) -> Config {
Config {
global: Inner {
value_limit,
limit_exceeded_action,
mode: Mode::ExactFingerprint,
internal_metrics: InternalMetricsConfig::default(),
},
tracking_scope: TrackingScope::default(),
max_tracked_keys: None,
per_metric_limits: HashMap::new(),
per_tag_limits: HashMap::new(),
}
}

fn make_transform_with_global_per_tag_limits(
value_limit: usize,
limit_exceeded_action: LimitExceededAction,
Expand Down Expand Up @@ -151,6 +169,15 @@ async fn tag_cardinality_limit_drop_event_bloom() {
drop_event(make_transform_bloom(2, LimitExceededAction::DropEvent)).await;
}

#[tokio::test]
async fn tag_cardinality_limit_drop_event_fingerprint() {
drop_event(make_transform_fingerprint(
2,
LimitExceededAction::DropEvent,
))
.await;
}

async fn drop_event(config: Config) {
assert_transform_compliance(async move {
let mut event1 = make_metric(metric_tags!("tag1" => "val1"));
Expand Down Expand Up @@ -203,6 +230,11 @@ async fn tag_cardinality_limit_drop_tag_bloom() {
drop_tag(make_transform_bloom(2, LimitExceededAction::DropTag)).await;
}

#[tokio::test]
async fn tag_cardinality_limit_drop_tag_fingerprint() {
drop_tag(make_transform_fingerprint(2, LimitExceededAction::DropTag)).await;
}

async fn drop_tag(config: Config) {
assert_transform_compliance(async move {
let tags1 = metric_tags!("tag1" => "val1", "tag2" => "val1");
Expand Down Expand Up @@ -1235,6 +1267,11 @@ fn global_per_tag_excluded_drop_tag_passthrough_bloom() {
}));
}

#[test]
fn global_per_tag_excluded_drop_tag_passthrough_fingerprint() {
global_per_tag_excluded_drop_tag_passthrough(Mode::ExactFingerprint);
}

/// A globally-excluded tag passes through unchanged on every metric, even when its values
/// would have exceeded `value_limit`. Sibling non-excluded tags still respect the limit.
fn global_per_tag_excluded_drop_tag_passthrough(mode: Mode) {
Expand Down Expand Up @@ -1287,6 +1324,11 @@ fn global_per_tag_excluded_drop_event_passthrough_bloom() {
}));
}

#[test]
fn global_per_tag_excluded_drop_event_passthrough_fingerprint() {
global_per_tag_excluded_drop_event_passthrough(Mode::ExactFingerprint);
}

/// Under `DropEvent`, a globally-excluded tag never triggers a drop, but a non-excluded
/// tag exceeding `value_limit` still does.
fn global_per_tag_excluded_drop_event_passthrough(mode: Mode) {
Expand Down Expand Up @@ -1461,3 +1503,71 @@ per_tag_limits:
let excluded = parsed.per_tag_limits.get("excluded_tag").unwrap();
assert_eq!(excluded.mode, PerTagMode::Excluded);
}

/// A re-sent already-accepted tag value must pass through even after the limit is hit,
/// for both DropTag and DropEvent actions.
#[test]
fn fingerprint_accepted_value_passes_through_after_limit() {
for action in [LimitExceededAction::DropTag, LimitExceededAction::DropEvent] {
let mut transform = TagCardinalityLimit::new(make_transform_fingerprint(2, action));
transform
.transform_one(make_metric(metric_tags!("env" => "prod")))
.unwrap();
transform
.transform_one(make_metric(metric_tags!("env" => "staging")))
.unwrap();
// Limit now hit; re-send of an already-accepted value must still pass through.
let e = transform
.transform_one(make_metric(metric_tags!("env" => "prod")))
.unwrap();
assert_eq!("prod", e.as_metric().tags().unwrap().get("env").unwrap());
}
}

/// Fingerprint mode must never allocate a tracking entry for a tag that is globally
/// excluded, matching the `Mode::Exact` "never allocate" contract.
#[test]
fn fingerprint_excluded_tag_never_populates_cache() {
let config = make_transform_with_global_per_tag_limits(
2,
LimitExceededAction::DropTag,
Mode::ExactFingerprint,
HashMap::from([("kube_pod_name".to_string(), make_per_tag_excluded())]),
);
let mut transform = TagCardinalityLimit::new(config);

for i in 0..10 {
let event = make_metric(metric_tags!(
"kube_pod_name" => format!("pod-{i}").as_str(),
"tag1" => "val1"
));
transform.transform_one(event).unwrap();
}

let bucket = transform
.accepted_tags
.get(&None)
.expect("non-excluded tag1 should still allocate a global bucket");
assert!(
bucket.contains_key("tag1"),
"non-excluded tag must still be tracked"
);
assert!(
!bucket.contains_key("kube_pod_name"),
"excluded tag key must never enter the fingerprint cache"
);
}

/// Fingerprint mode YAML round-trips: `mode: exact_fingerprint` deserializes cleanly.
#[test]
fn fingerprint_mode_deserializes() {
let yaml = "mode: exact_fingerprint";
let mode: Mode = serde_yaml::from_str(yaml).expect("exact_fingerprint should deserialize");
assert_eq!(mode, Mode::ExactFingerprint);

let serialized = serde_yaml::to_string(&mode).expect("should serialize");
assert!(
serialized.contains("exact_fingerprint"),
"serialized form should contain 'exact_fingerprint'"
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ generated: components: transforms: tag_cardinality_limit: configuration: {
This mode has higher memory requirements than `probabilistic`, but never falsely outputs
metrics with new tags after the limit has been hit.
"""
exact_fingerprint: """
This mode operates similarly to `exact` mode except it tracks cardinality using 64-bit hash fingerprints
of tag values instead of the original strings. This leads to lower memory requirements in most
scenarios (assuming average tag value size is greater than 8 bytes) at the cost of slightly
reduced throughput due to extra hashing operations and a very small chance of collisions at
very high cardinalities

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
very high cardinalities
very high cardinalities.

"""
probabilistic: """
Tracks cardinality probabilistically.

Expand Down Expand Up @@ -126,7 +133,8 @@ generated: components: transforms: tag_cardinality_limit: configuration: {
description: "Controls the approach taken for tracking tag cardinality."
required: true
type: string: enum: {
exact: "Tracks cardinality exactly. See `Mode::Exact` for details."
exact: "Tracks cardinality exactly. See `Mode::Exact` for details."
exact_fingerprint: "Tracks cardinality using 64-bit hash fingerprints. See `Mode::ExactFingerprint` for details."
excluded: """
Skip cardinality tracking for this metric. All tag values pass through and nothing is
limited. Other fields in this per-metric configuration are ignored when this is selected.
Expand Down
Loading
Loading