Skip to content

Commit aa78201

Browse files
authored
Improve Heartbeat Reliability (#33)
* Improve heartbeat log observability - everything info: operation of the heartbeat is basically the main job of the daemon, let's not silence those away as debug logs - clarify log sites with name tagging - add an additional "actually doing the thing" log to help puzzled operators left wondering * Use interval instead of sleep for heartbeat * Cleanup and observe heartbeat event timing * Save last heartbeat event to state_dir * Make heartbeat timing persistently consistent * Explicate heartbeat overdue delay * Quicken transiently skipped heartbeats
1 parent be7e928 commit aa78201

5 files changed

Lines changed: 135 additions & 37 deletions

File tree

crates/core/src/config/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,9 @@ pub struct HeartbeatConfig {
358358
#[serde(default = "default_interval")]
359359
pub interval: String,
360360

361+
#[serde(default = "default_overdue_delay")]
362+
pub overdue_delay: String,
363+
361364
#[serde(default)]
362365
pub active_hours: Option<ActiveHours>,
363366

@@ -504,6 +507,10 @@ fn default_true() -> bool {
504507
fn default_interval() -> String {
505508
"30m".to_string()
506509
}
510+
511+
fn default_overdue_delay() -> String {
512+
"1m".to_string()
513+
}
507514
fn default_workspace() -> String {
508515
"~/.local/share/localgpt/workspace".to_string()
509516
}
@@ -607,6 +614,7 @@ impl Default for HeartbeatConfig {
607614
Self {
608615
enabled: default_true(),
609616
interval: default_interval(),
617+
overdue_delay: default_overdue_delay(),
610618
active_hours: None,
611619
timezone: None,
612620
}

crates/core/src/heartbeat/events.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
//! Heartbeat event tracking for UI status display
22
3-
use serde::Serialize;
3+
use serde::{Deserialize, Serialize};
44
use std::sync::RwLock;
55

66
/// Heartbeat event status
7-
#[derive(Debug, Clone, Serialize, PartialEq)]
7+
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
88
#[serde(rename_all = "snake_case")]
99
pub enum HeartbeatStatus {
1010
/// Heartbeat ran and sent a response
@@ -13,12 +13,14 @@ pub enum HeartbeatStatus {
1313
Ok,
1414
/// Heartbeat was skipped (outside active hours, empty file, etc.)
1515
Skipped,
16+
/// Heartbeat was skipped transiently (a soon retry may be useful)
17+
SkippedMayTry,
1618
/// Heartbeat failed with an error
1719
Failed,
1820
}
1921

2022
/// A heartbeat event for tracking/display
21-
#[derive(Debug, Clone, Serialize)]
23+
#[derive(Debug, Clone, Deserialize, Serialize)]
2224
pub struct HeartbeatEvent {
2325
/// Timestamp in milliseconds
2426
pub ts: u64,

crates/core/src/heartbeat/runner.rs

Lines changed: 117 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use anyhow::Result;
44
use chrono::{Local, NaiveTime};
55
use std::fs;
66
use std::path::PathBuf;
7-
use std::time::{Duration, Instant};
8-
use tokio::time::sleep;
7+
use std::time::{Duration, Instant, SystemTime};
8+
use tokio::time::interval_at;
99
use tracing::{debug, info, warn};
1010

1111
use super::events::{HeartbeatEvent, HeartbeatStatus, emit_heartbeat_event, now_ms};
@@ -85,20 +85,64 @@ impl HeartbeatRunner {
8585
})
8686
}
8787

88+
async fn first_delay(&self) -> Duration {
89+
// Read last heartbeat event to calibrate first tick time
90+
if let Ok(json) = fs::read_to_string(self.config.paths.last_heartbeat()) {
91+
if let Ok(event) = serde_json::from_str::<HeartbeatEvent>(&json) {
92+
let last_tick_end = std::time::UNIX_EPOCH + Duration::from_millis(event.ts as u64);
93+
let last_tick_elapsed = Duration::from_millis(event.duration_ms as u64);
94+
let last_tick = last_tick_end - last_tick_elapsed;
95+
debug!(
96+
name: "Heartbeat",
97+
"loaded last_tick: {:?} (ts: {}, duration_ms: {})",
98+
last_tick, event.ts, event.duration_ms
99+
);
100+
101+
let next_tick = last_tick + self.interval;
102+
let now = SystemTime::now();
103+
if now < next_tick {
104+
return next_tick.duration_since(now).unwrap_or(Duration::ZERO);
105+
}
106+
}
107+
}
108+
109+
// heartbeat is overdue
110+
return parse_duration(&self.config.heartbeat.overdue_delay).map_or_else(
111+
|e| {
112+
warn!(name: "Heartbeat", "invalid overdue_delay: {}, falling back to zero", e);
113+
Duration::ZERO
114+
},
115+
|d| d,
116+
);
117+
}
118+
88119
/// Run the heartbeat loop continuously
89120
pub async fn run(&self) -> Result<()> {
121+
info!(name: "Heartbeat", "starting runner with interval: {:?}", self.interval);
122+
123+
// Schedule first tick at next interval from last tick
124+
let first_after = self.first_delay().await;
125+
let first_at = tokio::time::Instant::now() + first_after;
126+
let mut interval = interval_at(first_at, self.interval);
127+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
90128
info!(
91-
"Starting heartbeat runner with interval: {:?}",
92-
self.interval
129+
name: "Heartbeat",
130+
"first tick scheduled after: {:?} at: {:?}",
131+
first_after,
132+
first_at,
93133
);
94134

135+
// Exponential backoff for SkippedMayTry retries
136+
let mut skips_since_last = 0;
137+
let skip_retry_base = Duration::from_millis(1000);
138+
let skip_retry_max = self.interval / 2;
139+
95140
loop {
96-
// Sleep until next interval
97-
sleep(self.interval).await;
141+
interval.tick().await; // Sleep until next interval
98142

99143
// Check active hours
100144
if !self.in_active_hours() {
101-
debug!("Outside active hours, skipping heartbeat");
145+
info!(name: "Heartbeat", "skipping: outside active hours");
102146
emit_heartbeat_event(HeartbeatEvent {
103147
ts: now_ms(),
104148
status: HeartbeatStatus::Skipped,
@@ -111,41 +155,69 @@ impl HeartbeatRunner {
111155

112156
// Run heartbeat with timing
113157
let start = Instant::now();
114-
match self.run_once_internal().await {
158+
info!(name: "Heartbeat", "tick starting at: {:?}", start);
159+
160+
let res = self.run_once_internal().await;
161+
let elapsed = start.elapsed();
162+
info!(name: "Heartbeat", "tick done elapsed: {:?}", elapsed);
163+
164+
let event = match res {
115165
Ok((response, status)) => {
116-
let duration_ms = start.elapsed().as_millis() as u64;
117166
let preview = if response.len() > 200 {
118167
Some(format!("{}...", &response[..200]))
119168
} else {
120169
Some(response.clone())
121170
};
122171

123-
emit_heartbeat_event(HeartbeatEvent {
172+
if is_heartbeat_ok(&response) {
173+
debug!(name: "Heartbeat", "OK");
174+
} else {
175+
warn!(name: "Heartbeat", "response not OK: {}", response);
176+
}
177+
178+
if status == HeartbeatStatus::SkippedMayTry {
179+
skips_since_last += 1;
180+
let retry_after =
181+
(skip_retry_base * 2_u32.pow(skips_since_last)).min(skip_retry_max);
182+
interval.reset_after(retry_after);
183+
info!(name: "Heartbeat", "transient skip, retry quickly after: {:?}", retry_after);
184+
} else {
185+
skips_since_last = 0;
186+
}
187+
188+
HeartbeatEvent {
124189
ts: now_ms(),
125-
status,
126-
duration_ms,
190+
status: status.clone(),
191+
duration_ms: elapsed.as_millis() as u64,
127192
preview,
128193
reason: None,
129-
});
130-
131-
if is_heartbeat_ok(&response) {
132-
debug!("Heartbeat: OK");
133-
} else {
134-
info!("Heartbeat response: {}", response);
135194
}
136195
}
137196
Err(e) => {
138-
let duration_ms = start.elapsed().as_millis() as u64;
139-
emit_heartbeat_event(HeartbeatEvent {
197+
warn!(name: "Heartbeat", "error: {}", e);
198+
HeartbeatEvent {
140199
ts: now_ms(),
141200
status: HeartbeatStatus::Failed,
142-
duration_ms,
201+
duration_ms: elapsed.as_millis() as u64,
143202
preview: None,
144203
reason: Some(e.to_string()),
145-
});
146-
warn!("Heartbeat error: {}", e);
204+
}
205+
}
206+
};
207+
208+
// Persist any non-transient heartbeat event to disk
209+
if event.status != HeartbeatStatus::SkippedMayTry {
210+
if let Err(e) = serde_json::to_writer_pretty(
211+
fs::File::create(self.config.paths.last_heartbeat())?,
212+
&event,
213+
) {
214+
warn!(name: "Heartbeat", "failed to write event: {}", e);
147215
}
148216
}
217+
218+
emit_heartbeat_event(event);
219+
220+
info!(name: "Heartbeat", "waiting for next tick");
149221
}
150222
}
151223

@@ -192,16 +264,22 @@ impl HeartbeatRunner {
192264
if let Some(ref gate) = self.turn_gate
193265
&& gate.is_busy()
194266
{
195-
debug!("Skipping heartbeat: agent turn in flight (TurnGate busy)");
196-
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
267+
info!(name: "Heartbeat", "skipping: agent turn in flight (TurnGate busy)");
268+
return Ok((
269+
HEARTBEAT_OK_TOKEN.to_string(),
270+
HeartbeatStatus::SkippedMayTry,
271+
));
197272
}
198273

199274
// Try to acquire the cross-process workspace lock (non-blocking)
200275
let _ws_guard = match self.workspace_lock.try_acquire()? {
201276
Some(guard) => guard,
202277
None => {
203-
debug!("Skipping heartbeat: workspace locked by another process");
204-
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
278+
info!(name: "Heartbeat", "skipping: workspace locked by another process");
279+
return Ok((
280+
HEARTBEAT_OK_TOKEN.to_string(),
281+
HeartbeatStatus::SkippedMayTry,
282+
));
205283
}
206284
};
207285

@@ -211,8 +289,11 @@ impl HeartbeatRunner {
211289
match gate.try_acquire() {
212290
Some(permit) => Some(permit),
213291
None => {
214-
debug!("Skipping heartbeat: agent turn started between check and acquire");
215-
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
292+
info!(name: "Heartbeat", "skipping: agent turn started between check and acquire");
293+
return Ok((
294+
HEARTBEAT_OK_TOKEN.to_string(),
295+
HeartbeatStatus::SkippedMayTry,
296+
));
216297
}
217298
}
218299
} else {
@@ -223,13 +304,13 @@ impl HeartbeatRunner {
223304
let heartbeat_path = self.workspace.join("HEARTBEAT.md");
224305

225306
if !heartbeat_path.exists() {
226-
debug!("No HEARTBEAT.md found");
307+
info!(name: "Heartbeat", "skipping: no HEARTBEAT.md");
227308
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
228309
}
229310

230311
let content = fs::read_to_string(&heartbeat_path)?;
231312
if content.trim().is_empty() {
232-
debug!("HEARTBEAT.md is empty");
313+
info!(name: "Heartbeat", "skipping: empty HEARTBEAT.md");
233314
return Ok((HEARTBEAT_OK_TOKEN.to_string(), HeartbeatStatus::Skipped));
234315
}
235316

@@ -243,6 +324,8 @@ impl HeartbeatRunner {
243324
let mut agent = Agent::new(agent_config, &self.config, self.memory.clone()).await?;
244325
agent.new_session().await?;
245326

327+
info!(name: "Heartbeat", "Running HEARTBEAT.md");
328+
246329
// Check if workspace is a git repo
247330
let workspace_is_git = self.workspace.join(".git").exists();
248331

@@ -263,8 +346,8 @@ impl HeartbeatRunner {
263346
if let Some(entry) = store.get(session_key)
264347
&& entry.is_duplicate_heartbeat(&response)
265348
{
266-
debug!(
267-
"Skipping duplicate heartbeat (same text within 24h): {}",
349+
info!(name: "Heartbeat",
350+
"skipping: duplicate (same text within 24h): {}",
268351
&response[..response.len().min(100)]
269352
);
270353
return Ok((response, HeartbeatStatus::Skipped));
@@ -275,7 +358,7 @@ impl HeartbeatRunner {
275358
if let Err(e) = store.load_and_update(session_key, &session_id, |entry| {
276359
entry.record_heartbeat(&response);
277360
}) {
278-
warn!("Failed to record heartbeat in session store: {}", e);
361+
warn!(name: "Heartbeat", "Failed to record in session store: {}", e);
279362
}
280363
}
281364

crates/core/src/paths.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ impl Paths {
107107
self.state_dir.join("localgpt.audit.jsonl")
108108
}
109109

110+
pub fn last_heartbeat(&self) -> PathBuf {
111+
self.state_dir.join("last_heartbeat")
112+
}
113+
110114
/// Search index for a specific agent: cache_dir/memory/{agent_id}.sqlite
111115
pub fn search_index(&self, agent_id: &str) -> PathBuf {
112116
self.cache_dir

crates/server/src/http.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,7 @@ async fn heartbeat_status(State(state): State<Arc<AppState>>) -> Json<HeartbeatS
10641064
HeartbeatStatus::Sent => "sent",
10651065
HeartbeatStatus::Ok => "ok",
10661066
HeartbeatStatus::Skipped => "skipped",
1067+
HeartbeatStatus::SkippedMayTry => "skipped",
10671068
HeartbeatStatus::Failed => "failed",
10681069
};
10691070

0 commit comments

Comments
 (0)