Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 97 additions & 10 deletions crates/api/src/state_controller/machine/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,20 @@ impl MachineStateHandler {
Ok(())
}

/// Removes the scout heartbeat-timeout health alert for a host.
///
/// Calls `db::machine::remove_health_report_override` on the supplied
/// connection for `host_machine_id`, targeting the `Merge` override mode and
/// passing `"scout"` as the override identifier.
///
/// # Errors
///
/// Propagates any error from the removal call, converted into
/// `StateHandlerError` by `?`.
async fn clear_scout_timeout_alert(
    txn: &mut PgConnection,
    host_machine_id: &MachineId,
) -> Result<(), StateHandlerError> {
    let mode = health_report::OverrideMode::Merge;
    // The value returned by the removal call is intentionally discarded;
    // only success or failure matters to callers.
    db::machine::remove_health_report_override(txn, host_machine_id, mode, "scout").await?;
    Ok(())
}

async fn clear_host_reprovision(
mh_snaphost: &ManagedHostStateSnapshot,
txn: &mut PgConnection,
Expand Down Expand Up @@ -952,16 +966,11 @@ impl MachineStateHandler {
}

ManagedHostState::Ready => {
// Check if scout is running. If not, emit metric.
if let Some(last_scout_contact) = mh_snapshot.host_snapshot.last_scout_contact_time
if let Some(outcome) = self
.handle_scout_heartbeat_timeout(host_machine_id, mh_snapshot, ctx)
.await?
{
let since_last_contact = Utc::now().signed_duration_since(last_scout_contact);
let timeout_threshold = self.reachability_params.scout_reporting_timeout;

if since_last_contact > timeout_threshold {
ctx.metrics.host_with_scout_heartbeat_timeout =
Some(host_machine_id.to_string());
}
return Ok(outcome);
}

// Check if instance to be created.
Expand Down Expand Up @@ -1712,6 +1721,65 @@ impl MachineStateHandler {
}
}

/// Keeps the `"scout"` merge health-report override in sync with the scout
/// heartbeat age of a host.
///
/// The time since `last_scout_contact_time` is compared against
/// `reachability_params.scout_reporting_timeout`:
///
/// * No recorded contact time: returns `Ok(None)` without touching anything.
/// * Timed out (elapsed >= threshold): the host id is always recorded in
///   `ctx.metrics.host_with_scout_heartbeat_timeout`. If no `"scout"` merge
///   override exists yet, one is inserted in a new transaction.
/// * Not timed out while a `"scout"` merge override exists: the override is
///   removed in a new transaction.
/// * Otherwise: returns `Ok(None)`.
///
/// # Returns
///
/// `Ok(Some(outcome))` with a do-nothing outcome carrying the open
/// transaction when an override was inserted or removed, `Ok(None)` when no
/// database change was needed.
///
/// # Errors
///
/// Propagates failures from beginning the transaction and from the
/// insert/remove calls.
async fn handle_scout_heartbeat_timeout(
    &self,
    host_machine_id: &MachineId,
    mh_snapshot: &ManagedHostStateSnapshot,
    ctx: &mut StateHandlerContext<'_, MachineStateHandlerContextObjects>,
) -> Result<Option<StateHandlerOutcome<ManagedHostState>>, StateHandlerError> {
    let host = &mh_snapshot.host_snapshot;

    // Without a recorded scout contact there is nothing to measure against.
    let Some(last_scout_contact) = host.last_scout_contact_time else {
        return Ok(None);
    };

    let elapsed = Utc::now().signed_duration_since(last_scout_contact);
    let timeout_threshold = self.reachability_params.scout_reporting_timeout;
    let timed_out = elapsed >= timeout_threshold;
    let alert_present = host.health_report_overrides.merges.contains_key("scout");

    // The metric is reported on every timed-out pass, whether or not the
    // alert override has already been written.
    if timed_out {
        ctx.metrics.host_with_scout_heartbeat_timeout = Some(host_machine_id.to_string());
    }

    match (timed_out, alert_present) {
        // Heartbeat is stale and no alert has been raised yet: raise it.
        (true, false) => {
            let message = format!("Last scout heartbeat over {timeout_threshold} ago");
            let health_report =
                HealthReport::heartbeat_timeout("scout".to_string(), "scout".to_string(), message);
            let mut txn = ctx.services.db_pool.begin().await?;
            // NOTE(review): the meaning of the trailing `false` flag is not
            // visible from here; confirm against `insert_health_report_override`.
            db::machine::insert_health_report_override(
                &mut txn,
                host_machine_id,
                OverrideMode::Merge,
                &health_report,
                false,
            )
            .await?;
            tracing::warn!(
                host_machine_id = %host_machine_id,
                last_scout_contact = %last_scout_contact,
                timeout_threshold = %timeout_threshold,
                "Scout heartbeat timeout detected, adding health alert"
            );
            // The open transaction travels with the outcome; presumably the
            // consumer of the outcome commits it.
            Ok(Some(StateHandlerOutcome::do_nothing().with_txn(txn)))
        }
        // Heartbeat is fresh again but the alert is still set: clear it.
        (false, true) => {
            let mut txn = ctx.services.db_pool.begin().await?;
            Self::clear_scout_timeout_alert(&mut txn, host_machine_id).await?;
            tracing::info!(
                host_machine_id = %host_machine_id,
                last_scout_contact = %last_scout_contact,
                timeout_threshold = %timeout_threshold,
                "Scout heartbeat recovered, removing health alert"
            );
            Ok(Some(StateHandlerOutcome::do_nothing().with_txn(txn)))
        }
        // Alert state already matches the heartbeat state.
        (true, true) | (false, false) => Ok(None),
    }
}

async fn handle_restart_dpu_reprovision_assigned_state(
&self,
state: &ManagedHostStateSnapshot,
Expand Down Expand Up @@ -2230,7 +2298,9 @@ impl StateHandler for MachineStateHandler {
// Clone the pool before we borrow ctx mutably
let power_options_pool = ctx.services.db_pool.clone();

let result = if continue_state_machine {
let was_ready = matches!(mh_snapshot.managed_state, ManagedHostState::Ready);

let mut result = if continue_state_machine {
self.attempt_state_transition(host_machine_id, mh_snapshot, ctx)
.await
} else {
Expand All @@ -2240,6 +2310,23 @@ impl StateHandler for MachineStateHandler {
)))
};

if was_ready && let Ok(outcome) = result {
if matches!(&outcome, StateHandlerOutcome::Transition { .. }) {
let host_machine_id = *host_machine_id;
result = Ok(outcome
.in_transaction(&ctx.services.db_pool, move |txn| {
async move {
Self::clear_scout_timeout_alert(txn, &host_machine_id).await?;
Ok::<(), StateHandlerError>(())
}
.boxed()
})
.await??);
} else {
result = Ok(outcome);
}
}

// Persist power options before returning
// They are persisted in an individual DB transaction in order to be unaffected
// by the main state handling outcome
Expand Down
Loading
Loading