Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/dnstap-parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ hickory-proto.workspace = true
prost.workspace = true
snafu.workspace = true
tracing.workspace = true
vector-config = { path = "../vector-config" }
vector-lib = { path = "../vector-lib" }
vrl.workspace = true
paste.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions lib/dnstap-parser/src/internal_events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use tracing::warn;
use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
use vector_config::internal_event;

#[internal_event]
#[derive(Debug)]
pub(crate) struct DnstapParseWarning<E> {
pub error: E,
Expand Down
6 changes: 6 additions & 0 deletions lib/vector-buffers/src/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use vector_common::{
internal_event::{InternalEvent, error_type},
registered_event,
};
use vector_config::internal_event;

#[internal_event]
pub struct BufferCreated {
pub buffer_id: String,
pub idx: usize,
Expand Down Expand Up @@ -35,6 +37,7 @@ impl InternalEvent for BufferCreated {
}
}

#[internal_event]
pub struct BufferEventsReceived {
pub buffer_id: String,
pub idx: usize,
Expand Down Expand Up @@ -75,6 +78,7 @@ impl InternalEvent for BufferEventsReceived {
}
}

#[internal_event]
pub struct BufferEventsSent {
pub buffer_id: String,
pub idx: usize,
Expand Down Expand Up @@ -114,6 +118,7 @@ impl InternalEvent for BufferEventsSent {
}
}

#[internal_event]
pub struct BufferEventsDropped {
pub buffer_id: String,
pub idx: usize,
Expand Down Expand Up @@ -180,6 +185,7 @@ impl InternalEvent for BufferEventsDropped {
}
}

#[internal_event]
pub struct BufferReadError {
pub error_code: &'static str,
pub error: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use metrics::{Counter, counter};
use vector_config::internal_event;

use super::{Count, InternalEvent, InternalEventHandle, RegisterInternalEvent};

pub const INTENTIONAL: bool = true;
pub const UNINTENTIONAL: bool = false;

#[internal_event]
#[derive(Debug)]
pub struct ComponentEventsDropped<'a, const INTENTIONAL: bool> {
pub count: usize,
Expand All @@ -16,10 +18,6 @@ impl<const INTENTIONAL: bool> InternalEvent for ComponentEventsDropped<'_, INTEN
let count = self.count;
self.register().emit(Count(count));
}

fn name(&self) -> Option<&'static str> {
Some("ComponentEventsDropped")
}
}

impl<'a, const INTENTIONAL: bool> From<&'a str> for ComponentEventsDropped<'a, INTENTIONAL> {
Expand Down
27 changes: 12 additions & 15 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ pub use service::{CallError, PollReadyError};

use crate::json_size::JsonSize;

pub trait InternalEvent: Sized {
fn emit(self);
pub trait NamedInternalEvent {
fn name(&self) -> &'static str;
}

// Optional for backwards compat until all events implement this
fn name(&self) -> Option<&'static str> {
None
}
pub trait InternalEvent: NamedInternalEvent + Sized {
fn emit(self);
}

#[allow(clippy::module_name_repetitions)]
Expand Down Expand Up @@ -60,29 +59,27 @@ impl<E: InternalEvent> InternalEvent for DefaultName<E> {
fn emit(self) {
self.event.emit();
}
}

fn name(&self) -> Option<&'static str> {
Some(self.event.name().unwrap_or(self.name))
impl<E: NamedInternalEvent> NamedInternalEvent for DefaultName<E> {
fn name(&self) -> &'static str {
self.event.name()
}
}

impl<E: RegisterInternalEvent> RegisterInternalEvent for DefaultName<E> {
impl<E: RegisterInternalEvent + NamedInternalEvent> RegisterInternalEvent for DefaultName<E> {
type Handle = E::Handle;

fn register(self) -> Self::Handle {
self.event.register()
}

fn name(&self) -> Option<&'static str> {
Some(self.event.name().unwrap_or(self.name))
}
fn name(&self) -> Option<&'static str> { Some(NamedInternalEvent::name(&self.event)) }
}

#[cfg(any(test, feature = "test"))]
pub fn emit(event: impl InternalEvent) {
if let Some(name) = event.name() {
super::event_test_util::record_internal_event(name);
}
super::event_test_util::record_internal_event(event.name());
event.emit();
}

Expand Down
11 changes: 3 additions & 8 deletions lib/vector-common/src/internal_event/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use metrics::counter;
use vector_config::internal_event;

use super::{ComponentEventsDropped, InternalEvent, UNINTENTIONAL, emit, error_stage, error_type};

#[internal_event]
#[derive(Debug)]
pub struct PollReadyError<E> {
pub error: E,
Expand All @@ -23,12 +25,9 @@ impl<E: std::fmt::Debug> InternalEvent for PollReadyError<E> {
)
.increment(1);
}

fn name(&self) -> Option<&'static str> {
Some("ServicePollReadyError")
}
}

#[internal_event]
#[derive(Debug)]
pub struct CallError<E> {
pub error: E,
Expand Down Expand Up @@ -59,8 +58,4 @@ impl<E: std::fmt::Debug> InternalEvent for CallError<E> {
count: self.count,
});
}

fn name(&self) -> Option<&'static str> {
Some("ServiceCallError")
}
}
41 changes: 41 additions & 0 deletions lib/vector-config-macros/src/internal_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, ItemStruct};

// #[internal_event] attribute macro implementation.
//
// Apply to a struct that also implements `InternalEvent`. This generates an
// implementation of `NamedInternalEvent` so `InternalEvent::name()` returns
// a canonical &'static str for the event's type.
pub fn internal_event_impl(item: TokenStream) -> TokenStream {
let item_struct = parse_macro_input!(item as ItemStruct);

let ident = &item_struct.ident;
let generics = item_struct.generics.clone();
let (impl_generics, ty_generics, where_clause2) = generics.split_for_impl();

// Use a path that works from both vector-common (crate::internal_event)
// and from other crates using vector-lib (vector_lib::internal_event).
// For crates that don't depend on vector-lib but do depend on vector-common,
// we use vector_common::internal_event.
let pkg_name = std::env::var("CARGO_PKG_NAME").unwrap_or_default();
let internal_event_path = if pkg_name == "vector-common" {
quote! { crate::internal_event }
} else if pkg_name == "vector-buffers" || pkg_name.starts_with("vector-") {
// Most vector-* crates depend on vector-common but not vector-lib
quote! { ::vector_common::internal_event }
} else {
// Main vector crate and its internal modules use vector_lib
quote! { ::vector_lib::internal_event }
};

let expanded = quote! {
#item_struct

impl #impl_generics #internal_event_path::NamedInternalEvent for #ident #ty_generics #where_clause2 {
fn name(&self) -> &'static str { stringify!(#ident) }
}
};

TokenStream::from(expanded)
}
39 changes: 39 additions & 0 deletions lib/vector-config-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod attrs;
mod component_name;
mod configurable;
mod configurable_component;
mod internal_event;

/// Designates a type as being part of a Vector configuration.
///
Expand Down Expand Up @@ -110,3 +111,41 @@ pub fn derive_configurable(input: TokenStream) -> TokenStream {
pub fn derive_component_name(input: TokenStream) -> TokenStream {
component_name::derive_component_name_impl(input)
}

/// Marks an internal event struct for automatic name reporting.
///
/// Apply this attribute to the struct type that implements `InternalEvent`.
/// It generates an implementation of `NamedInternalEvent` so `InternalEvent::name()`
/// returns a stable, compile-time string for the event type.
///
/// Usage:
///
/// ```ignore
/// use vector_config::internal_event;
/// use vector_lib::internal_event::InternalEvent;
///
/// #[internal_event]
/// #[derive(Debug)]
/// pub struct UdpSendIncompleteError {
/// pub data_size: usize,
/// pub sent: usize,
/// }
///
/// impl InternalEvent for UdpSendIncompleteError {
/// fn emit(self) {
/// // ... emit metrics/logging ...
/// }
/// }
///
/// // Later, `UdpSendIncompleteError.name()` yields the full type path.
/// ```
///
/// Notes:
/// - The attribute has no parameters.
/// - Works with generics and lifetimes on the struct.
/// - The returned name is the struct identifier, via `stringify!(TypeName)`.
/// This avoids canonicalized module paths and yields stable event names.
#[proc_macro_attribute]
pub fn internal_event(_attrs: TokenStream, item: TokenStream) -> TokenStream {
internal_event::internal_event_impl(item)
}
31 changes: 7 additions & 24 deletions src/enrichment_tables/memory/internal_events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use metrics::{counter, gauge};
use vector_config::internal_event;
use vector_lib::{configurable::configurable_component, internal_event::InternalEvent};

/// Configuration of internal metrics for enrichment memory table.
Expand All @@ -14,6 +15,7 @@ pub struct InternalMetricsConfig {
pub include_key_tag: bool,
}

#[internal_event]
#[derive(Debug)]
pub(crate) struct MemoryEnrichmentTableRead<'a> {
pub key: &'a str,
Expand All @@ -32,12 +34,9 @@ impl InternalEvent for MemoryEnrichmentTableRead<'_> {
counter!("memory_enrichment_table_reads_total",).increment(1);
}
}

fn name(&self) -> Option<&'static str> {
Some("MemoryEnrichmentTableRead")
}
}

#[internal_event]
#[derive(Debug)]
pub(crate) struct MemoryEnrichmentTableInserted<'a> {
pub key: &'a str,
Expand All @@ -56,12 +55,9 @@ impl InternalEvent for MemoryEnrichmentTableInserted<'_> {
counter!("memory_enrichment_table_insertions_total",).increment(1);
}
}

fn name(&self) -> Option<&'static str> {
Some("MemoryEnrichmentTableInserted")
}
}

#[internal_event]
#[derive(Debug)]
pub(crate) struct MemoryEnrichmentTableFlushed {
pub new_objects_count: usize,
Expand All @@ -74,12 +70,9 @@ impl InternalEvent for MemoryEnrichmentTableFlushed {
gauge!("memory_enrichment_table_objects_count",).set(self.new_objects_count as f64);
gauge!("memory_enrichment_table_byte_size",).set(self.new_byte_size as f64);
}

fn name(&self) -> Option<&'static str> {
Some("MemoryEnrichmentTableFlushed")
}
}

#[internal_event]
#[derive(Debug)]
pub(crate) struct MemoryEnrichmentTableTtlExpired<'a> {
pub key: &'a str,
Expand All @@ -98,12 +91,9 @@ impl InternalEvent for MemoryEnrichmentTableTtlExpired<'_> {
counter!("memory_enrichment_table_ttl_expirations",).increment(1);
}
}

fn name(&self) -> Option<&'static str> {
Some("MemoryEnrichmentTableTtlExpired")
}
}

#[internal_event]
#[derive(Debug)]
pub(crate) struct MemoryEnrichmentTableReadFailed<'a> {
pub key: &'a str,
Expand All @@ -122,12 +112,9 @@ impl InternalEvent for MemoryEnrichmentTableReadFailed<'_> {
counter!("memory_enrichment_table_failed_reads",).increment(1);
}
}

fn name(&self) -> Option<&'static str> {
Some("MemoryEnrichmentTableReadFailed")
}
}

#[internal_event]
#[derive(Debug)]
pub(crate) struct MemoryEnrichmentTableInsertFailed<'a> {
pub key: &'a str,
Expand All @@ -146,8 +133,4 @@ impl InternalEvent for MemoryEnrichmentTableInsertFailed<'_> {
counter!("memory_enrichment_table_failed_insertions",).increment(1);
}
}

fn name(&self) -> Option<&'static str> {
Some("MemoryEnrichmentTableInsertFailed")
}
}
4 changes: 4 additions & 0 deletions src/internal_events/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use metrics::counter;
use vector_config::internal_event;
use vector_lib::internal_event::InternalEvent;

#[internal_event]
#[derive(Debug)]
pub struct AggregateEventRecorded;

Expand All @@ -10,6 +12,7 @@ impl InternalEvent for AggregateEventRecorded {
}
}

#[internal_event]
#[derive(Debug)]
pub struct AggregateFlushed;

Expand All @@ -19,6 +22,7 @@ impl InternalEvent for AggregateFlushed {
}
}

#[internal_event]
#[derive(Debug)]
pub struct AggregateUpdateFailed;

Expand Down
Loading
Loading