Skip to content

Commit 79f544c

Browse files
Feat/graceful shutdown (#117)
* fix: resolve race condition of dependency blocking * feat: handle graceful shutdowns * note * rebase
1 parent 8c674cf commit 79f544c

File tree

12 files changed

+264
-79
lines changed

12 files changed

+264
-79
lines changed

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ deadpool = { version = "0.12", features = ["rt_tokio_1"] }
5757
deadpool-lapin = "0.12"
5858
teloxide = "0.12"
5959
serenity = { version = "0.12", features = ["client", "framework"] }
60+
once_cell = "1.19.0"
6061

6162
# build
6263
jemallocator = { version = "0.5.0", optional = true }

core/src/api/graphql.rs

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ pub async fn start_graphql_server(
141141
settings.disable_advanced_filters,
142142
);
143143

144-
setup_ctrlc_handler(Arc::new(Mutex::new(None::<Child>)));
144+
// No longer needed now that the main shutdown handling exists; kept around just in case.
145+
// setup_ctrlc_handler(Arc::new(Mutex::new(None::<Child>)));
145146

146147
// Wait for the initial server startup
147148
let pid = rx.await.map_err(|e| {
@@ -273,23 +274,6 @@ async fn start_server(
273274
.map_err(|e| e.to_string())
274275
}
275276

276-
fn setup_ctrlc_handler(child_arc: Arc<Mutex<Option<Child>>>) {
277-
ctrlc::set_handler(move || {
278-
MANUAL_STOP.store(true, Ordering::SeqCst);
279-
if let Ok(mut guard) = child_arc.lock() {
280-
if let Some(child) = guard.as_mut() {
281-
if let Err(e) = kill_process_tree(child.id()) {
282-
error!("Failed to kill child process: {}", e);
283-
} else {
284-
info!("GraphQL server process killed");
285-
}
286-
}
287-
}
288-
std::process::exit(0);
289-
})
290-
.expect("Error setting Ctrl-C handler");
291-
}
292-
293277
async fn perform_health_check(
294278
graphql_endpoint: &str,
295279
graphql_playground: &str,
@@ -336,17 +320,36 @@ async fn perform_health_check(
336320
Ok(())
337321
}
338322

339-
fn kill_process_tree(pid: u32) -> Result<(), String> {
340-
if cfg!(target_os = "windows") {
341-
Command::new("taskkill")
342-
.args(["/PID", &pid.to_string(), "/T", "/F"])
343-
.output()
344-
.map_err(|e| e.to_string())?;
345-
} else {
346-
Command::new("pkill")
347-
.args(["-TERM", "-P", &pid.to_string()])
348-
.output()
349-
.map_err(|e| e.to_string())?;
350-
}
351-
Ok(())
352-
}
323+
// No longer needed now that the main shutdown handling exists; kept around just in case.
324+
// fn setup_ctrlc_handler(child_arc: Arc<Mutex<Option<Child>>>) {
325+
// ctrlc::set_handler(move || {
326+
// MANUAL_STOP.store(true, Ordering::SeqCst);
327+
// if let Ok(mut guard) = child_arc.lock() {
328+
// if let Some(child) = guard.as_mut() {
329+
// if let Err(e) = kill_process_tree(child.id()) {
330+
// error!("Failed to kill child process: {}", e);
331+
// } else {
332+
// info!("GraphQL server process killed");
333+
// }
334+
// }
335+
// }
336+
// std::process::exit(0);
337+
// })
338+
// .expect("Error setting Ctrl-C handler");
339+
// }
340+
341+
// No longer needed now that the main shutdown handling exists; kept around just in case.
342+
// fn kill_process_tree(pid: u32) -> Result<(), String> {
343+
// if cfg!(target_os = "windows") {
344+
// Command::new("taskkill")
345+
// .args(["/PID", &pid.to_string(), "/T", "/F"])
346+
// .output()
347+
// .map_err(|e| e.to_string())?;
348+
// } else {
349+
// Command::new("pkill")
350+
// .args(["-TERM", "-P", &pid.to_string()])
351+
// .output()
352+
// .map_err(|e| e.to_string())?;
353+
// }
354+
// Ok(())
355+
// }

core/src/event/callback_registry.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ use ethers::{
66
types::{Bytes, Log, H256, U256, U64},
77
};
88
use futures::future::BoxFuture;
9-
use rand::Rng;
109
use serde::{Deserialize, Serialize};
1110
use tokio::time::sleep;
12-
use tracing::{debug, error};
11+
use tracing::{debug, error, info};
1312

1413
use crate::{
1514
event::contract_setup::{ContractInformation, NetworkContract},
1615
indexer::start::ProcessedNetworkContract,
16+
is_running,
1717
provider::WrappedLog,
1818
};
1919

@@ -144,6 +144,11 @@ impl EventCallbackRegistry {
144144
debug!("{} - Pushed {} events", data.len(), event_information.info_log_name());
145145

146146
loop {
147+
if !is_running() {
148+
info!("Detected shutdown, stopping event trigger");
149+
break;
150+
}
151+
147152
match (event_information.callback)(data.clone()).await {
148153
Ok(_) => {
149154
debug!(
@@ -153,18 +158,19 @@ impl EventCallbackRegistry {
153158
break;
154159
}
155160
Err(e) => {
161+
if !is_running() {
162+
info!("Detected shutdown, stopping event trigger");
163+
break;
164+
}
156165
attempts += 1;
157166
error!(
158167
"{} Event processing failed - id: {} - topic_id: {}. Retrying... (attempt {}). Error: {}",
159168
event_information.info_log_name(), id, event_information.topic_id, attempts, e
160169
);
161170

162-
sleep(delay).await;
163-
delay = (delay * 2).min(Duration::from_secs(15)); // Max delay of 15 seconds
171+
delay = (delay * 2).min(Duration::from_secs(15));
164172

165-
// add some jitter to the delay to avoid thundering herd problem
166-
let jitter = Duration::from_millis(rand::thread_rng().gen_range(0..1000));
167-
sleep(delay + jitter).await;
173+
sleep(delay).await;
168174
}
169175
}
170176
}

core/src/indexer/last_synced.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,11 @@ async fn update_last_synced_block_number_for_file(
215215
Ok(())
216216
}
217217

218-
pub fn update_progress_and_last_synced(config: Arc<EventProcessingConfig>, to_block: U64) {
218+
pub fn update_progress_and_last_synced_task(
219+
config: Arc<EventProcessingConfig>,
220+
to_block: U64,
221+
on_complete: impl FnOnce() + Send + 'static,
222+
) {
219223
tokio::spawn(async move {
220224
let update_last_synced_block_result = config
221225
.progress
@@ -281,5 +285,7 @@ pub fn update_progress_and_last_synced(config: Arc<EventProcessingConfig>, to_bl
281285
);
282286
}
283287
}
288+
289+
on_complete();
284290
});
285291
}

core/src/indexer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ mod last_synced;
1313
pub mod no_code;
1414
mod reorg;
1515
pub mod start;
16+
pub mod task_tracker;
17+
1618
pub use dependency::{ContractEventDependencies, EventDependencies, EventsDependencyTree};
1719

1820
use crate::manifest::contract::Contract;

core/src/indexer/process.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ use crate::{
2121
indexer::{
2222
dependency::{ContractEventsDependenciesConfig, EventDependencies},
2323
fetch_logs::{fetch_logs_stream, FetchLogsResult},
24-
last_synced::update_progress_and_last_synced,
24+
last_synced::update_progress_and_last_synced_task,
2525
log_helpers::is_relevant_block,
2626
progress::IndexingEventProgressStatus,
27+
task_tracker::{indexing_event_processed, indexing_event_processing},
2728
},
29+
is_running,
2830
};
2931

3032
#[derive(thiserror::Error, Debug)]
@@ -501,6 +503,16 @@ async fn live_indexing_for_contract_event_dependencies<'a>(
501503
}
502504
}
503505

506+
async fn trigger_event(
507+
config: Arc<EventProcessingConfig>,
508+
fn_data: Vec<EventResult>,
509+
to_block: U64,
510+
) {
511+
indexing_event_processing();
512+
config.trigger_event(fn_data).await;
513+
update_progress_and_last_synced_task(config, to_block, indexing_event_processed);
514+
}
515+
504516
async fn handle_logs_result(
505517
config: Arc<EventProcessingConfig>,
506518
result: Result<FetchLogsResult, Box<dyn std::error::Error + Send>>,
@@ -522,17 +534,19 @@ async fn handle_logs_result(
522534
})
523535
.collect::<Vec<_>>();
524536

537+
// If a shutdown is in progress, do not process any more events; wait here instead.
538+
while !is_running() {
539+
tokio::time::sleep(Duration::from_millis(1000)).await;
540+
}
541+
525542
if !fn_data.is_empty() {
526543
return if config.index_event_in_order {
527-
config.trigger_event(fn_data).await;
528-
update_progress_and_last_synced(config, result.to_block);
529-
Ok(tokio::spawn(async {})) // Return a completed task
544+
trigger_event(config, fn_data, result.to_block).await;
545+
Ok(tokio::spawn(async {}))
530546
} else {
531547
let task = tokio::spawn(async move {
532-
config.trigger_event(fn_data).await;
533-
update_progress_and_last_synced(config, result.to_block);
548+
trigger_event(config, fn_data, result.to_block).await;
534549
});
535-
536550
Ok(task)
537551
}
538552
}

core/src/indexer/task_tracker.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use std::sync::atomic::{AtomicUsize, Ordering};
2+
3+
use once_cell::sync::Lazy;
4+
5+
// Number of event-processing tasks currently in flight: incremented by
// `indexing_event_processing` and decremented by `indexing_event_processed`.
// `AtomicUsize::new` is a `const fn`, so no lazy initialisation is required.
static INDEXING_TASKS: AtomicUsize = AtomicUsize::new(0);
6+
7+
pub fn indexing_event_processing() {
8+
INDEXING_TASKS.fetch_add(1, Ordering::SeqCst);
9+
}
10+
11+
pub fn indexing_event_processed() {
12+
INDEXING_TASKS.fetch_sub(1, Ordering::SeqCst);
13+
}
14+
15+
pub fn active_indexing_count() -> usize {
16+
INDEXING_TASKS.load(Ordering::SeqCst)
17+
}

core/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ pub mod generator;
33
pub mod indexer;
44
pub mod manifest;
55

6+
mod system_state;
7+
pub use system_state::{initiate_shutdown, is_running};
8+
69
mod database;
710
pub use database::postgres::{
811
client::{PostgresClient, ToSql},
@@ -33,6 +36,7 @@ pub mod provider;
3336
mod start;
3437
mod streams;
3538
mod types;
39+
3640
// export 3rd party dependencies
3741
pub use async_trait::async_trait;
3842
pub use colored::Colorize as RindexerColorize;

core/src/logger.rs

Lines changed: 84 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,75 @@
1-
use tracing::{debug, level_filters::LevelFilter};
1+
use std::{
2+
io::Write,
3+
sync::atomic::{AtomicBool, Ordering},
4+
};
5+
6+
use once_cell::sync::Lazy;
7+
use tracing::level_filters::LevelFilter;
28
use tracing_subscriber::{
3-
fmt::format::{Format, Writer},
9+
fmt::{
10+
format::{Format, Writer},
11+
MakeWriter,
12+
},
413
EnvFilter,
514
};
615

16+
// Set by `mark_shutdown_started`; read by the log writer and the timer to switch
// to direct stdout writes and a shorter timestamp while shutting down.
// `AtomicBool::new` is a `const fn`, so no lazy initialisation is required.
static SHUTDOWN_IN_PROGRESS: AtomicBool = AtomicBool::new(false);
17+
18+
struct ShutdownAwareWriter {
19+
buffer: std::io::BufWriter<std::io::Stdout>,
20+
}
21+
22+
impl ShutdownAwareWriter {
23+
fn new() -> Self {
24+
Self { buffer: std::io::BufWriter::new(std::io::stdout()) }
25+
}
26+
}
27+
28+
impl Write for ShutdownAwareWriter {
29+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
30+
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) {
31+
// During shutdown, write directly to stdout
32+
let stdout = std::io::stdout();
33+
let mut handle = stdout.lock();
34+
handle.write(buf)
35+
} else {
36+
self.buffer.write(buf)
37+
}
38+
}
39+
40+
fn flush(&mut self) -> std::io::Result<()> {
41+
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) {
42+
let stdout = std::io::stdout();
43+
let mut handle = stdout.lock();
44+
handle.flush()
45+
} else {
46+
self.buffer.flush()
47+
}
48+
}
49+
}
50+
51+
struct ShutdownAwareWriterMaker;
52+
53+
impl<'a> MakeWriter<'a> for ShutdownAwareWriterMaker {
54+
type Writer = ShutdownAwareWriter;
55+
56+
fn make_writer(&'a self) -> Self::Writer {
57+
ShutdownAwareWriter::new()
58+
}
59+
}
60+
761
struct CustomTimer;
862

963
impl tracing_subscriber::fmt::time::FormatTime for CustomTimer {
1064
fn format_time(&self, writer: &mut Writer<'_>) -> std::fmt::Result {
11-
let now = chrono::Local::now();
12-
write!(writer, "{} - {}", now.format("%d %B"), now.format("%H:%M:%S%.6f"))
65+
// Use a simpler time format during shutdown
66+
if SHUTDOWN_IN_PROGRESS.load(Ordering::Relaxed) {
67+
let now = chrono::Local::now();
68+
write!(writer, "{}", now.format("%H:%M:%S"))
69+
} else {
70+
let now = chrono::Local::now();
71+
write!(writer, "{} - {}", now.format("%d %B"), now.format("%H:%M:%S%.6f"))
72+
}
1373
}
1474
}
1575

@@ -18,22 +78,32 @@ pub fn setup_logger(log_level: LevelFilter) {
1878

1979
let format = Format::default().with_timer(CustomTimer).with_level(true).with_target(false);
2080

21-
let subscriber =
22-
tracing_subscriber::fmt().with_env_filter(filter).event_format(format).finish();
81+
let subscriber = tracing_subscriber::fmt()
82+
.with_writer(ShutdownAwareWriterMaker)
83+
.with_env_filter(filter)
84+
.event_format(format)
85+
.finish();
2386

2487
if tracing::subscriber::set_global_default(subscriber).is_err() {
25-
debug!("Logger has already been set up, continuing...");
88+
// Use println! rather than a tracing macro: set_global_default failed, so this subscriber was not installed
89+
println!("Logger has already been set up, continuing...");
2690
}
2791
}
2892

2993
pub fn setup_info_logger() {
3094
setup_logger(LevelFilter::INFO);
3195
}
3296

33-
// pub fn set_no_op_logger() -> DefaultGuard {
34-
// let no_op_subscriber = FmtSubscriber::builder().with_writer(|| NullWriter).finish();
35-
//
36-
// let no_op_dispatch = Dispatch::new(no_op_subscriber);
37-
//
38-
// tracing::dispatcher::set_default(&no_op_dispatch)
39-
// }
97+
// Call this when starting shutdown
98+
pub fn mark_shutdown_started() {
99+
SHUTDOWN_IN_PROGRESS.store(true, Ordering::Relaxed);
100+
}
101+
102+
// Optional guard: clears the shutdown-in-progress flag when dropped
103+
pub struct LoggerGuard;
104+
105+
impl Drop for LoggerGuard {
106+
fn drop(&mut self) {
107+
SHUTDOWN_IN_PROGRESS.store(false, Ordering::Relaxed);
108+
}
109+
}

0 commit comments

Comments
 (0)