diff --git a/changelog.d/25547_memory_table_state_loss.fix.md b/changelog.d/25547_memory_table_state_loss.fix.md new file mode 100644 index 0000000000000..da32c70606de5 --- /dev/null +++ b/changelog.d/25547_memory_table_state_loss.fix.md @@ -0,0 +1,3 @@ +Fixed an issue where memory enrichment table state could be lost or detached from the sink component on configuration reload. + +authors: esensar Quad9DNS diff --git a/lib/vector-vrl/enrichment/src/lib.rs b/lib/vector-vrl/enrichment/src/lib.rs index 293a5db343d61..a5c64ea474411 100644 --- a/lib/vector-vrl/enrichment/src/lib.rs +++ b/lib/vector-vrl/enrichment/src/lib.rs @@ -141,6 +141,11 @@ pub trait Table: DynClone { /// Returns true if the underlying data has changed and the table needs reloading. fn needs_reload(&self) -> bool; + + /// Extracts state from this table + fn extract_state(&self) -> Option> { + None + } } dyn_clone::clone_trait_object!(Table); diff --git a/lib/vector-vrl/enrichment/src/tables.rs b/lib/vector-vrl/enrichment/src/tables.rs index 8d245b2daf379..7135f35e11108 100644 --- a/lib/vector-vrl/enrichment/src/tables.rs +++ b/lib/vector-vrl/enrichment/src/tables.rs @@ -196,6 +196,14 @@ impl TableRegistry { None => true, } } + + /// Extracts state from the table if available. 
+ pub fn extract_state(&self, table: &str) -> Option> { + match &**self.tables.load() { + Some(tables) => tables.get(table).and_then(|t| t.extract_state()), + None => None, + } + } } impl std::fmt::Debug for TableRegistry { diff --git a/src/app.rs b/src/app.rs index 565937d3dc21d..829c792330c7e 100644 --- a/src/app.rs +++ b/src/app.rs @@ -616,11 +616,19 @@ pub async fn load_configs( for (name, table) in config.enrichment_tables() { let files = table.inner.files_to_watch(); let component_config = ComponentConfig::new( - files.into_iter().cloned().collect(), + files.clone().into_iter().cloned().collect(), name.clone(), ComponentType::EnrichmentTable, ); watched_component_paths.push(component_config); + if table.as_sink(name).is_some() { + let sink_component_config = ComponentConfig::new( + files.into_iter().cloned().collect(), + name.clone(), + ComponentType::Sink, + ); + watched_component_paths.push(sink_component_config); + } } info!( diff --git a/src/config/diff.rs b/src/config/diff.rs index ebedf3c73d5c3..e824603b0440b 100644 --- a/src/config/diff.rs +++ b/src/config/diff.rs @@ -29,6 +29,7 @@ impl ConfigDiff { enrichment_tables: Difference::from_enrichment_tables( &old.enrichment_tables, &new.enrichment_tables, + &components_to_reload, ), components_to_reload, } @@ -56,7 +57,7 @@ impl ConfigDiff { self.sources.is_changed(key) || self.transforms.is_changed(key) || self.sinks.is_changed(key) - || self.enrichment_tables.contains(key) + || self.enrichment_tables.is_changed(key) } /// Checks whether the given component is removed. 
@@ -64,7 +65,7 @@ impl ConfigDiff { self.sources.is_removed(key) || self.transforms.is_removed(key) || self.sinks.is_removed(key) - || self.enrichment_tables.contains(key) + || self.enrichment_tables.is_removed(key) } } @@ -116,6 +117,7 @@ impl Difference { fn from_enrichment_tables( old: &IndexMap>, new: &IndexMap>, + need_change: &HashSet, ) -> Self { let old_table_keys = extract_table_component_keys(old); let new_table_keys = extract_table_component_keys(new); @@ -131,7 +133,7 @@ impl Difference { // which can iterate in varied orders. let old_value = serde_json::to_value(&old[*table_key]).unwrap(); let new_value = serde_json::to_value(&new[*table_key]).unwrap(); - old_value != new_value + old_value != new_value || need_change.contains(*table_key) }) .cloned() .map(|(_table_key, derived_component_key)| derived_component_key) @@ -305,6 +307,7 @@ mod tests { let diff = Difference::from_enrichment_tables( &old_config.enrichment_tables, &new_config.enrichment_tables, + &Default::default(), ); assert_eq!(diff.to_add, HashSet::from_iter(["memory_table_new".into()])); diff --git a/src/config/enrichment_table.rs b/src/config/enrichment_table.rs index 540a91875b463..a7595d8c46cbd 100644 --- a/src/config/enrichment_table.rs +++ b/src/config/enrichment_table.rs @@ -112,7 +112,7 @@ where /// Generalized interface for describing and building enrichment table components. #[enum_dispatch] pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync { - /// Builds the enrichment table with the given globals. + /// Builds the enrichment table with the given globals and previous table state. /// /// If the enrichment table is built successfully, `Ok(...)` is returned containing the /// enrichment table. 
@@ -124,8 +124,14 @@ pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync async fn build( &self, globals: &GlobalOptions, + prev_state: Option>, ) -> crate::Result>; + /// Checks whether this table wants previous state, to try and restore it. + fn wants_previous_state(&self) -> bool { + false + } + fn sink_config( &self, _default_key: &ComponentKey, diff --git a/src/config/mod.rs b/src/config/mod.rs index 4a24954926c13..abe8f81e9c1b5 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -77,7 +77,7 @@ pub use vector_lib::{ }; #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)] -// // This is not a comprehensive set; variants are added as needed. +// This is not a comprehensive set; variants are added as needed. pub enum ComponentType { Transform, Sink, diff --git a/src/enrichment_tables/file.rs b/src/enrichment_tables/file.rs index bc6e3213d7a1b..c2c82c41e8a71 100644 --- a/src/enrichment_tables/file.rs +++ b/src/enrichment_tables/file.rs @@ -239,6 +239,7 @@ impl EnrichmentTableConfig for FileConfig { async fn build( &self, globals: &crate::config::GlobalOptions, + _prev_state: Option>, ) -> crate::Result> { Ok(Box::new(File::new( self.clone(), diff --git a/src/enrichment_tables/geoip.rs b/src/enrichment_tables/geoip.rs index 6716b4eaf2216..e04e2db176541 100644 --- a/src/enrichment_tables/geoip.rs +++ b/src/enrichment_tables/geoip.rs @@ -101,6 +101,7 @@ impl EnrichmentTableConfig for GeoipConfig { async fn build( &self, _: &crate::config::GlobalOptions, + _: Option>, ) -> crate::Result> { Ok(Box::new(Geoip::new(self.clone())?)) } diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs index 9b17ce29750c3..b0738160c063c 100644 --- a/src/enrichment_tables/memory/config.rs +++ b/src/enrichment_tables/memory/config.rs @@ -68,11 +68,27 @@ pub struct MemoryConfig { #[configurable(derived)] #[serde(default)] pub ttl_field: OptionalValuePath, + /// Behavior for memory table state on configuration 
reload. + #[configurable(derived)] + #[serde(default)] + pub reload_behavior: ReloadBehavior, #[serde(skip)] memory: Arc>>>, } +/// Behavior for memory enrichment table state on configuration reload. +#[configurable_component] +#[derive(Clone, Default)] +#[serde(rename_all = "kebab-case")] +pub enum ReloadBehavior { + /// Always clear state on configuration reload. + #[default] + ClearState, + /// Try to preserve state when possible. + PreserveState, +} + /// Configuration for memory enrichment table source functionality. #[configurable_component] #[derive(Clone, Debug, PartialEq, Eq)] @@ -123,6 +139,7 @@ impl Default for MemoryConfig { source_config: None, internal_metrics: InternalMetricsConfig::default(), ttl_field: OptionalValuePath::none(), + reload_behavior: Default::default(), } } } @@ -136,10 +153,19 @@ const fn default_scan_interval() -> NonZeroU64 { } impl MemoryConfig { - pub(super) async fn get_or_build_memory(&self) -> Memory { + pub(super) async fn get_or_build_memory( + &self, + prev_state: Option>, + ) -> Memory { let mut boxed_memory = self.memory.lock().await; *boxed_memory - .get_or_insert_with(|| Box::new(Memory::new(self.clone()))) + .get_or_insert_with(|| { + if let Some(prev) = prev_state { + Box::new(Memory::from_previous_state(self.clone(), prev)) + } else { + Box::new(Memory::new(self.clone())) + } + }) .clone() } } @@ -148,8 +174,13 @@ impl EnrichmentTableConfig for MemoryConfig { async fn build( &self, _globals: &crate::config::GlobalOptions, + prev_state: Option>, ) -> crate::Result> { - Ok(Box::new(self.get_or_build_memory().await)) + Ok(Box::new(self.get_or_build_memory(prev_state).await)) + } + + fn wants_previous_state(&self) -> bool { + matches!(self.reload_behavior, ReloadBehavior::PreserveState) } fn sink_config( @@ -177,7 +208,7 @@ impl EnrichmentTableConfig for MemoryConfig { #[typetag::serde(name = "memory_enrichment_table")] impl SinkConfig for MemoryConfig { async fn build(&self, _cx: SinkContext) -> 
crate::Result<(VectorSink, Healthcheck)> { - let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await); + let sink = VectorSink::from_event_streamsink(self.get_or_build_memory(None).await); Ok((sink, future::ok(()).boxed())) } @@ -195,7 +226,7 @@ impl SinkConfig for MemoryConfig { #[typetag::serde(name = "memory_enrichment_table")] impl SourceConfig for MemoryConfig { async fn build(&self, cx: SourceContext) -> crate::Result { - let memory = self.get_or_build_memory().await; + let memory = self.get_or_build_memory(None).await; let log_namespace = cx.log_namespace(self.log_namespace); diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index a819b3be77912..463df13604f99 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -142,6 +142,25 @@ impl Memory { } } + /// Creates a new [Memory] based on the provided config and previous state. + pub fn from_previous_state( + config: MemoryConfig, + prev_state: Box, + ) -> Self { + if let Ok(prev_memory) = prev_state.downcast::() { + Self { + config, + read_handle_factory: prev_memory.read_handle_factory, + read_handle: prev_memory.read_handle, + write_handle: prev_memory.write_handle, + expired_items_sender: prev_memory.expired_items_sender, + expired_items_receiver: prev_memory.expired_items_receiver, + } + } else { + Self::new(config) + } + } + pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle { self.read_handle .get_or(|| self.read_handle_factory.handle()) @@ -386,6 +405,12 @@ impl Table for Memory { fn needs_reload(&self) -> bool { false } + + fn extract_state(&self) -> Option> { + let writer = self.write_handle.lock().expect("mutex poisoned"); + self.flush(writer); + Some(Box::new(self.clone())) + } } impl std::fmt::Debug for Memory { @@ -465,6 +490,7 @@ mod tests { use super::*; use crate::{ + config::EnrichmentTableConfig, enrichment_tables::memory::{ config::MemorySourceConfig, 
internal_events::InternalMetricsConfig, }, @@ -501,6 +527,43 @@ mod tests { ); } + #[tokio::test] + async fn extract_state_preserves_data() { + let memory = Memory::new(Default::default()); + memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); + + let condition = Condition::Equals { + field: "key", + value: Value::from("test_key"), + }; + + let expected = ObjectMap::from([ + ("key".into(), Value::from("test_key")), + ("ttl".into(), Value::from(memory.config.ttl)), + ("value".into(), Value::from(5)), + ]); + assert_eq!( + Ok(expected.clone()), + memory.find_table_row( + Case::Sensitive, + std::slice::from_ref(&condition), + None, + None, + None + ) + ); + + // Now build a new table using old state + let new_memory = MemoryConfig::default() + .build(&Default::default(), memory.extract_state()) + .await + .unwrap(); + assert_eq!( + Ok(expected), + new_memory.find_table_row(Case::Sensitive, &[condition], None, None, None) + ); + } + #[test] fn calculates_ttl() { let ttl = 100; @@ -1023,7 +1086,7 @@ mod tests { export_expired_items: false, source_key: "test".to_string(), }); - let memory = memory_config.get_or_build_memory().await; + let memory = memory_config.get_or_build_memory(None).await; memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); let mut events: Vec = run_and_assert_source_compliance( diff --git a/src/enrichment_tables/mmdb.rs b/src/enrichment_tables/mmdb.rs index 1977d1e0efad6..d01b3fe320c0a 100644 --- a/src/enrichment_tables/mmdb.rs +++ b/src/enrichment_tables/mmdb.rs @@ -36,6 +36,7 @@ impl EnrichmentTableConfig for MmdbConfig { async fn build( &self, _: &crate::config::GlobalOptions, + _: Option>, ) -> crate::Result> { Ok(Box::new(Mmdb::new(self.clone())?)) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index d325d1fc070d3..4d092f53671b9 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -28,6 +28,7 @@ use vector_lib::{ }, }, }, + enrichment::Table, 
internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered}, latency::LatencyRecorder, schema::Definition, @@ -142,11 +143,11 @@ impl<'a> Builder<'a> { self.build_transforms(enrichment_tables).await; self.build_sinks(enrichment_tables).await; - // We should have all the data for the enrichment tables loaded now, so switch them over to - // readonly. - enrichment_tables.finish_load(); - if self.errors.is_empty() { + // We should have all the data for the enrichment tables loaded now, so switch them over to + // readonly. + enrichment_tables.finish_load(); + Ok(TopologyPieces { inputs: self.inputs, outputs: Self::finalize_outputs(self.outputs), @@ -183,12 +184,15 @@ impl<'a> Builder<'a> { /// Loads, or reloads the enrichment tables. /// The tables are stored in the `ENRICHMENT_TABLES` global variable. async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry { - let mut enrichment_tables = HashMap::new(); + let mut enrichment_tables: HashMap> = HashMap::new(); // Build enrichment tables 'tables: for (name, table_outer) in self.config.enrichment_tables.iter() { let table_name = name.to_string(); - if ENRICHMENT_TABLES.needs_reload(&table_name) { + if ENRICHMENT_TABLES.needs_reload(&table_name) + || self.diff.enrichment_tables.is_changed(name) + || self.diff.enrichment_tables.is_added(name) + { let indexes = if !self.diff.enrichment_tables.is_added(name) { // If this is an existing enrichment table, we need to store the indexes to reapply // them again post load. 
@@ -197,7 +201,18 @@ impl<'a> Builder<'a> { None }; - let mut table = match table_outer.inner.build(&self.config.global).await { + let mut prev_state = None; + if !self.diff.enrichment_tables.is_added(name) + && table_outer.inner.wants_previous_state() + { + prev_state = ENRICHMENT_TABLES.extract_state(&table_name); + } + + let mut table = match table_outer + .inner + .build(&self.config.global, prev_state) + .await + { Ok(table) => table, Err(error) => { self.errors @@ -1006,16 +1021,20 @@ async fn run_source_output_pump( Ok(TaskOutput::Source) } +/// Reloads file-based enrichment tables, but not stateful ones. pub async fn reload_enrichment_tables(config: &Config) { let _enrichment_tables_load_guard = ENRICHMENT_TABLES_LOAD_LOCK.lock().await; let mut enrichment_tables = HashMap::new(); // Build enrichment tables 'tables: for (name, table_outer) in config.enrichment_tables.iter() { let table_name = name.to_string(); - if ENRICHMENT_TABLES.needs_reload(&table_name) { + if ENRICHMENT_TABLES.needs_reload(&table_name) + // Tables that can act as sinks are reloaded through topology + && table_outer.as_sink(name).is_none() + { let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name)); - let mut table = match table_outer.inner.build(&config.global).await { + let mut table = match table_outer.inner.build(&config.global, None).await { Ok(table) => table, Err(error) => { error!("Enrichment table \"{name}\" reload failed: {error}"); diff --git a/src/topology/running.rs b/src/topology/running.rs index 6c6da03abf770..9bc0147730fa7 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -421,12 +421,29 @@ impl RunningTopology { ) -> HashMap { // First, we shutdown any changed/removed sources. This ensures that we can allow downstream // components to terminate naturally by virtue of the flow of events stopping. 
- if diff.sources.any_changed_or_removed() { + if diff.sources.any_changed_or_removed() || diff.enrichment_tables.any_changed_or_removed() + { let timeout = Duration::from_secs(30); let mut source_shutdown_handles = Vec::new(); + let to_remove_table_sources = diff + .enrichment_tables + .to_remove + .iter() + .filter_map(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_source(key)) + .map(|(key, _)| key.clone()) + }) + .collect::>(); let deadline = Instant::now() + timeout; - for key in &diff.sources.to_remove { + for key in diff + .sources + .to_remove + .iter() + .chain(to_remove_table_sources.iter()) + { debug!(component_id = %key, "Removing source."); let previous = self.tasks.remove(key).unwrap(); @@ -437,7 +454,23 @@ impl RunningTopology { .push(self.shutdown_coordinator.shutdown_source(key, deadline)); } - for key in &diff.sources.to_change { + let to_change_table_sources = diff + .enrichment_tables + .to_change + .iter() + .filter_map(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_source(key)) + .map(|(key, _)| key.clone()) + }) + .collect::>(); + for key in diff + .sources + .to_change + .iter() + .chain(to_change_table_sources.iter()) + { debug!(component_id = %key, "Changing source."); self.remove_outputs(key); @@ -563,6 +596,12 @@ impl RunningTopology { .sinks .to_change .iter() + .chain(diff.enrichment_tables.to_change.iter().filter(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_sink(key)) + .is_some() + })) .filter(|&key| { if diff.components_to_reload.contains(key) { return false; @@ -592,10 +631,15 @@ impl RunningTopology { .iter() .filter(|key| { !reuse_buffers.contains(*key) - && self + && (self .config .sink(key) .is_some_and(|s| s.buffer.has_disk_stage()) + || self + .config + .enrichment_table(key) + .and_then(|t| t.as_sink(key)) + .is_some_and(|(_, s)| s.buffer.has_disk_stage())) }) .cloned() .collect::>(); @@ -751,6 +795,7 @@ impl RunningTopology { for key in removed_sinks { // 
Sinks only have inputs self.inputs_tap_metadata.remove(key); + self.component_type_names.remove(key); } let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| { @@ -761,6 +806,7 @@ impl RunningTopology { for key in removed_sources { // Sources only have outputs self.outputs_tap_metadata.remove(&key); + self.component_type_names.remove(&key); } for key in diff.sources.changed_and_added() { @@ -784,6 +830,8 @@ impl RunningTopology { if let Some(task) = new_pieces.tasks.get(&key) { self.outputs_tap_metadata .insert(key.clone(), ("source", task.typetag().to_string())); + self.component_type_names + .insert(key.clone(), task.typetag().to_string()); } } @@ -803,6 +851,21 @@ impl RunningTopology { } } + for key in diff + .enrichment_tables + .changed_and_added() + .filter_map(|key| { + self.config + .enrichment_table(key) + .and_then(|t| t.as_sink(key).map(|(key, _)| key)) + }) + { + if let Some(task) = new_pieces.tasks.get(&key) { + self.component_type_names + .insert(key.clone(), task.typetag().to_string()); + } + } + for (key, input) in &new_pieces.inputs { self.inputs_tap_metadata .insert(key.clone(), input.1.clone()); @@ -1042,7 +1105,7 @@ impl RunningTopology { .transforms() .filter(|(key, _)| !diff.transforms.contains(key)); for (transform_key, transform) in unchanged_transforms { - let changed_outputs = get_changed_outputs(diff, transform.inputs.clone()); + let changed_outputs = get_changed_outputs(&self.config, diff, transform.inputs.clone()); for output_id in changed_outputs { debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); @@ -1052,12 +1115,20 @@ impl RunningTopology { } } + let unchanged_table_sinks = self + .config + .enrichment_tables() + .filter(|(key, _)| !diff.enrichment_tables.contains(key)) + .filter_map(|(key, table)| table.as_sink(key)) + .collect::>(); let unchanged_sinks = self .config .sinks() .filter(|(key, _)| !diff.sinks.contains(key)); - for (sink_key, 
sink) in unchanged_sinks { - let changed_outputs = get_changed_outputs(diff, sink.inputs.clone()); + for (sink_key, sink) in + unchanged_sinks.chain(unchanged_table_sinks.iter().map(|(k, v)| (k, v))) + { + let changed_outputs = get_changed_outputs(&self.config, diff, sink.inputs.clone()); for output_id in changed_outputs { debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); @@ -1412,10 +1483,30 @@ impl RunningTopology { } } -fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs) -> Vec { +fn get_changed_outputs( + config: &Config, + diff: &ConfigDiff, + output_ids: Inputs, +) -> Vec { let mut changed_outputs = Vec::new(); - for source_key in &diff.sources.to_change { + let to_change_table_sources = diff + .enrichment_tables + .to_change + .iter() + .filter_map(|key| { + config + .enrichment_table(key) + .and_then(|t| t.as_source(key)) + .map(|(key, _)| key.clone()) + }) + .collect::>(); + for source_key in diff + .sources + .to_change + .iter() + .chain(to_change_table_sources.iter()) + { changed_outputs.extend( output_ids .iter() diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index f4ccf974158db..f71710fba4b06 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -44,7 +44,8 @@ mod latency_metrics; feature = "sources-prometheus", feature = "sinks-prometheus", feature = "sources-internal_metrics", - feature = "sources-splunk_hec" + feature = "sources-splunk_hec", + feature = "enrichment-tables-memory", ))] mod reload; #[cfg(all(feature = "sinks-console", feature = "sources-demo_logs"))] diff --git a/src/topology/test/reload.rs b/src/topology/test/reload.rs index 7b2db20d4f337..b86cf24aba0c4 100644 --- a/src/topology/test/reload.rs +++ b/src/topology/test/reload.rs @@ -6,21 +6,31 @@ use std::{ }; use futures::StreamExt; -use tokio::time::sleep; +use tokio::{sync::oneshot::channel, time::sleep}; use tokio_stream::wrappers::UnboundedReceiverStream; use vector_lib::{ 
buffers::{BufferConfig, BufferType, MemoryBufferSize, WhenFull}, config::ComponentKey, + event::{Event, EventContainer, LogEvent}, }; use crate::{ - config::Config, + config::{Config, unit_test::UnitTestSourceConfig}, + enrichment_tables::{ + EnrichmentTables, + memory::{MemoryConfig, MemorySourceConfig, ReloadBehavior}, + }, sinks::prometheus::exporter::PrometheusExporterConfig, sources::{ internal_metrics::InternalMetricsConfig, prometheus::PrometheusRemoteWriteConfig, splunk_hec::SplunkConfig, }, - test_util::{self, addr::next_addr, mock::basic_sink, start_topology, temp_dir, wait_for_tcp}, + test_util::{ + self, + addr::next_addr, + mock::{basic_sink, oneshot_sink}, + start_topology, temp_dir, wait_for_tcp, + }, topology::ReloadError::*, }; @@ -451,6 +461,111 @@ async fn topology_disk_buffer_config_change_chained_does_not_stall() { } } +#[tokio::test] +async fn topology_reload_preserves_enrichment_table_state() { + // Changing an enrichment table that has state and supports state preservation should preserve + // the state after reload, even if it was changed (if the state is still valid after the change). 
+ test_util::trace_init(); + + let source_event = Event::Log(LogEvent::from("test")); + let (old_tx, old_rx) = channel(); + + let mut old_config = Config::builder(); + let mut old_memory_config = MemoryConfig::default(); + old_memory_config.source_config = Some(MemorySourceConfig { + export_interval: Some(NonZeroU64::new(1).unwrap()), + source_key: "memory_test_source".to_string(), + export_batch_size: None, + remove_after_export: false, + export_expired_items: false, + }); + old_memory_config.ttl = 100; + old_config.add_enrichment_table( + "memory_test", + &["in"], + EnrichmentTables::Memory(old_memory_config), + ); + old_config.add_source( + "in", + UnitTestSourceConfig { + events: vec![source_event.clone()], + }, + ); + old_config.add_sink("out", &["memory_test_source"], oneshot_sink(old_tx)); + + let (new_tx, new_rx) = channel(); + let mut new_config = Config::builder(); + let mut new_memory_config = MemoryConfig::default(); + new_memory_config.reload_behavior = ReloadBehavior::PreserveState; + new_memory_config.source_config = Some(MemorySourceConfig { + export_interval: Some(NonZeroU64::new(1).unwrap()), + source_key: "memory_test_source".to_string(), + export_batch_size: None, + remove_after_export: false, + export_expired_items: false, + }); + new_memory_config.ttl = 101; + new_config.add_enrichment_table( + "memory_test", + // No input to ensure old state is read and not new + &[], + EnrichmentTables::Memory(new_memory_config), + ); + new_config.add_source("in_2", UnitTestSourceConfig { events: vec![] }); + new_config.add_sink("out_2", &["memory_test_source"], oneshot_sink(new_tx)); + + let (mut topology, crash) = start_topology(old_config.build().unwrap(), false).await; + let mut crash_stream = UnboundedReceiverStream::new(crash); + + // Make sure the topology is fully running: other components, etc. + sleep(Duration::from_secs(2)).await; + + tokio::select! 
{ + events = old_rx => { + let events = events.expect("must get event to output"); + let events = events.into_events().collect::>(); + assert_eq!(events.len(), 2); + let message = events.into_iter().filter_map(|e| e.into_log().value().clone().into_object()).find(|e| e.get("key").is_some_and(|k| k.as_str().is_some_and(|k| k == "message"))) + .and_then(|entry| { + entry + .get("value") + .cloned() + .map(|m| m.to_string_lossy().into_owned()) + }); + assert_eq!(message.unwrap(), "test"); + } + _ = crash_stream.next() => panic!(), + } + + // Now reload the topology with the new configuration, and ensure the table still has the same state + topology + .reload_config_and_respawn(new_config.build().unwrap(), Default::default()) + .await + .unwrap(); + + // Give the old topology configuration a chance to shutdown cleanly, etc. + sleep(Duration::from_secs(2)).await; + tokio::select! { + events = new_rx => { + let events = events.expect("must get event to output"); + let events = events.into_events().collect::>(); + assert_eq!(events.len(), 2); + let message = events.into_iter().filter_map(|e| e.into_log().value().clone().into_object()).find(|e| e.get("key").is_some_and(|k| k.as_str().is_some_and(|k| k == "message"))) + .and_then(|entry| { + entry + .get("value") + .cloned() + .map(|m| m.to_string_lossy().into_owned()) + }); + assert_eq!(message.unwrap(), "test"); + }, + _ = tokio::time::sleep(Duration::from_secs(10)) => { + panic!("Never received the events") + } + _ = crash_stream.next() => panic!(), + } +} + async fn reload_sink_test( old_config: Config, new_config: Config,