diff --git a/shotover-proxy/tests/runner/runner_int_tests.rs b/shotover-proxy/tests/runner/runner_int_tests.rs index bacef3da9..b9019b3ac 100644 --- a/shotover-proxy/tests/runner/runner_int_tests.rs +++ b/shotover-proxy/tests/runner/runner_int_tests.rs @@ -1,5 +1,36 @@ use crate::shotover_process; +use serde_json::Value; use test_helpers::shotover_process::{EventMatcher, Level}; +use tokio::net::TcpStream; + +#[tokio::test] +async fn test_request_id_increments() { + // Ensure it isnt reliant on timing + let shotover_process = shotover_process("tests/test-configs/null-valkey/topology.yaml") + .start() + .await; + for _ in 0..1000 { + TcpStream::connect("127.0.0.1:6379").await.unwrap(); + } + + let events = shotover_process.shutdown_and_then_consume_events(&[]).await; + let mut previous_id = 0; + for event in events.events { + for span in event.spans { + if let Some(name) = span.get("name") { + if *name == Value::String("connection".into()) { + if let Some(id) = span.get("id").and_then(|x| x.as_i64()) { + // ensure that the ID increases by 1 and monotonically + assert!(previous_id == id || previous_id + 1 == id); + previous_id = id; + } + } + } + } + } + // ensure that this test does something + assert_eq!(previous_id, 1000); +} #[tokio::test] async fn test_early_shutdown_cassandra_source() { diff --git a/shotover/src/runner.rs b/shotover/src/runner.rs index 08b45c71b..7d0664fbf 100644 --- a/shotover/src/runner.rs +++ b/shotover/src/runner.rs @@ -203,7 +203,7 @@ impl Shotover { crate::hot_reload::server::start_hot_reload_server(socket_path, &sources); } - futures::future::join_all(sources.into_iter().map(|x| x.into_join_handle())).await; + futures::future::join_all(sources.into_iter().map(|x| x.join())).await; Ok(()) } Err(err) => Err(err), diff --git a/shotover/src/server.rs b/shotover/src/server.rs index b515be25f..5df87ade0 100644 --- a/shotover/src/server.rs +++ b/shotover/src/server.rs @@ -238,7 +238,7 @@ impl TcpCodecListener { let client_details = stream.peer_addr() .map(|p| p.ip().to_string()) .unwrap_or_else(|_| "Unknown Peer".to_string()); - tracing::debug!("New connection from {}", client_details); + tracing::info!("New connection from {}", client_details); let force_run_chain = Arc::new(Notify::new()); let context = TransformContextBuilder{ diff --git a/shotover/src/sources/mod.rs b/shotover/src/sources/mod.rs index ffc424b90..247105554 100644 --- a/shotover/src/sources/mod.rs +++ b/shotover/src/sources/mod.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use tokio::net::TcpListener; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch; -use tokio::task::JoinHandle; +use tokio::task::{JoinError, JoinHandle}; #[cfg(feature = "cassandra")] pub mod cassandra; @@ -35,7 +35,8 @@ pub enum Transport { #[derive(Debug)] pub struct Source { - pub join_handle: JoinHandle<()>, + pub listener_task: JoinHandle<()>, + /// This value must remain alive for as long as the Source is in use. pub hot_reload_tx: UnboundedSender, pub gradual_shutdown_tx: UnboundedSender, pub name: String, @@ -49,16 +50,20 @@ impl Source { name: String, ) -> Self { Self { - join_handle, + listener_task: join_handle, hot_reload_tx, gradual_shutdown_tx, name, } } - pub fn into_join_handle(self) -> JoinHandle<()> { - self.join_handle + pub async fn join(self) -> Result<(), JoinError> { + self.listener_task.await?; + // explicitly drop hot_reload_tx here, to show that it occurs after the listener_task has shutdown. + std::mem::drop(self.hot_reload_tx); + Ok(()) } + pub fn get_hot_reload_tx(&self) -> UnboundedSender { self.hot_reload_tx.clone() }