Skip to content

Commit d6f5e8f

Browse files
committed
feat(dashboard): wire monitoring and indexer tasks through strata-tasks
1 parent 1586de8 commit d6f5e8f

7 files changed

Lines changed: 102 additions & 37 deletions

File tree

backend/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/bin/dashboard/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
path = "src/main.rs"
99

1010
[dependencies]
11+
anyhow.workspace = true
1112
status-bridge.workspace = true
1213
status-config.workspace = true
1314
status-network.workspace = true
15+
strata-tasks.workspace = true
1416

1517
axum.workspace = true
1618
tokio.workspace = true

backend/bin/dashboard/src/main.rs

Lines changed: 63 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,96 @@
1+
use std::{net::SocketAddr, sync::Arc, time::Duration};
2+
3+
use anyhow::Result;
14
use axum::{routing::get, Router};
2-
use std::{net::SocketAddr, sync::Arc};
3-
use tokio::net::TcpListener;
5+
use status_bridge::{
6+
bridge_monitoring_task, get_bridge_status, run_withdrawal_indexer, BridgeMonitoringContext,
7+
};
8+
use status_config::Config;
9+
use status_network::{fetch_statuses_task, get_network_status, NetworkMonitoringContext};
10+
use strata_tasks::TaskManager;
11+
use tokio::{net::TcpListener, runtime};
412
use tower_http::cors::{Any, CorsLayer};
513
use tracing::info;
614

7-
use status_bridge::{bridge_monitoring_task, get_bridge_status, BridgeMonitoringContext};
8-
use status_config::Config;
9-
use status_network::{fetch_statuses_task, get_network_status, NetworkMonitoringContext};
15+
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
1016

11-
#[tokio::main]
12-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
17+
fn main() -> Result<()> {
1318
tracing_subscriber::fmt::fmt()
1419
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1520
.init();
1621

17-
// Load configuration from TOML
22+
let runtime = runtime::Builder::new_multi_thread()
23+
.enable_all()
24+
.thread_name("status-dashboard-rt")
25+
.build()?;
26+
1827
let config_path = std::env::var("CONFIG_PATH").unwrap_or_else(|_| "config.toml".to_string());
1928
let config = Arc::new(Config::load_from_path(&config_path));
2029

21-
let cors = CorsLayer::new().allow_origin(Any);
30+
let task_manager = TaskManager::new(runtime.handle().clone());
31+
let executor = task_manager.create_executor();
2232

2333
let network_context = Arc::new(NetworkMonitoringContext::new(config.network().clone()));
24-
let network_context_clone = Arc::clone(&network_context);
25-
26-
tokio::spawn(async move {
27-
fetch_statuses_task(network_context_clone).await;
28-
});
29-
30-
// Bridge monitoring
3134
let bridge_context = Arc::new(BridgeMonitoringContext::new(config.bridge().clone()));
32-
let bridge_context_clone = Arc::clone(&bridge_context);
33-
34-
tokio::spawn(async move {
35-
bridge_monitoring_task(bridge_context_clone).await;
36-
});
3735

36+
let cors = CorsLayer::new().allow_origin(Any);
3837
let app = Router::new()
3938
.route(
4039
"/api/status",
41-
get(move || get_network_status(network_context)),
40+
get({
41+
let network_context = Arc::clone(&network_context);
42+
move || get_network_status(Arc::clone(&network_context))
43+
}),
4244
)
4345
.route(
4446
"/api/bridge_status",
45-
get(move || get_bridge_status(bridge_context)),
47+
get({
48+
let bridge_context = Arc::clone(&bridge_context);
49+
move || get_bridge_status(Arc::clone(&bridge_context))
50+
}),
4651
)
4752
.layer(cors);
4853

4954
let addr = SocketAddr::from((
5055
config.server().host().parse::<std::net::IpAddr>()?,
5156
config.server().port(),
5257
));
53-
info!(%addr, "Server running at http://{}", addr);
5458

55-
let listener = TcpListener::bind(addr).await?;
56-
axum::serve(listener, app).await?;
59+
executor.spawn_critical_async_with_shutdown("network-monitoring", {
60+
let network_context = Arc::clone(&network_context);
61+
move |shutdown| async move { fetch_statuses_task(network_context, shutdown).await }
62+
});
63+
64+
executor.spawn_critical_async_with_shutdown("bridge-monitoring", {
65+
let bridge_context = Arc::clone(&bridge_context);
66+
move |shutdown| async move { bridge_monitoring_task(bridge_context, shutdown).await }
67+
});
68+
69+
executor.spawn_critical_async_with_shutdown("withdrawal-indexer", {
70+
let datadir = config.datadir().to_path_buf();
71+
let cfg = config.withdrawal_indexer().clone();
72+
move |shutdown| async move { run_withdrawal_indexer(&datadir, cfg, shutdown).await }
73+
});
74+
75+
executor.spawn_critical_async_with_shutdown("http-server", {
76+
move |shutdown| async move {
77+
let listener = TcpListener::bind(addr).await?;
78+
info!(%addr, "Server running at http://{}", addr);
79+
let shutdown_future = async move {
80+
shutdown.wait_for_shutdown().await;
81+
};
82+
83+
axum::serve(listener, app)
84+
.with_graceful_shutdown(shutdown_future)
85+
.await?;
86+
87+
Ok(())
88+
}
89+
});
90+
91+
task_manager.start_signal_listeners();
92+
task_manager.monitor(Some(SHUTDOWN_TIMEOUT))?;
5793

94+
info!("Exiting status dashboard backend");
5895
Ok(())
5996
}

backend/crates/bridge/src/status.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use anyhow::Result;
12
use axum::Json;
23
use bitcoin::{secp256k1::PublicKey, Txid};
34

@@ -10,6 +11,7 @@ use strata_bridge_rpc::types::{
1011
};
1112

1213
use strata_primitives::buf::Buf32;
14+
use strata_tasks::ShutdownGuard;
1315

1416
use super::{
1517
cache::BridgeStatusCache,
@@ -283,7 +285,10 @@ async fn determine_reimbursements_to_purge(
283285
}
284286

285287
/// Periodically fetch bridge status and update bridge cache
286-
pub async fn bridge_monitoring_task(context: Arc<BridgeMonitoringContext>) {
288+
pub async fn bridge_monitoring_task(
289+
context: Arc<BridgeMonitoringContext>,
290+
shutdown: ShutdownGuard,
291+
) -> Result<()> {
287292
let mut interval = interval(Duration::from_secs(
288293
context.config.status_refetch_interval(),
289294
));
@@ -292,6 +297,11 @@ pub async fn bridge_monitoring_task(context: Arc<BridgeMonitoringContext>) {
292297
let rpc_manager = RpcClientManager::new(&context.config);
293298

294299
loop {
300+
tokio::select! {
301+
_ = shutdown.wait_for_shutdown() => break,
302+
_ = interval.tick() => {}
303+
}
304+
295305
// Fetch all data without holding lock
296306

297307
// Bridge operator status
@@ -418,10 +428,9 @@ pub async fn bridge_monitoring_task(context: Arc<BridgeMonitoringContext>) {
418428
context.status_available.store(true, Ordering::Release);
419429
context.initial_status_query_complete.notify_waiters();
420430
}
421-
422-
// Wait for next interval
423-
interval.tick().await;
424431
}
432+
433+
Ok(())
425434
}
426435

427436
/// Fetch operator status

backend/crates/bridge/src/withdrawal_indexer/task.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,12 @@ where
8787
"withdrawal indexer starting"
8888
);
8989
loop {
90-
// Process the tick `Result<_, IndexerError>` inside the select arm
91-
// rather than binding it. `IndexerError` transitively wraps
92-
// `Box<dyn Error>` (no `+ Send + Sync`) via `typed_sled::CodecError`,
93-
// so it must not survive into the next await — only the `Send`
94-
// `more_work` flag escapes.
90+
// The indexer can stop mid-tick because writes are atomic per event
91+
// and replay is idempotent on restart. Process the tick result
92+
// inside the select arm rather than binding it: `IndexerError`
93+
// transitively wraps `Box<dyn Error>` (no `+ Send + Sync`) via
94+
// `typed_sled::CodecError`, so it must not survive into the next
95+
// await — only the `Send` `more_work` flag escapes.
9596
let mut more_work = false;
9697
tokio::select! {
9798
_ = shutdown.wait_for_shutdown() => break,

backend/crates/network/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
path = "src/lib.rs"
88

99
[dependencies]
10+
anyhow.workspace = true
1011
status-config.workspace = true
1112
status-utils.workspace = true
13+
strata-tasks.workspace = true
1214

1315
axum.workspace = true
1416
jsonrpsee.workspace = true

backend/crates/network/src/status.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use anyhow::Result;
12
use axum::Json;
23
use jsonrpsee::core::client::ClientT;
34
use jsonrpsee::http_client::HttpClient;
45
use std::sync::Arc;
6+
use strata_tasks::ShutdownGuard;
57
use tokio::time::{interval, sleep, Duration};
68
use tracing::{error, info};
79

@@ -58,7 +60,10 @@ async fn check_bundler_health(
5860
}
5961

6062
/// Periodically fetches real statuses
61-
pub async fn fetch_statuses_task(context: Arc<NetworkMonitoringContext>) {
63+
pub async fn fetch_statuses_task(
64+
context: Arc<NetworkMonitoringContext>,
65+
shutdown: ShutdownGuard,
66+
) -> Result<()> {
6267
info!("Fetching statuses...");
6368
let mut interval = interval(Duration::from_secs(
6469
context.config.status_refetch_interval(),
@@ -68,7 +73,10 @@ pub async fn fetch_statuses_task(context: Arc<NetworkMonitoringContext>) {
6873
let http_client = reqwest::Client::new();
6974

7075
loop {
71-
interval.tick().await;
76+
tokio::select! {
77+
_ = shutdown.wait_for_shutdown() => break,
78+
_ = interval.tick() => {}
79+
}
7280

7381
let sequencer =
7482
call_rpc_status(&sequencer_client, context.config.sequencer_retry_policy()).await;
@@ -96,6 +104,8 @@ pub async fn fetch_statuses_task(context: Arc<NetworkMonitoringContext>) {
96104
context.initial_status_query_complete.notify_waiters();
97105
}
98106
}
107+
108+
Ok(())
99109
}
100110

101111
/// Handler to get the current network status

0 commit comments

Comments
 (0)