Skip to content

Commit 996cc57

Browse files
authored
Merge pull request #502 from NillionNetwork/feat/robust-heartbeats
feat: add a few robustness-es features around htx submission
2 parents c261663 + 67527a3 commit 996cc57

File tree

3 files changed

+41
-11
lines changed

3 files changed

+41
-11
lines changed

cvm-agent/src/heartbeat.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::heartbeat::NilAVRouter::NilAVRouterInstance;
1+
use crate::{heartbeat::NilAVRouter::NilAVRouterInstance, monitors::caddy::CaddyStatus};
22
use alloy::{primitives::Address, providers::ProviderBuilder, signers::local::PrivateKeySigner, sol_types::sol};
33
use alloy_provider::{Provider, WsConnect};
44
use anyhow::Context;
@@ -32,6 +32,7 @@ pub(crate) struct HeartbeatEmitterArgs {
3232
pub(crate) measurement_hash_url: String,
3333
pub(crate) cpu_count: u64,
3434
pub(crate) gpu_count: u64,
35+
pub(crate) caddy_status: CaddyStatus,
3536
}
3637

3738
pub(crate) struct HeartbeatEmitter {
@@ -62,6 +63,7 @@ impl HeartbeatEmitter {
6263
measurement_hash_url,
6364
cpu_count,
6465
gpu_count,
66+
caddy_status,
6567
} = args;
6668
let contract_address: Address = contract_address.parse().context("Invalid contract address")?;
6769
let attestation_url = format!("https://{workload_domain}{ATTESTATION_PATH}");
@@ -81,11 +83,15 @@ impl HeartbeatEmitter {
8183
cpu_count,
8284
gpu_count,
8385
};
84-
tokio::spawn(async move { submitter.run().await });
86+
tokio::spawn(async move { submitter.run(caddy_status).await });
8587
Ok(())
8688
}
8789

88-
async fn run(self) {
90+
async fn run(self, caddy_status: CaddyStatus) {
91+
info!("Waiting for caddy to generate a TLS certificate before emitting heartbeats");
92+
caddy_status.wait_tls_certificate().await;
93+
info!("Starting heartbeat generation");
94+
8995
let provider = loop {
9096
match self.connect().await {
9197
Ok(out) => break out,
@@ -111,8 +117,12 @@ impl HeartbeatEmitter {
111117

112118
async fn connect(&self) -> anyhow::Result<impl Provider> {
113119
let ws = WsConnect::new(&self.rpc_endpoint).with_max_retries(u32::MAX);
114-
let provider =
115-
ProviderBuilder::new().wallet(self.wallet.clone()).with_simple_nonce_management().connect_ws(ws).await?;
120+
let provider = ProviderBuilder::new()
121+
.wallet(self.wallet.clone())
122+
.with_simple_nonce_management()
123+
.with_gas_estimation()
124+
.connect_ws(ws)
125+
.await?;
116126
Ok(provider)
117127
}
118128

@@ -133,7 +143,8 @@ impl HeartbeatEmitter {
133143
let pending_tx = call.send().await?;
134144
let receipt = pending_tx.get_receipt().await?;
135145
let tx_hash = receipt.transaction_hash;
136-
info!("HTX submitted in transaction {tx_hash}");
146+
let status = if receipt.status() { "success" } else { "failure" };
147+
info!("HTX submitted in transaction {tx_hash} with status {status}");
137148
Ok(())
138149
}
139150
}

cvm-agent/src/monitors/caddy.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use cvm_agent_models::health::EventKind;
88
use futures::{Stream, StreamExt};
99
use serde::Deserialize;
1010
use std::{borrow::Cow, mem, sync::Arc, time::Duration};
11-
use tokio::sync::Mutex;
11+
use tokio::sync::{Mutex, oneshot};
1212
use tokio::time::sleep;
1313
use tracing::{debug, error, info, warn};
1414

@@ -23,15 +23,18 @@ pub struct CaddyMonitor {
2323
}
2424

2525
impl CaddyMonitor {
26-
pub fn spawn(docker: Docker, system_state: Arc<Mutex<SystemState>>, event_holder: EventHolder) {
26+
pub fn spawn(docker: Docker, system_state: Arc<Mutex<SystemState>>, event_holder: EventHolder) -> CaddyStatus {
2727
let monitor = Self { docker, system_state, event_holder };
28+
let (sender, receiver) = oneshot::channel();
2829
info!("Spawning caddy monitor");
2930
tokio::spawn(async move {
30-
monitor.run().await;
31+
monitor.run(sender).await;
3132
});
33+
CaddyStatus(receiver)
3234
}
3335

34-
async fn run(self) {
36+
async fn run(self, sender: oneshot::Sender<()>) {
37+
let mut sender = Some(sender);
3538
let mut threshold_timestamp = 0.0;
3639
loop {
3740
let builder = LogsOptionsBuilder::new().tail("10").stderr(true);
@@ -45,6 +48,10 @@ impl CaddyMonitor {
4548
SystemState::WaitingBootstrap => error!("System is still waiting for bootstrap"),
4649
SystemState::Starting | SystemState::Ready => {
4750
info!("Caddy fetched TLS certificate and is running successfully");
51+
// Notify the listener that we're ready
52+
if let Some(sender) = sender.take() {
53+
let _ = sender.send(());
54+
}
4855
*system_state = SystemState::Ready
4956
}
5057
}
@@ -99,6 +106,16 @@ impl CaddyMonitor {
99106
}
100107
}
101108

109+
pub(crate) struct CaddyStatus(oneshot::Receiver<()>);
110+
111+
impl CaddyStatus {
112+
pub(crate) async fn wait_tls_certificate(self) {
113+
if self.0.await.is_err() {
114+
error!("Caddy status receiver dropped");
115+
}
116+
}
117+
}
118+
102119
#[derive(Deserialize)]
103120
struct LogLine<'a> {
104121
ts: f64,

cvm-agent/src/routes/system/bootstrap.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub(crate) async fn handler(state: SharedState, request: Json<BootstrapRequest>)
1717
let event_holder = ctx.event_holder.clone();
1818
*system_state = SystemState::Starting;
1919

20+
let caddy_status = CaddyMonitor::spawn(state.docker.clone(), state.system_state.clone(), event_holder);
21+
2022
match (request.workload_id, request.heartbeat) {
2123
(Some(workload_id), Some(heartbeat)) => {
2224
let args = HeartbeatEmitterArgs {
@@ -31,6 +33,7 @@ pub(crate) async fn handler(state: SharedState, request: Json<BootstrapRequest>)
3133
measurement_hash_url: heartbeat.measurement_hash_url,
3234
cpu_count: ctx.cpus,
3335
gpu_count: ctx.gpus,
36+
caddy_status,
3437
};
3538
if let Err(e) = HeartbeatEmitter::spawn(args).await {
3639
error!("Failed setting up heartbeat emitter: {e}");
@@ -41,6 +44,5 @@ pub(crate) async fn handler(state: SharedState, request: Json<BootstrapRequest>)
4144
};
4245

4346
ComposeMonitor::spawn(ctx, request.acme, request.docker, request.domain);
44-
CaddyMonitor::spawn(state.docker.clone(), state.system_state.clone(), event_holder);
4547
StatusCode::OK
4648
}

0 commit comments

Comments
 (0)