Skip to content

Commit e6ffac5

Browse files
authored
Fix accidental spinloop in connection accept logic (#1979)
1 parent 7a7f863 commit e6ffac5

File tree

4 files changed

+43
-7
lines changed

4 files changed

+43
-7
lines changed

shotover-proxy/tests/runner/runner_int_tests.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,36 @@
11
use crate::shotover_process;
2+
use serde_json::Value;
23
use test_helpers::shotover_process::{EventMatcher, Level};
4+
use tokio::net::TcpStream;
5+
6+
#[tokio::test]
7+
async fn test_request_id_increments() {
8+
// Ensure it isnt reliant on timing
9+
let shotover_process = shotover_process("tests/test-configs/null-valkey/topology.yaml")
10+
.start()
11+
.await;
12+
for _ in 0..1000 {
13+
TcpStream::connect("127.0.0.1:6379").await.unwrap();
14+
}
15+
16+
let events = shotover_process.shutdown_and_then_consume_events(&[]).await;
17+
let mut previous_id = 0;
18+
for event in events.events {
19+
for span in event.spans {
20+
if let Some(name) = span.get("name") {
21+
if *name == Value::String("connection".into()) {
22+
if let Some(id) = span.get("id").and_then(|x| x.as_i64()) {
23+
// ensure that the ID increases by 1 and monotonically
24+
assert!(previous_id == id || previous_id + 1 == id);
25+
previous_id = id;
26+
}
27+
}
28+
}
29+
}
30+
}
31+
// ensure that this test does something
32+
assert_eq!(previous_id, 1000);
33+
}
334

435
#[tokio::test]
536
async fn test_early_shutdown_cassandra_source() {

shotover/src/runner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl Shotover {
203203
crate::hot_reload::server::start_hot_reload_server(socket_path, &sources);
204204
}
205205

206-
futures::future::join_all(sources.into_iter().map(|x| x.into_join_handle())).await;
206+
futures::future::join_all(sources.into_iter().map(|x| x.join())).await;
207207
Ok(())
208208
}
209209
Err(err) => Err(err),

shotover/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ impl<C: CodecBuilder + 'static> TcpCodecListener<C> {
238238
let client_details = stream.peer_addr()
239239
.map(|p| p.ip().to_string())
240240
.unwrap_or_else(|_| "Unknown Peer".to_string());
241-
tracing::debug!("New connection from {}", client_details);
241+
tracing::info!("New connection from {}", client_details);
242242

243243
let force_run_chain = Arc::new(Notify::new());
244244
let context = TransformContextBuilder{

shotover/src/sources/mod.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::collections::HashMap;
1515
use tokio::net::TcpListener;
1616
use tokio::sync::mpsc::UnboundedSender;
1717
use tokio::sync::watch;
18-
use tokio::task::JoinHandle;
18+
use tokio::task::{JoinError, JoinHandle};
1919

2020
#[cfg(feature = "cassandra")]
2121
pub mod cassandra;
@@ -35,7 +35,8 @@ pub enum Transport {
3535

3636
#[derive(Debug)]
3737
pub struct Source {
38-
pub join_handle: JoinHandle<()>,
38+
pub listener_task: JoinHandle<()>,
39+
/// This value must remain alive for as long as the Source is in use.
3940
pub hot_reload_tx: UnboundedSender<HotReloadListenerRequest>,
4041
pub gradual_shutdown_tx: UnboundedSender<GradualShutdownRequest>,
4142
pub name: String,
@@ -49,16 +50,20 @@ impl Source {
4950
name: String,
5051
) -> Self {
5152
Self {
52-
join_handle,
53+
listener_task: join_handle,
5354
hot_reload_tx,
5455
gradual_shutdown_tx,
5556
name,
5657
}
5758
}
5859

59-
pub fn into_join_handle(self) -> JoinHandle<()> {
60-
self.join_handle
60+
pub async fn join(self) -> Result<(), JoinError> {
61+
self.listener_task.await?;
62+
// explicitly drop hot_reload_tx here, to show that it occurs after the listener_task has shutdown.
63+
std::mem::drop(self.hot_reload_tx);
64+
Ok(())
6165
}
66+
6267
pub fn get_hot_reload_tx(&self) -> UnboundedSender<HotReloadListenerRequest> {
6368
self.hot_reload_tx.clone()
6469
}

0 commit comments

Comments
 (0)