Skip to content

Commit e873107

Browse files
shayne-fletchermeta-codesync[bot]
authored andcommitted
mesh-admin: surface in-flight handler execution (Rust + Python)
Summary: mesh-admin reports the Rust actor-loop's state, but a Python actor's endpoint work runs detached from that loop -- `PythonActor::handle` hands the message off and returns, so `cell.status()` reads `idle` while Python user code is actively running (the TUI showed `Status: idle` for a busy actor). this adds a general `execution` introspection plane that both runtimes feed, so mesh-admin reports in-flight handler work truthfully without changing dispatch semantics; lifecycle `status` stays a separate plane. the `execution` block is always present on `NodeProperties::Actor` and the HTTP DTO (count 0 when idle) and carries `active_handler_count` (the full live total across all invocations), `total_handler_names`, `oldest_active_handler`/`oldest_active_since`, and `active_handlers[]` (`{name, active_count, oldest_active_since}`, aggregated by endpoint name, sorted oldest-first, capped with an `active_handlers_truncated` flag). a core-owned `ExecutionRegistry` (a per-cell `DashMap<token, {name, started_at}>` plus an `AtomicU64`) on `InstanceCellState` owns storage and aggregation; `finished` is idempotent. composition is by kind: a cell with the registry installed self-reports (Python), otherwise the snapshot is derived from `ActorStatus::Processing` (Rust, count 0 or 1). Python actors feed the registry through new `PyInstance._execution_started`/`_execution_finished` hooks that `_Actor.handle` brackets around the user-method call in a `try`/`finally`; the registry is eager-installed at the top of `PythonActor::init` so a live Python actor never falls back to the raw `Processing` path. the TUI renders an `Execution` section when `active_handler_count > 0`. two operator-facing observability helpers ship alongside the surface (no core/DTO/TUI behavior change): `python/examples/execution_demo.py`, a run-and-watch dining-philosophers workload with real `think`/`eat` endpoints and a central `ForkManager`, so the `execution` surface can be watched live in the TUI without driving stdin -- browse a philosopher for `think`/`eat` turnover, the fork manager for `acquire xN` contention; and `logger.info` flight-recorder lines inside the handler bodies of `execution_workload` (and the demo) so the TUI flight-recorder pane shows recent activity for the observed actors instead of "No events". Differential Revision: D107192488
1 parent ebe3b98 commit e873107

25 files changed

Lines changed: 2594 additions & 16 deletions

File tree

hyperactor/src/introspect.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,23 @@ declare_attrs! {
434434
desc: "Per-session reorder-buffer state from OrderedSender. Independent diagnostic from queue_depth; no arithmetic contract -- see IO-3. Absence vs Some({enabled: false}) is meaningful -- see IO-1.".into(),
435435
})
436436
pub attr INBOUND_ORDERING: crate::ordering::OrderingSnapshot;
437+
438+
/// In-flight execution state: the live invocations an actor is
439+
/// currently handling. A second plane to lifecycle `status` --
440+
/// `status` describes the Rust actor loop, `execution` answers "what
441+
/// is this actor doing right now?" even when work runs detached from
442+
/// the loop (e.g. concurrent Python endpoints).
443+
///
444+
/// Always present (count 0 when idle); composed once in
445+
/// `live_actor_payload` either from the cell's execution registry (for
446+
/// actors that self-report) or derived from `ActorStatus::Processing`.
447+
/// `active_handler_count` is the full live total; the `active_handlers`
448+
/// list is aggregated by name and capped (`active_handlers_truncated`).
449+
@meta(INTROSPECT = IntrospectAttr {
450+
name: "execution".into(),
451+
desc: "In-flight execution: live invocation count, oldest, and per-name aggregates. Second plane to lifecycle status -- answers 'what is this actor handling now?'. Always present (count 0 when idle).".into(),
452+
})
453+
pub attr EXECUTION: crate::proc::ExecutionSnapshot;
437454
}
438455

439456
// See FI-1 through FI-8 in module doc.
@@ -528,6 +545,10 @@ pub struct ActorAttrsView {
528545
/// means buffering is disabled; `Some({enabled: true, ..})` means active.
529546
/// Consumers must distinguish all three states.
530547
pub inbound_ordering: Option<crate::ordering::OrderingSnapshot>,
548+
/// In-flight execution state. Always present (count 0 when idle),
549+
/// unlike `inbound_ordering`: the `None`-vs-`Some({count:0})` ambiguity
550+
/// buys nothing here. Composed once in `live_actor_payload`.
551+
pub execution: crate::proc::ExecutionSnapshot,
531552
/// Failure details, present iff status == "failed".
532553
pub failure: Option<FailureAttrs>,
533554
}
@@ -560,6 +581,13 @@ impl ActorAttrsView {
560581
let flight_recorder = attrs.get(FLIGHT_RECORDER).cloned();
561582
let is_system = *attrs.get(IS_SYSTEM).unwrap_or(&false);
562583
let inbound_ordering = attrs.get(INBOUND_ORDERING).cloned();
584+
// `execution` is always present on the view. When the attr is
585+
// absent (old payloads / hand-built fixtures) default to the zero
586+
// shape: idle, no live invocations.
587+
let execution = attrs
588+
.get(EXECUTION)
589+
.cloned()
590+
.unwrap_or_else(crate::proc::ExecutionSnapshot::idle);
563591

564592
// IA-3 (one-sided): status_reason must not be present for
565593
// non-terminal status. The converse is not enforced —
@@ -637,6 +665,7 @@ impl ActorAttrsView {
637665
flight_recorder,
638666
is_system,
639667
inbound_ordering,
668+
execution,
640669
failure,
641670
})
642671
}
@@ -666,6 +695,9 @@ impl ActorAttrsView {
666695
if let Some(snapshot) = &self.inbound_ordering {
667696
attrs.set(INBOUND_ORDERING, snapshot.clone());
668697
}
698+
// `execution` is always present (count 0 when idle), so it is
699+
// always encoded -- never elided like the Option-shaped attrs.
700+
attrs.set(EXECUTION, self.execution.clone());
669701
if let Some(fi) = &self.failure {
670702
attrs.set(FAILURE_ERROR_MESSAGE, fi.error_message.clone());
671703
attrs.set(FAILURE_ROOT_CAUSE_ACTOR, fi.root_cause_actor.clone());
@@ -805,6 +837,10 @@ struct ActorSnapshot {
805837
is_system: bool,
806838
last_handler: Option<String>,
807839
flight_recorder: Option<String>,
840+
/// In-flight execution, composed once (compose-by-kind) in
841+
/// `live_actor_payload`: registry snapshot when the actor self-reports,
842+
/// otherwise derived from `ActorStatus::Processing`.
843+
execution: crate::proc::ExecutionSnapshot,
808844
failure: Option<FailureSnapshot>,
809845
}
810846

@@ -844,6 +880,10 @@ fn build_actor_attrs(cell: &crate::InstanceCell, snap: &ActorSnapshot) -> String
844880
attrs.set(INBOUND_ORDERING, snapshot);
845881
}
846882

883+
// `execution` is always present (count 0 when idle); composed once in
884+
// `live_actor_payload`.
885+
attrs.set(EXECUTION, snap.execution.clone());
886+
847887
if let Some(handler) = &snap.last_handler {
848888
attrs.set(LAST_HANDLER, handler.clone());
849889
}
@@ -867,6 +907,54 @@ fn build_actor_attrs(cell: &crate::InstanceCell, snap: &ActorSnapshot) -> String
867907
serde_json::to_string(&attrs).unwrap_or_else(|_| "{}".to_string())
868908
}
869909

910+
/// Compose the actor's `execution` snapshot once, compose-by-kind.
911+
///
912+
/// If the cell has an execution registry installed (the actor
913+
/// self-reports, e.g. a Python actor), use it verbatim. Otherwise derive
914+
/// from the actor loop's `ActorStatus::Processing`:
915+
///
916+
/// - `Processing(since, Some(handler))` -> one entry, count 1, name =
917+
/// the handler dot-form, oldest = `since`.
918+
/// - `Processing(since, None)` -> count 1 with a `<unknown>` placeholder
919+
/// name (handler info unavailable), oldest = `since`.
920+
/// - any other status -> idle (count 0).
921+
///
922+
/// The registry branch never surfaces a hand-off artifact: a self-reporting
923+
/// actor always takes the registry branch, not the status branch.
924+
fn compose_execution(
925+
cell: &InstanceCell,
926+
status: &crate::actor::ActorStatus,
927+
) -> crate::proc::ExecutionSnapshot {
928+
use crate::actor::ActorStatus;
929+
use crate::proc::ExecutionHandler;
930+
use crate::proc::ExecutionSnapshot;
931+
932+
if let Some(snapshot) = cell.execution_snapshot() {
933+
return snapshot;
934+
}
935+
936+
let ActorStatus::Processing(since, handler) = status else {
937+
return ExecutionSnapshot::idle();
938+
};
939+
940+
let name = handler
941+
.as_ref()
942+
.map(|h| h.to_string())
943+
.unwrap_or_else(|| "<unknown>".to_string());
944+
ExecutionSnapshot {
945+
active_handler_count: 1,
946+
total_handler_names: 1,
947+
oldest_active_handler: Some(name.clone()),
948+
oldest_active_since: Some(*since),
949+
active_handlers: vec![ExecutionHandler {
950+
name,
951+
active_count: 1,
952+
oldest_active_since: *since,
953+
}],
954+
active_handlers_truncated: false,
955+
}
956+
}
957+
870958
/// Build an [`IntrospectResult`] from live [`InstanceCell`] state.
871959
///
872960
/// Reads the current live status and last handler directly from
@@ -923,11 +1011,14 @@ pub fn live_actor_payload(cell: &InstanceCell) -> IntrospectResult {
9231011
None
9241012
};
9251013

1014+
let execution = compose_execution(cell, &status);
1015+
9261016
let snap = ActorSnapshot {
9271017
status_str: status.to_string(),
9281018
is_system: cell.is_system(),
9291019
last_handler: last_handler.map(|info| info.to_string()),
9301020
flight_recorder,
1021+
execution,
9311022
failure,
9321023
};
9331024

@@ -1102,6 +1193,7 @@ mod tests {
11021193
("instance_id", INSTANCE_ID.attrs()),
11031194
("queue_depth", ACTOR_QUEUE_DEPTH.attrs()),
11041195
("inbound_ordering", INBOUND_ORDERING.attrs()),
1196+
("execution", EXECUTION.attrs()),
11051197
];
11061198

11071199
for (expected_name, meta) in &cases {
@@ -1208,6 +1300,12 @@ mod tests {
12081300
// running_actor_attrs() fixture.
12091301
assert_eq!(view.queue_depth, 0);
12101302
assert!(view.inbound_ordering.is_none());
1303+
// `execution` defaults to the idle zero-shape when the attr is
1304+
// absent from the fixture.
1305+
assert_eq!(view.execution.active_handler_count, 0);
1306+
assert!(view.execution.oldest_active_handler.is_none());
1307+
assert!(view.execution.active_handlers.is_empty());
1308+
assert!(!view.execution.active_handlers_truncated);
12111309
assert!(view.failure.is_none());
12121310

12131311
let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
@@ -1304,6 +1402,66 @@ mod tests {
13041402
assert_eq!(round_tripped, view);
13051403
}
13061404

1405+
/// AV-1: idle `execution` (count 0, empty list, null oldest) survives
1406+
/// the encode/decode boundary as the full zero-shape. The attr is
1407+
/// absent from the fixture, so this also pins the default.
1408+
#[test]
1409+
fn test_actor_view_round_trip_execution_idle() {
1410+
let view = ActorAttrsView::from_attrs(&running_actor_attrs()).unwrap();
1411+
assert_eq!(view.execution, crate::proc::ExecutionSnapshot::idle());
1412+
1413+
let attrs = view.to_attrs();
1414+
// Always encoded, never elided.
1415+
assert!(attrs.get(EXECUTION).is_some());
1416+
1417+
let round_tripped = ActorAttrsView::from_attrs(&attrs).unwrap();
1418+
assert_eq!(
1419+
round_tripped.execution,
1420+
crate::proc::ExecutionSnapshot::idle()
1421+
);
1422+
assert_eq!(round_tripped, view);
1423+
}
1424+
1425+
/// AV-1: a populated `execution` (multiple invocations across names,
1426+
/// populated oldest) survives the Attrs encode/decode boundary.
1427+
#[test]
1428+
fn test_actor_view_round_trip_execution_populated() {
1429+
use crate::proc::ExecutionHandler;
1430+
use crate::proc::ExecutionSnapshot;
1431+
1432+
let snapshot = ExecutionSnapshot {
1433+
active_handler_count: 3,
1434+
total_handler_names: 2,
1435+
oldest_active_handler: Some("hold".to_string()),
1436+
oldest_active_since: Some(SystemTime::UNIX_EPOCH),
1437+
active_handlers: vec![
1438+
ExecutionHandler {
1439+
name: "hold".to_string(),
1440+
active_count: 2,
1441+
oldest_active_since: SystemTime::UNIX_EPOCH,
1442+
},
1443+
ExecutionHandler {
1444+
name: "ping".to_string(),
1445+
active_count: 1,
1446+
oldest_active_since: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1),
1447+
},
1448+
],
1449+
active_handlers_truncated: false,
1450+
};
1451+
1452+
let mut attrs = running_actor_attrs();
1453+
attrs.set(EXECUTION, snapshot.clone());
1454+
1455+
let view = ActorAttrsView::from_attrs(&attrs).unwrap();
1456+
assert_eq!(view.execution, snapshot);
1457+
assert_eq!(view.execution.active_handler_count, 3);
1458+
assert_eq!(view.execution.total_handler_names, 2);
1459+
assert_eq!(view.execution.active_handlers.len(), 2);
1460+
1461+
let round_tripped = ActorAttrsView::from_attrs(&view.to_attrs()).unwrap();
1462+
assert_eq!(round_tripped, view);
1463+
}
1464+
13071465
#[test]
13081466
fn test_actor_view_ia3_rejects_reason_on_running() {
13091467
let mut attrs = running_actor_attrs();

0 commit comments

Comments
 (0)