Skip to content

Commit 9b09517

Browse files
authored
feat(coprocessor): implement health-checking logic for zkproof-worker (#318)
* feat(coprocessor): add healthz server as a common util
* chore(coprocessor): implement HealthCheckService for zkproof worker
* chore(coprocessor): refactor the health-checking logic to be aligned with the spec
  - Add dependencies in Json response
  - Use ok/fail
  - Add is_alive func
* feat(coprocessor): implement HealthCheckService::is_alive for zkproof worker
* chore(coprocessor): add version endpoint support in healthz
* chore(coprocessor): add simple impl of HealthCheckService::get_version
1 parent dac59db commit 9b09517

8 files changed

Lines changed: 321 additions & 27 deletions

File tree

coprocessor/fhevm-engine/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coprocessor/fhevm-engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ tracing = "0.1.41"
5252
tracing-subscriber = { version = "0.3.19", features = ["fmt", "json"] }
5353
humantime = "2.2.0"
5454
bytesize = "2.0.1"
55+
http = "1.3.1"
5556

5657
[profile.dev.package.tfhe]
5758
overflow-checks = false

coprocessor/fhevm-engine/fhevm-engine-common/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ tonic = { workspace = true }
2323
tokio = { workspace = true }
2424
tracing = { workspace = true }
2525
bytesize = { workspace = true}
26+
tokio-util = { workspace = true}
27+
axum = { workspace = true}
28+
serde_json = { workspace = true}
29+
http = {workspace = true}
2630

2731
# crates.io dependencies
2832
paste = "1.0.15"
@@ -34,6 +38,8 @@ opentelemetry-otlp = { workspace = true }
3438
opentelemetry_sdk = { workspace = true }
3539
opentelemetry-semantic-conventions = { workspace = true }
3640

41+
42+
3743
[features]
3844
nightly-avx512 = ["tfhe/nightly-avx512"]
3945
gpu = ["tfhe/gpu"]
coprocessor/fhevm-engine/fhevm-engine-common/src/healthz_server.rs

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
use axum::{
2+
extract::State,
3+
http::StatusCode,
4+
response::{IntoResponse, Json},
5+
routing::{get, Router},
6+
};
7+
use serde::Serialize;
8+
use sqlx::PgPool;
9+
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
10+
use tokio::net::TcpListener;
11+
use tokio_util::sync::CancellationToken;
12+
use tracing::{error, info};
13+
14+
#[derive(Serialize)]
15+
struct HealthResponse {
16+
status_code: String,
17+
status: String,
18+
dependencies: HashMap<&'static str, &'static str>,
19+
details: String,
20+
}
21+
22+
impl From<HealthStatus> for HealthResponse {
23+
fn from(status: HealthStatus) -> Self {
24+
let details = status.error_details();
25+
let dependencies: HashMap<&'static str, &'static str> = status
26+
.checks
27+
.iter()
28+
.map(|(&key, &value)| (key, if value { "ok" } else { "fail" }))
29+
.collect();
30+
31+
Self {
32+
status_code: if status.is_healthy() { "200" } else { "503" }.to_string(),
33+
status: if status.is_healthy() {
34+
"healthy".to_string()
35+
} else {
36+
"unhealthy".to_string()
37+
},
38+
dependencies,
39+
details,
40+
}
41+
}
42+
}
43+
44+
/// Version information returned by `HealthCheckService::get_version` and
/// serialized to JSON by the `/version` handler.
#[derive(Serialize)]
45+
pub struct Version {
46+
/// Name string; presumably the service/component name (confirm with implementors).
pub name: &'static str,
47+
/// Version string; presumably the crate or release version (confirm with implementors).
pub version: &'static str,
48+
/// Build string; presumably a build identifier such as a commit hash (confirm with implementors).
pub build: &'static str,
49+
}
50+
51+
/// Interface a service implements so that `HttpServer` can expose its
/// health, liveness and version over HTTP.
pub trait HealthCheckService: Send + Sync {
52+
/// Produces the current `HealthStatus`; the `/healthz` handler answers 200
/// when it is healthy and 503 otherwise.
fn health_check(&self) -> impl std::future::Future<Output = HealthStatus> + Send;
53+
/// Reports liveness; the `/liveness` handler answers 200 for `true` and
/// 503 for `false`.
fn is_alive(&self) -> impl std::future::Future<Output = bool> + Send;
54+
/// Returns the `Version` served by the `/version` handler.
fn get_version(&self) -> Version;
55+
}
56+
57+
pub struct HttpServer<S: HealthCheckService + Send + Sync + 'static> {
58+
service: Arc<S>,
59+
port: u16,
60+
cancel_token: CancellationToken,
61+
}
62+
63+
impl<S: HealthCheckService + Send + Sync + 'static> HttpServer<S> {
64+
pub fn new(service: Arc<S>, port: u16, cancel_token: CancellationToken) -> Self {
65+
Self {
66+
service,
67+
port,
68+
cancel_token,
69+
}
70+
}
71+
pub async fn start(&self) -> anyhow::Result<()> {
72+
let app = Router::new()
73+
.route("/healthz", get(Self::health_handler))
74+
.route("/liveness", get(Self::liveness_handler))
75+
.route("/version", get(Self::version_handler))
76+
.with_state(self.service.clone());
77+
78+
let addr = SocketAddr::from(([0, 0, 0, 0], self.port));
79+
info!("Starting HTTP server on {}", addr);
80+
81+
let shutdown = {
82+
let cancel_token = self.cancel_token.clone();
83+
async move {
84+
cancel_token.cancelled().await;
85+
}
86+
};
87+
88+
let listener = TcpListener::bind(addr).await?;
89+
let server =
90+
axum::serve(listener, app.into_make_service()).with_graceful_shutdown(shutdown);
91+
92+
if let Err(err) = server.await {
93+
error!("HTTP server error: {}", err);
94+
return Err(anyhow::anyhow!("HTTP server error: {}", err));
95+
}
96+
97+
Ok(())
98+
}
99+
100+
async fn health_handler(State(service): State<Arc<S>>) -> impl IntoResponse {
101+
let status = service.health_check().await;
102+
let http_status = if status.is_healthy() {
103+
StatusCode::OK
104+
} else {
105+
StatusCode::SERVICE_UNAVAILABLE
106+
};
107+
108+
(http_status, Json(HealthResponse::from(status)))
109+
}
110+
111+
async fn liveness_handler(State(service): State<Arc<S>>) -> impl IntoResponse {
112+
if service.is_alive().await {
113+
(
114+
StatusCode::OK,
115+
Json(serde_json::json!({
116+
"status_code": "200",
117+
"status": "alive"
118+
})),
119+
)
120+
} else {
121+
(
122+
StatusCode::SERVICE_UNAVAILABLE,
123+
Json(serde_json::json!({
124+
"status_code": "503",
125+
"status": "not_responding"
126+
})),
127+
)
128+
}
129+
}
130+
131+
async fn version_handler(State(service): State<Arc<S>>) -> impl IntoResponse {
132+
let version = service.get_version();
133+
(
134+
StatusCode::OK,
135+
Json(serde_json::json!(version)),
136+
)
137+
}
138+
}
139+
140+
#[derive(Clone, Default)]
141+
pub struct HealthStatus {
142+
pub checks: HashMap<&'static str, bool>,
143+
pub error_details: Vec<String>,
144+
}
145+
146+
impl HealthStatus {
147+
/// Checks DB availability by reusing the service internal DB connection pool
148+
///
149+
/// query has its internal timeout
150+
pub async fn set_db_connected(&mut self, pool: &PgPool) {
151+
let mut is_connected = false;
152+
match sqlx::query("SELECT 1").execute(pool).await {
153+
Ok(_) => {
154+
is_connected = true;
155+
}
156+
Err(e) => {
157+
self.error_details
158+
.push(format!("Database query error: {}", e));
159+
}
160+
}
161+
self.checks.insert("database", is_connected);
162+
}
163+
164+
pub fn is_healthy(&self) -> bool {
165+
self.checks.iter().all(|(_, s)| *s)
166+
}
167+
168+
pub fn error_details(&self) -> String {
169+
self.error_details
170+
.iter()
171+
.filter(|s| !s.is_empty())
172+
.cloned()
173+
.collect::<Vec<_>>()
174+
.join("; ")
175+
}
176+
}

coprocessor/fhevm-engine/fhevm-engine-common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod healthz_server;
12
pub mod keys;
23
pub mod telemetry;
34
pub mod tenant_keys;

coprocessor/fhevm-engine/zkproof-worker/src/bin/zkproof_worker.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
use clap::{command, Parser};
2+
use fhevm_engine_common::healthz_server::HttpServer;
23
use fhevm_engine_common::telemetry;
4+
use std::sync::Arc;
5+
use tokio::{join, task};
6+
use tokio_util::sync::CancellationToken;
37
use tracing::{error, info, Level};
8+
use zkproof_worker::verifier::ZkProofService;
49

510
#[derive(Parser, Debug, Clone)]
611
#[command(version, about, long_about = None)]
@@ -40,6 +45,10 @@ pub struct Args {
4045
value_parser = clap::value_parser!(Level),
4146
default_value_t = Level::INFO)]
4247
pub log_level: Level,
48+
49+
/// HTTP server port for health checks
50+
#[arg(long, default_value_t = 8080)]
51+
health_check_port: u16,
4352
}
4453

4554
pub fn parse_args() -> Args {
@@ -74,8 +83,33 @@ async fn main() {
7483
std::process::exit(1);
7584
}
7685

77-
info!("Starting zkProof worker...");
78-
if let Err(err) = zkproof_worker::verifier::execute_verify_proofs_loop(&conf).await {
79-
error!("Worker failed: {:?}", err);
80-
}
86+
let cancel_token = CancellationToken::new();
87+
let service = ZkProofService::create(conf, cancel_token.child_token()).await;
88+
let service = Arc::new(service);
89+
90+
let http_server = HttpServer::new(
91+
service.clone(),
92+
args.health_check_port,
93+
cancel_token.child_token(),
94+
);
95+
96+
let http_task = task::spawn(async move {
97+
if let Err(err) = http_server.start().await {
98+
error!(
99+
task = "health_check",
100+
"Error while running server: {:?}", err
101+
);
102+
}
103+
anyhow::Ok(())
104+
});
105+
106+
let service_task = async {
107+
info!("Starting worker...");
108+
if let Err(err) = service.run().await {
109+
error!("Worker failed: {:?}", err);
110+
}
111+
Ok::<_, anyhow::Error>(())
112+
};
113+
114+
let (_http_result, _service_result) = join!(http_task, service_task);
81115
}

coprocessor/fhevm-engine/zkproof-worker/src/tests/utils.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use std::time::Duration;
2-
1+
use std::sync::Arc;
2+
use std::time::{Duration, SystemTime};
3+
use tokio::sync::RwLock;
34
use fhevm_engine_common::{tenant_keys, utils::safe_serialize};
45
use test_harness::instance::DBInstance;
56
use tokio::time::sleep;
@@ -31,8 +32,10 @@ pub async fn setup() -> anyhow::Result<(sqlx::PgPool, DBInstance)> {
3132
.await
3233
.unwrap();
3334

35+
let last_active_at = Arc::new(RwLock::new(SystemTime::now()));
36+
let db_pool = pool.clone();
3437
tokio::spawn(async move {
35-
crate::verifier::execute_verify_proofs_loop(&conf)
38+
crate::verifier::execute_verify_proofs_loop(db_pool, conf.clone(), last_active_at.clone() )
3639
.await
3740
.unwrap();
3841
});

0 commit comments

Comments
 (0)