Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions shotover-proxy/tests/runner/runner_int_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,36 @@
use crate::shotover_process;
use serde_json::Value;
use test_helpers::shotover_process::{EventMatcher, Level};
use tokio::net::TcpStream;

#[tokio::test]
async fn test_request_id_increments() {
// Ensure it isnt reliant on timing
let shotover_process = shotover_process("tests/test-configs/null-valkey/topology.yaml")
.start()
.await;
for _ in 0..1000 {
TcpStream::connect("127.0.0.1:6379").await.unwrap();
}

let events = shotover_process.shutdown_and_then_consume_events(&[]).await;
let mut previous_id = 0;
for event in events.events {
for span in event.spans {
if let Some(name) = span.get("name") {
if *name == Value::String("connection".into()) {
if let Some(id) = span.get("id").and_then(|x| x.as_i64()) {
// ensure that the ID increases by 1 and monotonically
assert!(previous_id == id || previous_id + 1 == id);
previous_id = id;
}
}
}
}
}
// ensure that this test does something
assert_eq!(previous_id, 1000);
}

#[tokio::test]
async fn test_early_shutdown_cassandra_source() {
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl Shotover {
crate::hot_reload::server::start_hot_reload_server(socket_path, &sources);
}

futures::future::join_all(sources.into_iter().map(|x| x.into_join_handle())).await;
futures::future::join_all(sources.into_iter().map(|x| x.join())).await;
Ok(())
}
Err(err) => Err(err),
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl<C: CodecBuilder + 'static> TcpCodecListener<C> {
let client_details = stream.peer_addr()
.map(|p| p.ip().to_string())
.unwrap_or_else(|_| "Unknown Peer".to_string());
tracing::debug!("New connection from {}", client_details);
tracing::info!("New connection from {}", client_details);

let force_run_chain = Arc::new(Notify::new());
let context = TransformContextBuilder{
Expand Down
15 changes: 10 additions & 5 deletions shotover/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::collections::HashMap;
use tokio::net::TcpListener;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::task::{JoinError, JoinHandle};

#[cfg(feature = "cassandra")]
pub mod cassandra;
Expand All @@ -35,7 +35,8 @@ pub enum Transport {

#[derive(Debug)]
pub struct Source {
pub join_handle: JoinHandle<()>,
pub listener_task: JoinHandle<()>,
/// This value must remain alive for as long as the Source is in use.
pub hot_reload_tx: UnboundedSender<HotReloadListenerRequest>,
pub gradual_shutdown_tx: UnboundedSender<GradualShutdownRequest>,
pub name: String,
Expand All @@ -49,16 +50,20 @@ impl Source {
name: String,
) -> Self {
Self {
join_handle,
listener_task: join_handle,
hot_reload_tx,
gradual_shutdown_tx,
name,
}
}

pub fn into_join_handle(self) -> JoinHandle<()> {
self.join_handle
pub async fn join(self) -> Result<(), JoinError> {
self.listener_task.await?;
// explicitly drop hot_reload_tx here, to show that it occurs after the listener_task has shutdown.
std::mem::drop(self.hot_reload_tx);
Ok(())
}

pub fn get_hot_reload_tx(&self) -> UnboundedSender<HotReloadListenerRequest> {
self.hot_reload_tx.clone()
}
Expand Down
Loading