Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use rindexer::{
yaml::{read_manifest, YAML_CONFIG_NAME},
},
rindexer_error, rindexer_info, setup_info_logger, start_rindexer_no_code,
GraphqlOverrideSettings, IndexerNoCodeDetails, PostgresClient, StartNoCodeDetails,
GraphqlOverrideSettings, HealthOverrideSettings, IndexerNoCodeDetails, PostgresClient, StartNoCodeDetails,
};

use crate::{
Expand Down Expand Up @@ -182,6 +182,9 @@ pub async fn start(
enabled: false,
override_port: None,
},
health_details: HealthOverrideSettings {
override_port: None,
},
};

start_rindexer_no_code(details).await.map_err(|e| {
Expand All @@ -197,6 +200,9 @@ pub async fn start(
enabled: true,
override_port: port.as_ref().and_then(|port| port.parse().ok()),
},
health_details: HealthOverrideSettings {
override_port: None,
},
};

start_rindexer_no_code(details).await.map_err(|e| {
Expand All @@ -212,6 +218,9 @@ pub async fn start(
enabled: true,
override_port: port.as_ref().and_then(|port| port.parse().ok()),
},
health_details: HealthOverrideSettings {
override_port: None,
},
};

let _ = start_rindexer_no_code(details).await.map_err(|e| {
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ anyhow = "1.0.99"
winnow = "0.7.13"
tower = "0.5.2"
port-killer = "0.1.0"
axum = "0.7"

# build
jemallocator = { version = "0.6.0", package = "tikv-jemallocator", optional = true }
Expand Down
244 changes: 244 additions & 0 deletions core/src/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
use std::{
net::SocketAddr,
sync::Arc,
};

use axum::{
extract::State,
http::StatusCode,
response::Json,
routing::get,
Router,
};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use tracing::{error, info};

use crate::{
database::postgres::client::PostgresClient,
indexer::task_tracker::active_indexing_count,
manifest::core::Manifest,
system_state::is_running,
};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
pub status: HealthStatusType,
pub timestamp: String,
pub services: HealthServices,
pub indexing: IndexingStatus,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthServices {
pub database: HealthStatusType,
pub indexing: HealthStatusType,
pub sync: HealthStatusType,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingStatus {
pub active_tasks: usize,
pub is_running: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatusType {
Healthy,
Unhealthy,
Unknown,
NotConfigured,
Disabled,
NoData,
Stopped,
}

#[derive(Clone)]
pub struct HealthServerState {
pub manifest: Arc<Manifest>,
pub postgres_client: Option<Arc<PostgresClient>>,
}

pub struct HealthServer {
port: u16,
state: HealthServerState,
}

impl HealthServer {
pub fn new(port: u16, manifest: Arc<Manifest>, postgres_client: Option<Arc<PostgresClient>>) -> Self {
Self {
port,
state: HealthServerState {
manifest,
postgres_client,
},
}
}

pub async fn start(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let app = Router::new()
.route("/health", get(health_handler))
.with_state(self.state);

let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
let listener = TcpListener::bind(addr).await?;

info!("🩺 Health server started on http://0.0.0.0:{}/health", self.port);

axum::serve(listener, app).await?;
Ok(())
}
}

async fn health_handler(State(state): State<HealthServerState>) -> Result<(StatusCode, Json<HealthStatus>), StatusCode> {
let database_health = check_database_health(&state).await;
let indexing_health = check_indexing_health();
let sync_health = check_sync_health(&state).await;

let overall_status = determine_overall_status(&database_health, &indexing_health, &sync_health);

let health_status = build_health_status(overall_status, database_health, indexing_health, sync_health);

let status_code = if matches!(health_status.status, HealthStatusType::Healthy) {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};

Ok((status_code, Json(health_status)))
}

fn build_health_status(
overall_status: HealthStatusType,
database_health: HealthStatusType,
indexing_health: HealthStatusType,
sync_health: HealthStatusType,
) -> HealthStatus {
HealthStatus {
status: overall_status,
timestamp: chrono::Utc::now().to_rfc3339(),
services: HealthServices {
database: database_health,
indexing: indexing_health,
sync: sync_health,
},
indexing: IndexingStatus {
active_tasks: active_indexing_count(),
is_running: is_running(),
},
}
}

async fn check_database_health(state: &HealthServerState) -> HealthStatusType {
if !state.manifest.storage.postgres_enabled() {
return HealthStatusType::Disabled;
}

match &state.postgres_client {
Some(client) => {
match client.query_one("SELECT 1", &[]).await {
Ok(_) => HealthStatusType::Healthy,
Err(e) => {
error!("Database health check failed: {}", e);
HealthStatusType::Unhealthy
}
}
}
None => HealthStatusType::NotConfigured,
}
}

fn check_indexing_health() -> HealthStatusType {
if is_running() {
HealthStatusType::Healthy
} else {
HealthStatusType::Stopped
}
}

async fn check_sync_health(state: &HealthServerState) -> HealthStatusType {
if state.manifest.storage.postgres_enabled() {
check_postgres_sync_health(state).await
} else if state.manifest.storage.csv_enabled() {
check_csv_sync_health(state)
} else {
HealthStatusType::Disabled
}
}

async fn check_postgres_sync_health(state: &HealthServerState) -> HealthStatusType {
match &state.postgres_client {
Some(client) => {
match client.query_one_or_none(
r#"SELECT 1 FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'rindexer_internal') AND table_name NOT LIKE 'latest_block' AND table_name NOT LIKE '%_last_known_%' AND table_name NOT LIKE '%_last_run_%' LIMIT 1"#,
&[]
).await {
Ok(Some(_)) => HealthStatusType::Healthy,
Ok(None) => HealthStatusType::NoData,
Err(e) => {
error!("Sync health check failed: {}", e);
HealthStatusType::Unhealthy
}
}
}
None => HealthStatusType::NotConfigured,
}
}

fn check_csv_sync_health(state: &HealthServerState) -> HealthStatusType {
match &state.manifest.storage.csv {
Some(csv_details) => {
let csv_path = std::path::Path::new(&csv_details.path);
if !csv_path.exists() {
return HealthStatusType::NoData;
}

match std::fs::read_dir(csv_path) {
Ok(entries) => {
let csv_files: Vec<_> = entries
.filter_map(|entry| entry.ok())
.filter(|entry| {
entry.path().extension()
.map_or(false, |ext| ext == "csv")
})
.collect();

if csv_files.is_empty() {
HealthStatusType::NoData
} else {
HealthStatusType::Healthy
}
}
Err(_) => HealthStatusType::Unhealthy,
}
}
None => HealthStatusType::NotConfigured,
}
}

fn determine_overall_status(
database: &HealthStatusType,
indexing: &HealthStatusType,
sync: &HealthStatusType,
) -> HealthStatusType {
if matches!(database, HealthStatusType::Unhealthy | HealthStatusType::NotConfigured) ||
matches!(indexing, HealthStatusType::Stopped) ||
matches!(sync, HealthStatusType::Unhealthy | HealthStatusType::NotConfigured) {
HealthStatusType::Unhealthy
} else if matches!(sync, HealthStatusType::NoData) {
// Sync NoData is acceptable when no event tables exist yet
HealthStatusType::Healthy
} else {
HealthStatusType::Healthy
}
}

pub async fn start_health_server(
port: u16,
manifest: Arc<Manifest>,
postgres_client: Option<Arc<PostgresClient>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let health_server = HealthServer::new(port, manifest, postgres_client);
health_server.start().await
}
Loading
Loading