Skip to content

Commit e8ad16e

Browse files
feat: Add health monitoring server with comprehensive system checks, closes #49 (#303)
1 parent 0f72a03 commit e8ad16e

File tree

18 files changed

+948
-11
lines changed

18 files changed

+948
-11
lines changed

Cargo.lock

Lines changed: 72 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/src/commands/start.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use rindexer::{
66
yaml::{read_manifest, YAML_CONFIG_NAME},
77
},
88
rindexer_error, rindexer_info, setup_info_logger, start_rindexer_no_code,
9-
GraphqlOverrideSettings, IndexerNoCodeDetails, PostgresClient, StartNoCodeDetails,
9+
GraphqlOverrideSettings, HealthOverrideSettings, IndexerNoCodeDetails, PostgresClient,
10+
StartNoCodeDetails,
1011
};
1112

1213
use crate::{
@@ -182,6 +183,7 @@ pub async fn start(
182183
enabled: false,
183184
override_port: None,
184185
},
186+
health_details: HealthOverrideSettings { override_port: None },
185187
};
186188

187189
start_rindexer_no_code(details).await.map_err(|e| {
@@ -197,6 +199,7 @@ pub async fn start(
197199
enabled: true,
198200
override_port: port.as_ref().and_then(|port| port.parse().ok()),
199201
},
202+
health_details: HealthOverrideSettings { override_port: None },
200203
};
201204

202205
start_rindexer_no_code(details).await.map_err(|e| {
@@ -212,6 +215,7 @@ pub async fn start(
212215
enabled: true,
213216
override_port: port.as_ref().and_then(|port| port.parse().ok()),
214217
},
218+
health_details: HealthOverrideSettings { override_port: None },
215219
};
216220

217221
let _ = start_rindexer_no_code(details).await.map_err(|e| {

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ anyhow = "1.0.99"
7878
winnow = "0.7.13"
7979
tower = "0.5.2"
8080
port-killer = "0.1.0"
81+
axum = "0.7"
8182

8283
# build
8384
jemallocator = { version = "0.6.0", package = "tikv-jemallocator", optional = true }

core/src/health.rs

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
use std::{net::SocketAddr, sync::Arc};
2+
3+
use axum::{extract::State, http::StatusCode, response::Json, routing::get, Router};
4+
use serde::{Deserialize, Serialize};
5+
use tokio::net::TcpListener;
6+
use tracing::{error, info};
7+
8+
use crate::{
9+
database::postgres::client::PostgresClient, indexer::task_tracker::active_indexing_count,
10+
manifest::core::Manifest, system_state::is_running,
11+
};
12+
13+
#[derive(Debug, Clone, Serialize, Deserialize)]
14+
pub struct HealthStatus {
15+
pub status: HealthStatusType,
16+
pub timestamp: String,
17+
pub services: HealthServices,
18+
pub indexing: IndexingStatus,
19+
}
20+
21+
#[derive(Debug, Clone, Serialize, Deserialize)]
22+
pub struct HealthServices {
23+
pub database: HealthStatusType,
24+
pub indexing: HealthStatusType,
25+
pub sync: HealthStatusType,
26+
}
27+
28+
#[derive(Debug, Clone, Serialize, Deserialize)]
29+
pub struct IndexingStatus {
30+
pub active_tasks: usize,
31+
pub is_running: bool,
32+
}
33+
34+
#[derive(Debug, Clone, Serialize, Deserialize)]
35+
#[serde(rename_all = "lowercase")]
36+
pub enum HealthStatusType {
37+
Healthy,
38+
Unhealthy,
39+
Unknown,
40+
NotConfigured,
41+
Disabled,
42+
NoData,
43+
Stopped,
44+
}
45+
46+
#[derive(Clone)]
47+
pub struct HealthServerState {
48+
pub manifest: Arc<Manifest>,
49+
pub postgres_client: Option<Arc<PostgresClient>>,
50+
}
51+
52+
pub struct HealthServer {
53+
port: u16,
54+
state: HealthServerState,
55+
}
56+
57+
impl HealthServer {
58+
pub fn new(
59+
port: u16,
60+
manifest: Arc<Manifest>,
61+
postgres_client: Option<Arc<PostgresClient>>,
62+
) -> Self {
63+
Self { port, state: HealthServerState { manifest, postgres_client } }
64+
}
65+
66+
pub async fn start(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
67+
let app = Router::new().route("/health", get(health_handler)).with_state(self.state);
68+
69+
let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
70+
let listener = TcpListener::bind(addr).await?;
71+
72+
info!("🩺 Health server started on http://0.0.0.0:{}/health", self.port);
73+
74+
axum::serve(listener, app).await?;
75+
Ok(())
76+
}
77+
}
78+
79+
async fn health_handler(
80+
State(state): State<HealthServerState>,
81+
) -> Result<(StatusCode, Json<HealthStatus>), StatusCode> {
82+
let database_health = check_database_health(&state).await;
83+
let indexing_health = check_indexing_health();
84+
let sync_health = check_sync_health(&state).await;
85+
86+
let overall_status = determine_overall_status(&database_health, &indexing_health, &sync_health);
87+
88+
let health_status =
89+
build_health_status(overall_status, database_health, indexing_health, sync_health);
90+
91+
let status_code = if matches!(health_status.status, HealthStatusType::Healthy) {
92+
StatusCode::OK
93+
} else {
94+
StatusCode::SERVICE_UNAVAILABLE
95+
};
96+
97+
Ok((status_code, Json(health_status)))
98+
}
99+
100+
fn build_health_status(
101+
overall_status: HealthStatusType,
102+
database_health: HealthStatusType,
103+
indexing_health: HealthStatusType,
104+
sync_health: HealthStatusType,
105+
) -> HealthStatus {
106+
HealthStatus {
107+
status: overall_status,
108+
timestamp: chrono::Utc::now().to_rfc3339(),
109+
services: HealthServices {
110+
database: database_health,
111+
indexing: indexing_health,
112+
sync: sync_health,
113+
},
114+
indexing: IndexingStatus {
115+
active_tasks: active_indexing_count(),
116+
is_running: is_running(),
117+
},
118+
}
119+
}
120+
121+
async fn check_database_health(state: &HealthServerState) -> HealthStatusType {
122+
if !state.manifest.storage.postgres_enabled() {
123+
return HealthStatusType::Disabled;
124+
}
125+
126+
match &state.postgres_client {
127+
Some(client) => match client.query_one("SELECT 1", &[]).await {
128+
Ok(_) => HealthStatusType::Healthy,
129+
Err(e) => {
130+
error!("Database health check failed: {}", e);
131+
HealthStatusType::Unhealthy
132+
}
133+
},
134+
None => HealthStatusType::NotConfigured,
135+
}
136+
}
137+
138+
fn check_indexing_health() -> HealthStatusType {
139+
if is_running() {
140+
HealthStatusType::Healthy
141+
} else {
142+
HealthStatusType::Stopped
143+
}
144+
}
145+
146+
async fn check_sync_health(state: &HealthServerState) -> HealthStatusType {
147+
if state.manifest.storage.postgres_enabled() {
148+
check_postgres_sync_health(state).await
149+
} else if state.manifest.storage.csv_enabled() {
150+
check_csv_sync_health(state)
151+
} else {
152+
HealthStatusType::Disabled
153+
}
154+
}
155+
156+
async fn check_postgres_sync_health(state: &HealthServerState) -> HealthStatusType {
157+
match &state.postgres_client {
158+
Some(client) => {
159+
match client.query_one_or_none(
160+
r#"SELECT 1 FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'rindexer_internal') AND table_name NOT LIKE 'latest_block' AND table_name NOT LIKE '%_last_known_%' AND table_name NOT LIKE '%_last_run_%' LIMIT 1"#,
161+
&[]
162+
).await {
163+
Ok(Some(_)) => HealthStatusType::Healthy,
164+
Ok(None) => HealthStatusType::NoData,
165+
Err(e) => {
166+
error!("Sync health check failed: {}", e);
167+
HealthStatusType::Unhealthy
168+
}
169+
}
170+
}
171+
None => HealthStatusType::NotConfigured,
172+
}
173+
}
174+
175+
fn check_csv_sync_health(state: &HealthServerState) -> HealthStatusType {
176+
match &state.manifest.storage.csv {
177+
Some(csv_details) => {
178+
let csv_path = std::path::Path::new(&csv_details.path);
179+
if !csv_path.exists() {
180+
return HealthStatusType::NoData;
181+
}
182+
183+
match std::fs::read_dir(csv_path) {
184+
Ok(entries) => {
185+
let csv_files: Vec<_> = entries
186+
.filter_map(|entry| entry.ok())
187+
.filter(|entry| entry.path().extension().is_some_and(|ext| ext == "csv"))
188+
.collect();
189+
190+
if csv_files.is_empty() {
191+
HealthStatusType::NoData
192+
} else {
193+
HealthStatusType::Healthy
194+
}
195+
}
196+
Err(_) => HealthStatusType::Unhealthy,
197+
}
198+
}
199+
None => HealthStatusType::NotConfigured,
200+
}
201+
}
202+
203+
fn determine_overall_status(
204+
database: &HealthStatusType,
205+
indexing: &HealthStatusType,
206+
sync: &HealthStatusType,
207+
) -> HealthStatusType {
208+
if matches!(database, HealthStatusType::Unhealthy | HealthStatusType::NotConfigured)
209+
|| matches!(indexing, HealthStatusType::Stopped)
210+
|| matches!(sync, HealthStatusType::Unhealthy | HealthStatusType::NotConfigured)
211+
{
212+
HealthStatusType::Unhealthy
213+
} else if matches!(sync, HealthStatusType::NoData) {
214+
// Sync NoData is acceptable when no event tables exist yet
215+
HealthStatusType::Healthy
216+
} else {
217+
HealthStatusType::Healthy
218+
}
219+
}
220+
221+
pub async fn start_health_server(
222+
port: u16,
223+
manifest: Arc<Manifest>,
224+
postgres_client: Option<Arc<PostgresClient>>,
225+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
226+
let health_server = HealthServer::new(port, manifest, postgres_client);
227+
health_server.start().await
228+
}

core/src/indexer/no_code.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub async fn setup_no_code(
100100
manifest_path: details.manifest_path,
101101
indexing_details: None,
102102
graphql_details: details.graphql_details,
103+
health_details: details.health_details,
103104
});
104105
}
105106

@@ -151,6 +152,7 @@ pub async fn setup_no_code(
151152
manifest_path: details.manifest_path,
152153
indexing_details: Some(IndexingDetails { registry, trace_registry }),
153154
graphql_details: details.graphql_details,
155+
health_details: details.health_details,
154156
})
155157
}
156158
None => Err(SetupNoCodeError::NoProjectPathFoundUsingParentOfManifestPath),

0 commit comments

Comments
 (0)