Skip to content

Commit d71cd46

Browse files
fix: graceful shutdown of rrelayer (#39)
* fix: graceful shutdown of rrelayer * fixes * remove workflow * remove workflow
1 parent d866679 commit d71cd46

File tree

10 files changed

+440
-73
lines changed

10 files changed

+440
-73
lines changed

crates/core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ pub mod authentication;
33
pub mod gas;
44
mod logger;
55
pub use logger::setup_info_logger;
6+
mod shutdown;
7+
pub use shutdown::{enter_critical_operation, is_shutdown_in_progress, request_graceful_shutdown};
68
mod middleware;
79
pub mod network;
810
mod postgres;

crates/core/src/logger.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
sync::atomic::{AtomicBool, Ordering},
44
};
55

6+
use crate::is_shutdown_in_progress;
67
use once_cell::sync::Lazy;
78
use tracing::level_filters::LevelFilter;
89
use tracing_subscriber::{
@@ -27,8 +28,7 @@ impl ShutdownAwareWriter {
2728

2829
impl Write for ShutdownAwareWriter {
2930
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
30-
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) {
31-
// During shutdown, write directly to stdout
31+
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) || is_shutdown_in_progress() {
3232
let stdout = std::io::stdout();
3333
let mut handle = stdout.lock();
3434
handle.write(buf)
@@ -38,7 +38,7 @@ impl Write for ShutdownAwareWriter {
3838
}
3939

4040
fn flush(&mut self) -> std::io::Result<()> {
41-
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) {
41+
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) || is_shutdown_in_progress() {
4242
let stdout = std::io::stdout();
4343
let mut handle = stdout.lock();
4444
handle.flush()
@@ -62,7 +62,7 @@ struct CustomTimer;
6262

6363
impl tracing_subscriber::fmt::time::FormatTime for CustomTimer {
6464
fn format_time(&self, writer: &mut Writer<'_>) -> std::fmt::Result {
65-
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) {
65+
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) || is_shutdown_in_progress() {
6666
let now = chrono::Local::now();
6767
write!(writer, "{}", now.format("%H:%M:%S"))
6868
} else {
@@ -93,11 +93,6 @@ pub fn setup_info_logger() {
9393
setup_logger(LevelFilter::INFO);
9494
}
9595

96-
#[allow(dead_code)]
97-
pub fn mark_shutdown_started() {
98-
SHUTDOWN_IN_PROGRESS.store(true, Ordering::Relaxed);
99-
}
100-
10196
#[allow(dead_code)]
10297
pub struct LoggerGuard;
10398

crates/core/src/shutdown.rs

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
use std::{
2+
sync::{
3+
atomic::{AtomicBool, AtomicUsize, Ordering},
4+
Arc,
5+
},
6+
time::Duration,
7+
};
8+
9+
use once_cell::sync::Lazy;
10+
use tokio::sync::{broadcast, Notify};
11+
use tracing::{info, warn};
12+
13+
static SHUTDOWN_COORDINATOR: Lazy<Arc<ShutdownCoordinator>> =
14+
Lazy::new(|| Arc::new(ShutdownCoordinator::new()));
15+
16+
pub struct ShutdownCoordinator {
17+
shutdown_sender: broadcast::Sender<()>,
18+
operations_complete: Arc<Notify>,
19+
active_operations: Arc<AtomicUsize>,
20+
shutdown_requested: Arc<AtomicBool>,
21+
}
22+
23+
impl ShutdownCoordinator {
24+
fn new() -> Self {
25+
let (shutdown_sender, _) = broadcast::channel(16);
26+
Self {
27+
shutdown_sender,
28+
operations_complete: Arc::new(Notify::new()),
29+
active_operations: Arc::new(AtomicUsize::new(0)),
30+
shutdown_requested: Arc::new(AtomicBool::new(false)),
31+
}
32+
}
33+
34+
#[cfg(test)]
35+
pub fn new_for_test() -> Self {
36+
Self::new()
37+
}
38+
39+
pub fn subscribe(&self) -> broadcast::Receiver<()> {
40+
self.shutdown_sender.subscribe()
41+
}
42+
43+
pub fn is_shutdown_requested(&self) -> bool {
44+
self.shutdown_requested.load(Ordering::Relaxed)
45+
}
46+
47+
pub fn enter_operation(self: &Arc<Self>) -> Option<OperationGuard> {
48+
if self.is_shutdown_requested() {
49+
return None;
50+
}
51+
self.active_operations.fetch_add(1, Ordering::SeqCst);
52+
Some(OperationGuard { coordinator: Arc::clone(self) })
53+
}
54+
55+
fn exit_operation(&self) {
56+
let prev = self.active_operations.fetch_sub(1, Ordering::SeqCst);
57+
if prev == 1 {
58+
self.operations_complete.notify_waiters();
59+
}
60+
}
61+
62+
pub async fn request_shutdown(&self, timeout: Duration) -> bool {
63+
info!(
64+
"Graceful shutdown requested, waiting for {} active operations to complete",
65+
self.active_operations.load(Ordering::Relaxed)
66+
);
67+
68+
self.shutdown_requested.store(true, Ordering::SeqCst);
69+
70+
let _ = self.shutdown_sender.send(());
71+
72+
tokio::select! {
73+
_ = self.wait_for_operations_complete() => {
74+
info!("All operations completed gracefully");
75+
true
76+
}
77+
_ = tokio::time::sleep(timeout) => {
78+
warn!("Shutdown timeout reached, {} operations still active",
79+
self.active_operations.load(Ordering::Relaxed));
80+
false
81+
}
82+
}
83+
}
84+
85+
async fn wait_for_operations_complete(&self) {
86+
while self.active_operations.load(Ordering::Relaxed) > 0 {
87+
self.operations_complete.notified().await;
88+
}
89+
}
90+
91+
#[cfg(test)]
92+
pub fn active_operations_count(&self) -> usize {
93+
self.active_operations.load(Ordering::Relaxed)
94+
}
95+
}
96+
97+
pub struct OperationGuard {
98+
coordinator: Arc<ShutdownCoordinator>,
99+
}
100+
101+
impl Drop for OperationGuard {
102+
fn drop(&mut self) {
103+
self.coordinator.exit_operation();
104+
}
105+
}
106+
107+
pub fn shutdown_coordinator() -> Arc<ShutdownCoordinator> {
108+
Arc::clone(&SHUTDOWN_COORDINATOR)
109+
}
110+
111+
pub async fn request_graceful_shutdown(timeout: Duration) -> bool {
112+
shutdown_coordinator().request_shutdown(timeout).await
113+
}
114+
115+
pub fn is_shutdown_in_progress() -> bool {
116+
shutdown_coordinator().is_shutdown_requested()
117+
}
118+
119+
pub fn enter_critical_operation() -> Option<OperationGuard> {
120+
let coordinator = shutdown_coordinator();
121+
coordinator.enter_operation()
122+
}
123+
124+
pub fn subscribe_to_shutdown() -> broadcast::Receiver<()> {
125+
shutdown_coordinator().subscribe()
126+
}
127+
128+
#[cfg(test)]
129+
mod tests {
130+
use super::*;
131+
use std::{sync::Arc, time::Duration};
132+
use tokio::time::sleep;
133+
134+
#[tokio::test]
135+
async fn test_shutdown_coordination() {
136+
let coordinator = Arc::new(ShutdownCoordinator::new_for_test());
137+
138+
let guard1 = coordinator.enter_operation();
139+
assert!(guard1.is_some());
140+
141+
let guard2 = coordinator.enter_operation();
142+
assert!(guard2.is_some());
143+
144+
assert_eq!(coordinator.active_operations_count(), 2);
145+
146+
let coordinator_clone = coordinator.clone();
147+
let shutdown_task = tokio::spawn(async move {
148+
coordinator_clone.request_shutdown(Duration::from_millis(100)).await
149+
});
150+
151+
sleep(Duration::from_millis(10)).await;
152+
153+
let guard3 = coordinator.enter_operation();
154+
assert!(guard3.is_none());
155+
156+
assert_eq!(coordinator.active_operations_count(), 2);
157+
158+
drop(guard1);
159+
assert_eq!(coordinator.active_operations_count(), 1);
160+
161+
drop(guard2);
162+
163+
let result = shutdown_task.await.unwrap();
164+
assert!(result);
165+
assert_eq!(coordinator.active_operations_count(), 0);
166+
}
167+
168+
#[tokio::test]
169+
async fn test_shutdown_timeout() {
170+
let coordinator = Arc::new(ShutdownCoordinator::new_for_test());
171+
172+
let _guard = coordinator.enter_operation();
173+
assert!(_guard.is_some());
174+
assert_eq!(coordinator.active_operations_count(), 1);
175+
176+
let result = coordinator.request_shutdown(Duration::from_millis(10)).await;
177+
178+
assert!(!result);
179+
assert_eq!(coordinator.active_operations_count(), 1);
180+
181+
drop(_guard);
182+
assert_eq!(coordinator.active_operations_count(), 0);
183+
}
184+
185+
#[tokio::test]
186+
async fn test_enter_critical_operation() {
187+
let guard = enter_critical_operation();
188+
189+
if guard.is_some() {
190+
drop(guard);
191+
}
192+
}
193+
194+
#[tokio::test]
195+
async fn test_operation_guard_lifecycle() {
196+
let coordinator = Arc::new(ShutdownCoordinator::new_for_test());
197+
assert_eq!(coordinator.active_operations_count(), 0);
198+
199+
{
200+
let _guard1 = coordinator.enter_operation();
201+
assert_eq!(coordinator.active_operations_count(), 1);
202+
203+
{
204+
let _guard2 = coordinator.enter_operation();
205+
assert_eq!(coordinator.active_operations_count(), 2);
206+
}
207+
208+
assert_eq!(coordinator.active_operations_count(), 1);
209+
}
210+
211+
assert_eq!(coordinator.active_operations_count(), 0);
212+
}
213+
214+
#[tokio::test]
215+
async fn test_shutdown_signal_propagation() {
216+
let coordinator = Arc::new(ShutdownCoordinator::new_for_test());
217+
let mut rx = coordinator.subscribe();
218+
219+
let coordinator_clone = coordinator.clone();
220+
tokio::spawn(
221+
async move { coordinator_clone.request_shutdown(Duration::from_millis(50)).await },
222+
);
223+
224+
let result = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
225+
assert!(result.is_ok());
226+
assert!(result.unwrap().is_ok());
227+
}
228+
}

crates/core/src/startup.rs

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::{
1818
schema::apply_schema,
1919
setup_info_logger,
2020
shared::cache::Cache,
21+
shutdown,
2122
signing::create_signing_routes,
2223
transaction::{
2324
api::create_transactions_routes,
@@ -38,11 +39,15 @@ use axum::{
3839
};
3940
use dotenv::dotenv;
4041
use std::path::Path;
41-
use std::{net::SocketAddr, sync::Arc, time::Instant};
42+
use std::{
43+
net::SocketAddr,
44+
sync::Arc,
45+
time::{Duration, Instant},
46+
};
4247
use thiserror::Error;
4348
use tokio::sync::Mutex;
4449
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
45-
use tracing::{error, info};
50+
use tracing::{error, info, warn};
4651

4752
#[derive(Error, Debug)]
4853
#[allow(clippy::enum_variant_names)]
@@ -258,7 +263,62 @@ async fn start_api(
258263

259264
let listener = tokio::net::TcpListener::bind(&address).await?;
260265
info!("rrelayer is up on http://{}", address);
261-
axum::serve(listener, app).await.map_err(StartApiError::ApiStartupError)?;
266+
267+
let shutdown_signal = async {
268+
let ctrl_c = async {
269+
tokio::signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
270+
};
271+
272+
#[cfg(unix)]
273+
let terminate = async {
274+
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
275+
.expect("failed to install signal handler")
276+
.recv()
277+
.await;
278+
};
279+
280+
#[cfg(windows)]
281+
let terminate = async {
282+
tokio::signal::windows::ctrl_break()
283+
.expect("failed to install Ctrl+Break handler")
284+
.recv()
285+
.await;
286+
};
287+
288+
#[cfg(not(any(unix, windows)))]
289+
let terminate = std::future::pending::<()>();
290+
291+
tokio::select! {
292+
_ = ctrl_c => {
293+
info!("Received Ctrl+C, initiating graceful shutdown");
294+
},
295+
_ = terminate => {
296+
#[cfg(unix)]
297+
info!("Received SIGTERM, initiating graceful shutdown");
298+
#[cfg(windows)]
299+
info!("Received Ctrl+Break, initiating graceful shutdown");
300+
#[cfg(not(any(unix, windows)))]
301+
info!("Received terminate signal, initiating graceful shutdown");
302+
},
303+
}
304+
};
305+
306+
tokio::select! {
307+
result = axum::serve(listener, app) => {
308+
result.map_err(StartApiError::ApiStartupError)?;
309+
}
310+
_ = shutdown_signal => {
311+
info!("Starting graceful shutdown...");
312+
313+
let shutdown_successful = shutdown::request_graceful_shutdown(Duration::from_secs(30)).await;
314+
315+
if shutdown_successful {
316+
info!("Graceful shutdown completed successfully");
317+
} else {
318+
warn!("Some operations did not complete within shutdown timeout");
319+
}
320+
}
321+
}
262322

263323
Ok(())
264324
}

0 commit comments

Comments
 (0)