Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
6583fd7
fix(enrichment): preserve memory enrichment table state on reload
esensar Jun 1, 2026
58b908e
Add changelog entry
esensar Jun 1, 2026
04110a3
Restore old state when building the new table
esensar Jun 2, 2026
0e0952c
Fix reload handling for enrichment tables
esensar Jun 2, 2026
0044cc7
Merge branch 'master' into fix/memory-table-reload-state
esensar Jun 2, 2026
a696cb7
Add additional types for enrichment tables which act as more componen…
esensar Jun 3, 2026
664d2ae
Change stateful flag check with sink check
esensar Jun 3, 2026
bf3173f
Merge branch 'master' into fix/memory-table-reload-state
esensar Jun 3, 2026
d36e743
Remove unused enrichment error
esensar Jun 5, 2026
ed83146
Simplify state extraction from tables
esensar Jun 5, 2026
1ed0732
Remove another unused error from enrichment
esensar Jun 5, 2026
4ab2ce3
Add a test case for memory table state preservation
esensar Jun 5, 2026
f4cb301
Remove obsolete trait methods from enrichment tests
esensar Jun 5, 2026
e9f9738
Prevent needless clone when extracting state from enrichment tables
esensar Jun 8, 2026
961e72f
Merge branch 'master' into fix/memory-table-reload-state
esensar Jun 8, 2026
79eb2b1
Add topology test for enrichment table state extraction
esensar Jun 8, 2026
2fdd859
Merge branch 'master' into fix/memory-table-reload-state
pront Jun 12, 2026
b05daa4
Merge branch 'master' into fix/memory-table-reload-state
pront Jun 16, 2026
12939a0
Add a configuration option for state preservation for memory table
esensar Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/25547_memory_table_state_loss.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed an issue where memory enrichment table state could be lost or detached from the sink component on configuration reload.

authors: esensar Quad9DNS
5 changes: 5 additions & 0 deletions lib/vector-vrl/enrichment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ pub trait Table: DynClone {

/// Returns true if the underlying data has changed and the table needs reloading.
fn needs_reload(&self) -> bool;

/// Extracts state from this table
fn extract_state(&self) -> Option<Box<dyn std::any::Any + Send + Sync>> {
None
}
}

dyn_clone::clone_trait_object!(Table);
Expand Down
8 changes: 8 additions & 0 deletions lib/vector-vrl/enrichment/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ impl TableRegistry {
None => true,
}
}

/// Extracts state from the table if available.
pub fn extract_state(&self, table: &str) -> Option<Box<dyn std::any::Any + Send + Sync>> {
match &**self.tables.load() {
Some(tables) => tables.get(table).and_then(|t| t.extract_state()),
None => None,
}
}
}

impl std::fmt::Debug for TableRegistry {
Expand Down
10 changes: 9 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,11 +616,19 @@ pub async fn load_configs(
for (name, table) in config.enrichment_tables() {
let files = table.inner.files_to_watch();
let component_config = ComponentConfig::new(
files.into_iter().cloned().collect(),
files.clone().into_iter().cloned().collect(),
name.clone(),
ComponentType::EnrichmentTable,
);
watched_component_paths.push(component_config);
if table.as_sink(name).is_some() {
let sink_component_config = ComponentConfig::new(
files.into_iter().cloned().collect(),
name.clone(),
ComponentType::Sink,
);
watched_component_paths.push(sink_component_config);
}
}

info!(
Expand Down
9 changes: 6 additions & 3 deletions src/config/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl ConfigDiff {
enrichment_tables: Difference::from_enrichment_tables(
&old.enrichment_tables,
&new.enrichment_tables,
&components_to_reload,
),
components_to_reload,
}
Expand Down Expand Up @@ -56,15 +57,15 @@ impl ConfigDiff {
self.sources.is_changed(key)
|| self.transforms.is_changed(key)
|| self.sinks.is_changed(key)
|| self.enrichment_tables.contains(key)
|| self.enrichment_tables.is_changed(key)
}

/// Checks whether the given component is removed.
pub fn is_removed(&self, key: &ComponentKey) -> bool {
self.sources.is_removed(key)
|| self.transforms.is_removed(key)
|| self.sinks.is_removed(key)
|| self.enrichment_tables.contains(key)
|| self.enrichment_tables.is_removed(key)
}
}

Expand Down Expand Up @@ -116,6 +117,7 @@ impl Difference {
fn from_enrichment_tables(
old: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
new: &IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
need_change: &HashSet<ComponentKey>,
) -> Self {
let old_table_keys = extract_table_component_keys(old);
let new_table_keys = extract_table_component_keys(new);
Expand All @@ -131,7 +133,7 @@ impl Difference {
// which can iterate in varied orders.
let old_value = serde_json::to_value(&old[*table_key]).unwrap();
let new_value = serde_json::to_value(&new[*table_key]).unwrap();
old_value != new_value
old_value != new_value || need_change.contains(*table_key)
})
.cloned()
.map(|(_table_key, derived_component_key)| derived_component_key)
Expand Down Expand Up @@ -305,6 +307,7 @@ mod tests {
let diff = Difference::from_enrichment_tables(
&old_config.enrichment_tables,
&new_config.enrichment_tables,
&Default::default(),
);

assert_eq!(diff.to_add, HashSet::from_iter(["memory_table_new".into()]));
Expand Down
8 changes: 7 additions & 1 deletion src/config/enrichment_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ where
/// Generalized interface for describing and building enrichment table components.
#[enum_dispatch]
pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync {
/// Builds the enrichment table with the given globals.
/// Builds the enrichment table with the given globals and previous table state.
///
/// If the enrichment table is built successfully, `Ok(...)` is returned containing the
/// enrichment table.
Expand All @@ -124,8 +124,14 @@ pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync
async fn build(
&self,
globals: &GlobalOptions,
prev_state: Option<Box<dyn std::any::Any + Send + Sync>>,
) -> crate::Result<Box<dyn vector_lib::enrichment::Table + Send + Sync>>;

/// Checks whether this table wants previous state, to try and restore it.
fn wants_previous_state(&self) -> bool {
false
}

fn sink_config(
&self,
_default_key: &ComponentKey,
Expand Down
2 changes: 1 addition & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub use vector_lib::{
};

#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
// // This is not a comprehensive set; variants are added as needed.
// This is not a comprehensive set; variants are added as needed.
pub enum ComponentType {
Transform,
Sink,
Expand Down
1 change: 1 addition & 0 deletions src/enrichment_tables/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ impl EnrichmentTableConfig for FileConfig {
async fn build(
&self,
globals: &crate::config::GlobalOptions,
_prev_state: Option<Box<dyn std::any::Any + Send + Sync>>,
) -> crate::Result<Box<dyn Table + Send + Sync>> {
Ok(Box::new(File::new(
self.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/enrichment_tables/geoip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl EnrichmentTableConfig for GeoipConfig {
async fn build(
&self,
_: &crate::config::GlobalOptions,
_: Option<Box<dyn std::any::Any + Send + Sync>>,
) -> crate::Result<Box<dyn Table + Send + Sync>> {
Ok(Box::new(Geoip::new(self.clone())?))
}
Expand Down
41 changes: 36 additions & 5 deletions src/enrichment_tables/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,27 @@ pub struct MemoryConfig {
#[configurable(derived)]
#[serde(default)]
pub ttl_field: OptionalValuePath,
/// Behavior for memory table state on configuration reload.
#[configurable(derived)]
#[serde(default)]
pub reload_behavior: ReloadBehavior,

#[serde(skip)]
memory: Arc<Mutex<Option<Box<Memory>>>>,
}

/// Behavior for memory enrichment table state on configuration reload.
#[configurable_component]
#[derive(Clone, Default)]
#[serde(rename_all = "kebab-case")]
pub enum ReloadBehavior {
/// Always clear state on configuration reload.
#[default]
ClearState,
/// Try to preserve state when possible.
PreserveState,
}

/// Configuration for memory enrichment table source functionality.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -123,6 +139,7 @@ impl Default for MemoryConfig {
source_config: None,
internal_metrics: InternalMetricsConfig::default(),
ttl_field: OptionalValuePath::none(),
reload_behavior: Default::default(),
}
}
}
Expand All @@ -136,10 +153,19 @@ const fn default_scan_interval() -> NonZeroU64 {
}

impl MemoryConfig {
pub(super) async fn get_or_build_memory(&self) -> Memory {
pub(super) async fn get_or_build_memory(
&self,
prev_state: Option<Box<dyn std::any::Any + Send + Sync>>,
) -> Memory {
let mut boxed_memory = self.memory.lock().await;
*boxed_memory
.get_or_insert_with(|| Box::new(Memory::new(self.clone())))
.get_or_insert_with(|| {
if let Some(prev) = prev_state {
Box::new(Memory::from_previous_state(self.clone(), prev))
} else {
Box::new(Memory::new(self.clone()))
}
})
.clone()
}
}
Expand All @@ -148,8 +174,13 @@ impl EnrichmentTableConfig for MemoryConfig {
async fn build(
&self,
_globals: &crate::config::GlobalOptions,
prev_state: Option<Box<dyn std::any::Any + Send + Sync>>,
) -> crate::Result<Box<dyn Table + Send + Sync>> {
Ok(Box::new(self.get_or_build_memory().await))
Ok(Box::new(self.get_or_build_memory(prev_state).await))
}

fn wants_previous_state(&self) -> bool {
matches!(self.reload_behavior, ReloadBehavior::PreserveState)
}

fn sink_config(
Expand Down Expand Up @@ -177,7 +208,7 @@ impl EnrichmentTableConfig for MemoryConfig {
#[typetag::serde(name = "memory_enrichment_table")]
impl SinkConfig for MemoryConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await);
let sink = VectorSink::from_event_streamsink(self.get_or_build_memory(None).await);

Ok((sink, future::ok(()).boxed()))
}
Expand All @@ -195,7 +226,7 @@ impl SinkConfig for MemoryConfig {
#[typetag::serde(name = "memory_enrichment_table")]
impl SourceConfig for MemoryConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
let memory = self.get_or_build_memory().await;
let memory = self.get_or_build_memory(None).await;

let log_namespace = cx.log_namespace(self.log_namespace);

Expand Down
69 changes: 66 additions & 3 deletions src/enrichment_tables/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,25 @@ impl Memory {
}
}

/// Creates a new [Memory] based on the provided config and previous state.
pub fn from_previous_state(
config: MemoryConfig,
prev_state: Box<dyn std::any::Any + Send + Sync>,
) -> Self {
if let Ok(prev_memory) = prev_state.downcast::<Memory>() {
Self {
config,
read_handle_factory: prev_memory.read_handle_factory,
read_handle: prev_memory.read_handle,
write_handle: prev_memory.write_handle,
Comment thread
esensar marked this conversation as resolved.
expired_items_sender: prev_memory.expired_items_sender,
expired_items_receiver: prev_memory.expired_items_receiver,
}
} else {
Self::new(config)
}
}

pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
self.read_handle
.get_or(|| self.read_handle_factory.handle())
Expand Down Expand Up @@ -382,9 +401,15 @@ impl Table for Memory {
Vec::new()
}

/// Doesn't need reload, data is written directly
/// Has to be reloaded always, because a new component is created to insert data into it
fn needs_reload(&self) -> bool {
false
true
Comment on lines 405 to +406

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Rebuild the memory sink when forcing reloads

With the default ClearState behavior, returning true here makes every config reload rebuild the registry's memory table even when the table itself is unchanged, but diff.enrichment_tables does not mark the table sink changed in that case. After an unrelated reload, transforms read from the new empty table while the still-running memory sink keeps writing to the old instance, so all subsequent inserts become invisible until the table is explicitly changed or Vector restarts.

Useful? React with 👍 / 👎.

}

fn extract_state(&self) -> Option<Box<dyn std::any::Any + Send + Sync>> {
let writer = self.write_handle.lock().expect("mutex poisoned");
self.flush(writer);
Some(Box::new(self.clone()))
}
}

Expand Down Expand Up @@ -465,6 +490,7 @@ mod tests {

use super::*;
use crate::{
config::EnrichmentTableConfig,
enrichment_tables::memory::{
config::MemorySourceConfig, internal_events::InternalMetricsConfig,
},
Expand Down Expand Up @@ -501,6 +527,43 @@ mod tests {
);
}

#[tokio::test]
async fn extract_state_preserves_data() {
let memory = Memory::new(Default::default());
memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));

let condition = Condition::Equals {
field: "key",
value: Value::from("test_key"),
};

let expected = ObjectMap::from([
("key".into(), Value::from("test_key")),
("ttl".into(), Value::from(memory.config.ttl)),
("value".into(), Value::from(5)),
]);
assert_eq!(
Ok(expected.clone()),
memory.find_table_row(
Case::Sensitive,
std::slice::from_ref(&condition),
None,
None,
None
)
);

// Now build a new table using old state
let new_memory = MemoryConfig::default()
.build(&Default::default(), memory.extract_state())
.await
.unwrap();
assert_eq!(
Ok(expected),
new_memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
);
}

#[test]
fn calculates_ttl() {
let ttl = 100;
Expand Down Expand Up @@ -1023,7 +1086,7 @@ mod tests {
export_expired_items: false,
source_key: "test".to_string(),
});
let memory = memory_config.get_or_build_memory().await;
let memory = memory_config.get_or_build_memory(None).await;
memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));

let mut events: Vec<Event> = run_and_assert_source_compliance(
Expand Down
1 change: 1 addition & 0 deletions src/enrichment_tables/mmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl EnrichmentTableConfig for MmdbConfig {
async fn build(
&self,
_: &crate::config::GlobalOptions,
_: Option<Box<dyn std::any::Any + Send + Sync>>,
) -> crate::Result<Box<dyn Table + Send + Sync>> {
Ok(Box::new(Mmdb::new(self.clone())?))
}
Expand Down
Loading
Loading