[iris] Remove old monolithic heartbeat path (Phase 4)#4843
rjpower wants to merge 5 commits into rjpower/heartbeat-refactor from
Add Ping, StartTasks, StopTasks, PollTasks to WorkerService and UpdateTaskStatus to ControllerService. Worker handlers extracted from handle_heartbeat(). Existing heartbeat unchanged — zero behavior change.
When use_split_heartbeat=True, run additional Ping and Poll loops alongside the existing provider loop. Ping loop detects worker failures faster (10s interval, 10 failure threshold). Poll loop reconciles task state every 30s as a safety net. Existing provider loop unchanged.
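A minimal sketch of the failure-threshold bookkeeping that a ping loop like this might use. It assumes failures are counted consecutively per worker and that any successful ping resets the count; the commit message only states the 10 s interval and the threshold of 10. `PingTracker` and `record` are hypothetical names, not Iris APIs.

```python
from collections import defaultdict


class PingTracker:
    """Hypothetical helper: counts consecutive ping failures per worker."""

    def __init__(self, failure_threshold: int = 10) -> None:
        self.failure_threshold = failure_threshold
        self._failures: dict[str, int] = defaultdict(int)

    def record(self, worker_id: str, ok: bool) -> bool:
        """Record one ping result; return True if the worker should be marked failed."""
        if ok:
            # Assumption: a single success resets the consecutive-failure count.
            self._failures[worker_id] = 0
            return False
        self._failures[worker_id] += 1
        return self._failures[worker_id] >= self.failure_threshold
```

Under these assumptions, a 10 s interval with a threshold of 10 would declare a worker failed after roughly 100 s of uninterrupted ping failures.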
…ase 2) When use_split_heartbeat=True, scheduling sends StartTasks RPCs directly after committing assignments (no dispatch_queue intermediary). Kills send StopTasks directly. Task updater thread drains a queue every 1s for batched state transitions. Provider loop replaced by ping + poll + updater.
Workers now push task state transitions to the controller via UpdateTaskStatus RPC in batches. TaskStateReporter thread on the worker watches TaskAttempt state changes via on_state_change callback and enqueues updates with bounded queue + exponential backoff retry. Controller handler applies updates through existing transitions path.
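The "bounded queue + exponential backoff retry" pattern can be sketched as follows. This is an illustrative, single-threaded stand-in, not the actual TaskStateReporter: `BoundedReporter`, its parameters, the drop-oldest overflow policy, and the use of `ConnectionError` as the retryable failure are all assumptions.

```python
import collections
import time
from typing import Callable, Deque, List


class BoundedReporter:
    """Hypothetical reporter: buffers updates and retries sends with backoff."""

    def __init__(
        self,
        send: Callable[[List[str]], None],
        max_pending: int = 1000,
        base_delay: float = 0.1,
        max_delay: float = 5.0,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self._send = send
        # Assumption: when the queue is full, the oldest update is dropped.
        self._pending: Deque[str] = collections.deque(maxlen=max_pending)
        self._base_delay = base_delay
        self._max_delay = max_delay
        self._sleep = sleep

    def enqueue(self, update: str) -> None:
        self._pending.append(update)

    def pending(self) -> int:
        return len(self._pending)

    def flush(self, max_attempts: int = 5) -> bool:
        """Send everything buffered as one batch; return True on success."""
        if not self._pending:
            return True
        batch = list(self._pending)
        delay = self._base_delay
        for _ in range(max_attempts):
            try:
                self._send(batch)
            except ConnectionError:
                # Back off, doubling the delay up to max_delay, then retry.
                self._sleep(delay)
                delay = min(delay * 2, self._max_delay)
                continue
            # Only drop updates from the buffer once the send succeeded.
            for _ in range(len(batch)):
                self._pending.popleft()
            return True
        return False
```

Injecting `sleep` keeps the sketch testable without real waiting; a threaded version would additionally need a lock around the buffer.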
Split heartbeat mode is now the only path for WorkerProvider. Removed:
- _run_provider_loop, _sync_all_execution_units, _buffer_assignments
- drain_dispatch_all, buffer_kill, fail_heartbeats_batch
- WorkerProvider.sync, _heartbeat_one
- Worker.handle_heartbeat (old monolithic handler)
- Heartbeat RPC from WorkerService
- 19 old heartbeat-specific tests
Kept dispatch_queue table (still used by K8s direct provider). -1790 lines of old heartbeat machinery.
Claude finished @rjpower's task in 11m 35s
Code review
Reviewed Phase 4 of the heartbeat refactor. Left 2 inline comments on changed lines and flagged 3 additional findings below that couldn't be anchored inline (lines weren't touched by the diff).
Findings
1. Behavioral regression: K8s direct kills no longer wake
2.
3. Broken call to deleted method in dev benchmark script.
4. Dead code surviving from the removed monolithic path. Per AGENTS.md: "Delete dead code: unused parameters, stale options, old experiments."
5. Stale comments referencing removed symbols. Per AGENTS.md: "Delete stale comments immediately on discovery."
Also confirmed (not issues): the new
💡 Codex Review
marin/lib/iris/src/iris/cluster/controller/controller.py
Lines 1267 to 1269 in 71efe6f
In the K8s direct-provider path, _run_direct_provider_loop() waits on _heartbeat_event with a timeout of heartbeat_interval, but stop() now joins _heartbeat_thread without waking that event first. If heartbeat_interval is configured above the 5s join timeout, shutdown can continue while the provider loop is still sleeping, and it may later wake and call _sync_direct_provider() after DB/provider teardown. Please wake _heartbeat_event (or make the loop wait on stop_event) before joining.
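One way to picture the "wait on stop_event" option is the sketch below: the loop's periodic sleep is an `Event.wait` on the stop event itself, so `stop()` wakes it immediately regardless of how long the interval is. `PeriodicLoop` and its members are hypothetical names for illustration, not the controller's actual code.

```python
import threading
from typing import Callable


class PeriodicLoop:
    """Hypothetical periodic loop whose sleep is interruptible by stop()."""

    def __init__(self, interval: float, tick: Callable[[], None]) -> None:
        self._interval = interval
        self._tick = tick
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        # Event.wait returns True once the event is set and False on timeout,
        # so the loop ticks on every timeout and exits as soon as stop() fires.
        while not self._stop_event.wait(timeout=self._interval):
            self._tick()

    def stop(self, join_timeout: float = 5.0) -> bool:
        """Signal the loop and join it; return True if the thread exited."""
        self._stop_event.set()
        self._thread.join(timeout=join_timeout)
        return not self._thread.is_alive()
```

With this shape, an interval longer than the join timeout no longer lets the thread outlive `stop()`, because the wait is cut short rather than run to completion.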
    for task_id in task_ids:
        if task_id not in mapping:
            self._transitions.buffer_direct_kill(task_id.to_wire())
Wake direct-provider loop after buffering kill requests
The K8s branch in kill_tasks_on_workers() now only calls buffer_direct_kill() and never signals _heartbeat_event. Because this commit also removed all other _heartbeat_event.set() calls, buffered direct kills are no longer dispatched immediately and will wait until the next periodic provider sync tick, which can noticeably delay cancellation/preemption when heartbeat_interval is large.
    elif isinstance(self._provider, K8sTaskProvider):
        mapping = dict(task_kill_workers or {})
        unresolved = task_ids - set(mapping.keys())
        if unresolved:
            mapping.update(_task_worker_mapping(self._db, unresolved))
        for task_id in task_ids:
            if task_id not in mapping:
                self._transitions.buffer_direct_kill(task_id.to_wire())
Behavioral regression: K8s direct kills no longer wake _run_direct_provider_loop promptly.
Before this PR, _stop_tasks called self._heartbeat_event.set() after buffer_direct_kill to flush buffered K8s kills immediately. The PR removed every _heartbeat_event.set() call, but _run_direct_provider_loop still blocks on self._heartbeat_event.wait(timeout=limiter.time_until_next()) at L1420. With heartbeat_interval defaulting to 5 s, buffered kills now sit for up to ~5 s before the next scheduled tick flushes them, where previously they fired immediately.
Two side-effects to note:
- Shutdown of the direct-provider loop is also delayed, since the wait(...) no longer listens to stop_event and nothing else sets _heartbeat_event.
- The construction comment is now stale: "# Used by K8s direct provider loop to wake on buffered kills." at L1106. Nothing ever sets the event.
Either restore the wake after buffering direct kills:
    elif isinstance(self._provider, K8sTaskProvider):
        mapping = dict(task_kill_workers or {})
        unresolved = task_ids - set(mapping.keys())
        if unresolved:
            mapping.update(_task_worker_mapping(self._db, unresolved))
        for task_id in task_ids:
            if task_id not in mapping:
                self._transitions.buffer_direct_kill(task_id.to_wire())
        self._heartbeat_event.set()

...or drop _heartbeat_event entirely, switch the loop to stop_event.wait(timeout=...) (which at least fixes shutdown latency), and accept up to heartbeat_interval flush latency as intentional.
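To illustrate the first option (waking the loop after buffering), here is a small self-contained sketch. It is not the controller code: `KillFlusher`, `buffer_kill`, and the `flush` callback are hypothetical stand-ins, and a single wake event is assumed to serve both kill flushing and shutdown.

```python
import threading
from typing import Callable, List


class KillFlusher:
    """Hypothetical loop that flushes buffered kills on a timer or on wake."""

    def __init__(self, interval: float, flush: Callable[[List[str]], None]) -> None:
        self._interval = interval
        self._flush = flush
        self._lock = threading.Lock()
        self._buffer: List[str] = []
        self._stopping = False
        self._wake = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def buffer_kill(self, task_id: str) -> None:
        with self._lock:
            self._buffer.append(task_id)
        # Wake the loop so the kill is flushed now, not at the next tick.
        self._wake.set()

    def _run(self) -> None:
        while True:
            self._wake.wait(timeout=self._interval)
            self._wake.clear()
            # Swap the buffer out under the lock, then flush outside it.
            with self._lock:
                batch, self._buffer = self._buffer, []
                stopping = self._stopping
            if batch:
                self._flush(batch)
            if stopping:
                return

    def stop(self, join_timeout: float = 5.0) -> bool:
        with self._lock:
            self._stopping = True
        self._wake.set()
        self._thread.join(timeout=join_timeout)
        return not self._thread.is_alive()
```

Without the `self._wake.set()` call in `buffer_kill`, the same loop would still flush the kill, but only after the full interval elapses, which is the latency the review comment describes.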
    if not isinstance(self._provider, WorkerProvider):
        return
The pre-PR version asserted isinstance(self._provider, WorkerProvider); this downgrades to a silent early return. Per AGENTS.md: "Assert liberally; prefer raise ValueError over silent fallbacks." If a non-WorkerProvider ever reaches this method, an affected batch of task kills now vanishes with no signal instead of crashing loudly.
The same pattern was added in _dispatch_assignments_direct at L2079-2080 — and is more dangerous there because queue_assignments has already committed task-attempt rows before the silent return, potentially leaving orphaned attempts.
    - if not isinstance(self._provider, WorkerProvider):
    -     return
    + assert isinstance(self._provider, WorkerProvider)
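The difference between the two behaviors can be shown with a toy example. The classes below are empty stand-ins that only borrow the names WorkerProvider and K8sTaskProvider; `stop_tasks_silent` and `stop_tasks_strict` are hypothetical functions, and raising ValueError (rather than asserting) follows the AGENTS.md quote above, not the suggested diff.

```python
from typing import List


class WorkerProvider:
    """Empty stand-in for illustration only."""


class K8sTaskProvider:
    """Empty stand-in for illustration only."""


def stop_tasks_silent(provider: object, task_ids: List[str], sent: List[str]) -> None:
    # Silent fallback: a wrong provider type drops the kills with no signal.
    if not isinstance(provider, WorkerProvider):
        return
    sent.extend(task_ids)


def stop_tasks_strict(provider: object, task_ids: List[str], sent: List[str]) -> None:
    # Loud failure: a wrong provider type surfaces immediately to the caller.
    if not isinstance(provider, WorkerProvider):
        raise ValueError(
            f"expected WorkerProvider, got {type(provider).__name__}; "
            f"refusing to drop {len(task_ids)} task kill(s)"
        )
    sent.extend(task_ids)
```

In the silent variant the caller cannot distinguish "kills sent" from "kills discarded", which is the failure mode the comment is concerned about.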
cf2be86 to 6267bdd
🤖 Phase 4 of the heartbeat refactor, split into its own PR for ease of review.
Stacked on top of #4638 (Phases 0–3). Merge that first.
Summary
Split heartbeat mode is now the only path for `WorkerProvider`. Removed:
Kept the `dispatch_queue` table — still used by the K8s direct provider.
Net: -1697 lines of old heartbeat machinery.
Test plan