Skip to content

Commit 370a301

Browse files
authored
feat: add healthz endpoint (#29)
1 parent edf017b commit 370a301

7 files changed

Lines changed: 174 additions & 37 deletions

File tree

Cargo.lock

Lines changed: 76 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/ibc-attestor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ tower-http = { workspace = true, features = ["trace"] }
6262
tonic = { workspace = true, default-features = true }
6363
tonic-reflection = { workspace = true, default-features = true }
6464
prost = { workspace = true, default-features = true }
65+
warp = { workspace = true, features = ["server"] }
6566
futures = { workspace = true, default-features = true }
6667
async-trait = { workspace = true }
6768

apps/ibc-attestor/server.dev.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[server]
22
listen_addr = "0.0.0.0:8080"
3+
health_addr = "0.0.0.0:8081"
34

45
[adapter]
56
url = "https://eth-sepolia.g.alchemy.com/v2/your-api-key-here"

apps/ibc-attestor/src/bin/ibc_attestor/main.rs

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use ibc_attestor::{
1212
},
1313
config::{AttestorConfig, TracingConfig},
1414
logging::init_logging,
15-
rpc::{RpcError, server},
15+
rpc::{RpcError, health, server},
1616
signer::{
1717
SignerBuilder,
1818
local::{DEFAULT_KEYSTORE_NAME, LocalSigner, LocalSignerConfig},
@@ -43,25 +43,37 @@ fn default_attestor_dir() -> Result<PathBuf, anyhow::Error> {
4343
Ok(PathBuf::from(home).join(".ibc-attestor"))
4444
}
4545

46-
fn run_server_with_adapter_and_signer<B: AdapterBuilder, S: SignerBuilder>(
46+
fn run_servers<B: AdapterBuilder + 'static, S: SignerBuilder + 'static>(
4747
config: AttestorConfig<B::Config, S::Config>,
48-
shutdown_rx: broadcast::Receiver<()>,
49-
) -> Result<JoinHandle<Result<(), RpcError>>, anyhow::Error> {
48+
shutdown_tx: &broadcast::Sender<()>,
49+
) -> Result<(JoinHandle<Result<(), RpcError>>, JoinHandle<()>), anyhow::Error> {
5050
let adapter = B::build(config.adapter)?;
5151
let signer = S::build(config.signer)?;
52+
let server_config = config.server;
5253

53-
Ok(tokio::spawn(async move {
54-
// Start rpc server
54+
let grpc_shutdown_rx = shutdown_tx.subscribe();
55+
let health_shutdown_rx = shutdown_tx.subscribe();
56+
57+
let grpc_addr = server_config.listen_addr;
58+
let health_addr = server_config.health_addr;
59+
60+
let grpc_handle = tokio::spawn(async move {
5561
server::start(
56-
config.server.listen_addr,
62+
grpc_addr,
5763
adapter,
5864
B::adapter_name(),
5965
signer,
6066
S::signer_name(),
61-
shutdown_rx,
67+
grpc_shutdown_rx,
6268
)
6369
.await
64-
}))
70+
});
71+
72+
let health_handle = tokio::spawn(async move {
73+
health::start(health_addr, grpc_addr, health_shutdown_rx).await;
74+
});
75+
76+
Ok((grpc_handle, health_handle))
6577
}
6678

6779
#[tokio::main]
@@ -75,73 +87,56 @@ async fn main() -> Result<(), anyhow::Error> {
7587
let _tracing_guard = init_logging(tracing_config);
7688

7789
// Create shutdown broadcast channel
78-
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
90+
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
7991

80-
let rpc_handle = match (args.chain_type, args.signer_type) {
92+
let (grpc_handle, health_handle) = match (args.chain_type, args.signer_type) {
8193
(ChainType::Evm, SignerType::Local) => {
8294
let config = AttestorConfig::<EvmAdapterConfig, LocalSignerConfig>::from_file(
8395
&args.config,
8496
)?;
85-
run_server_with_adapter_and_signer::<EvmAdapterBuilder, LocalSigner>(
86-
config,
87-
shutdown_rx,
88-
)?
97+
run_servers::<EvmAdapterBuilder, LocalSigner>(config, &shutdown_tx)?
8998
}
9099
(ChainType::Evm, SignerType::Remote) => {
91100
let config = AttestorConfig::<EvmAdapterConfig, RemoteSignerConfig>::from_file(
92101
&args.config,
93102
)?;
94-
run_server_with_adapter_and_signer::<EvmAdapterBuilder, RemoteSigner>(
95-
config,
96-
shutdown_rx,
97-
)?
103+
run_servers::<EvmAdapterBuilder, RemoteSigner>(config, &shutdown_tx)?
98104
}
99105
(ChainType::Solana, SignerType::Local) => {
100106
let config =
101107
AttestorConfig::<SolanaAdapterConfig, LocalSignerConfig>::from_file(
102108
&args.config,
103109
)?;
104-
run_server_with_adapter_and_signer::<SolanaAdapterBuilder, LocalSigner>(
105-
config,
106-
shutdown_rx,
107-
)?
110+
run_servers::<SolanaAdapterBuilder, LocalSigner>(config, &shutdown_tx)?
108111
}
109112
(ChainType::Solana, SignerType::Remote) => {
110113
let config =
111114
AttestorConfig::<SolanaAdapterConfig, RemoteSignerConfig>::from_file(
112115
&args.config,
113116
)?;
114-
run_server_with_adapter_and_signer::<SolanaAdapterBuilder, RemoteSigner>(
115-
config,
116-
shutdown_rx,
117-
)?
117+
run_servers::<SolanaAdapterBuilder, RemoteSigner>(config, &shutdown_tx)?
118118
}
119119
(ChainType::Cosmos, SignerType::Local) => {
120120
let config =
121121
AttestorConfig::<CosmosAdapterConfig, LocalSignerConfig>::from_file(
122122
&args.config,
123123
)?;
124-
run_server_with_adapter_and_signer::<CosmosAdapterBuilder, LocalSigner>(
125-
config,
126-
shutdown_rx,
127-
)?
124+
run_servers::<CosmosAdapterBuilder, LocalSigner>(config, &shutdown_tx)?
128125
}
129126
(ChainType::Cosmos, SignerType::Remote) => {
130127
let config =
131128
AttestorConfig::<CosmosAdapterConfig, RemoteSignerConfig>::from_file(
132129
&args.config,
133130
)?;
134-
run_server_with_adapter_and_signer::<CosmosAdapterBuilder, RemoteSigner>(
135-
config,
136-
shutdown_rx,
137-
)?
131+
run_servers::<CosmosAdapterBuilder, RemoteSigner>(config, &shutdown_tx)?
138132
}
139133
};
140134

141135
_ = wait_for_shutdown_signal().await;
142136
info!("shutdown signal received, starting graceful shutdown");
143137
let _ = shutdown_tx.send(());
144-
rpc_handle.await??;
138+
grpc_handle.await??;
139+
health_handle.await?;
145140
}
146141
Commands::Key(cmd) => {
147142
match cmd {

apps/ibc-attestor/src/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ where
4949
/// The configuration for the relayer server.
5050
#[derive(Clone, Debug, Deserialize)]
5151
pub struct ServerConfig {
52-
/// The address that the server should listen on.
52+
/// The address that the gRPC server should listen on.
5353
pub listen_addr: SocketAddr,
54+
/// The address that the HTTP health server should listen on.
55+
pub health_addr: SocketAddr,
5456
}
5557

5658
/// Configuration for OpenTelemetry tracing export.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use std::net::SocketAddr;
2+
3+
use tokio::{net::TcpStream, sync::broadcast};
4+
use tracing::{error, info};
5+
use warp::{Filter, http::StatusCode};
6+
7+
async fn check_grpc(grpc_addr: SocketAddr) -> StatusCode {
8+
match TcpStream::connect(grpc_addr).await {
9+
Ok(_) => {
10+
info!("health check passed: gRPC server is accepting connections");
11+
StatusCode::OK
12+
}
13+
Err(e) => {
14+
error!(error = %e, "health check failed: gRPC server not ready");
15+
StatusCode::SERVICE_UNAVAILABLE
16+
}
17+
}
18+
}
19+
20+
fn make_healthz_filter(
21+
grpc_addr: SocketAddr,
22+
) -> impl Filter<Extract = (StatusCode,), Error = warp::Rejection> + Clone + Send {
23+
warp::get()
24+
.and(warp::path("healthz"))
25+
.and(warp::path::end())
26+
.map(move || grpc_addr)
27+
.then(check_grpc)
28+
}
29+
30+
/// Start the HTTP health server.
31+
///
32+
/// Exposes a `GET /healthz` endpoint that returns 200 OK when the gRPC server
33+
/// is accepting connections, or 503 Service Unavailable when it is not ready.
34+
pub async fn start(
35+
health_addr: SocketAddr,
36+
grpc_addr: SocketAddr,
37+
mut shutdown_rx: broadcast::Receiver<()>,
38+
) {
39+
info!(
40+
health_addr = %health_addr,
41+
grpc_addr = %grpc_addr,
42+
"starting HTTP health server"
43+
);
44+
45+
let healthz = make_healthz_filter(grpc_addr);
46+
47+
let shutdown_signal = async move {
48+
let _ = shutdown_rx.recv().await;
49+
info!("health server received shutdown signal");
50+
};
51+
52+
warp::serve(healthz)
53+
.bind(health_addr)
54+
.await
55+
.graceful(shutdown_signal)
56+
.run()
57+
.await;
58+
59+
info!("health server stopped gracefully");
60+
}

apps/ibc-attestor/src/rpc/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub mod api {
1010
}
1111

1212
mod attestor;
13+
/// HTTP health server for readiness probes
14+
pub mod health;
1315
mod interceptor;
1416
mod middleware;
1517
/// Server implementation and assembly

0 commit comments

Comments
 (0)