
[iris] Replace monolithic heartbeat with focused Ping/StartTasks/StopTasks/PollTasks RPCs#4638

Open
rjpower wants to merge 5 commits into main from rjpower/heartbeat-refactor

Conversation


@rjpower (Collaborator) commented Apr 10, 2026

Split the single HeartbeatRequest/HeartbeatResponse RPC into five focused
RPCs: Worker.Ping (liveness), Worker.StartTasks (immediate dispatch),
Worker.StopTasks (direct kill), Worker.PollTasks (reconciliation), and
Controller.UpdateTaskStatus (worker push).

Scheduling sends StartTasks
immediately after committing assignments instead of buffering in
dispatch_queue.

rjpower added 5 commits April 10, 2026 12:06

- Add Ping, StartTasks, StopTasks, PollTasks to WorkerService and
  UpdateTaskStatus to ControllerService. Worker handlers extracted from
  handle_heartbeat(). Existing heartbeat unchanged — zero behavior change.
- When use_split_heartbeat=True, run additional Ping and Poll loops
  alongside the existing provider loop. Ping loop detects worker failures
  faster (10s interval, 10 failure threshold). Poll loop reconciles
  task state every 30s as a safety net. Existing provider loop unchanged.

- When use_split_heartbeat=True, scheduling sends StartTasks RPCs directly
  after committing assignments (no dispatch_queue intermediary). Kills send
  StopTasks directly. Task updater thread drains a queue every 1s for
  batched state transitions. Provider loop replaced by ping + poll + updater.
- Workers now push task state transitions to the controller via
  UpdateTaskStatus RPC in batches. TaskStateReporter thread on the
  worker watches TaskAttempt state changes via on_state_change callback
  and enqueues updates with bounded queue + exponential backoff retry.
  Controller handler applies updates through existing transitions path.
- Split heartbeat mode is now the only path for WorkerProvider. Removed:
  - _run_provider_loop, _sync_all_execution_units, _buffer_assignments
  - drain_dispatch_all, buffer_kill, fail_heartbeats_batch
  - WorkerProvider.sync, _heartbeat_one
  - Worker.handle_heartbeat (old monolithic handler)
  - Heartbeat RPC from WorkerService
  - 19 old heartbeat-specific tests

  Kept dispatch_queue table (still used by K8s direct provider).
  -1790 lines of old heartbeat machinery.
@rjpower added the agent-generated label Apr 10, 2026
rjpower commented Apr 10, 2026

Specification for this >500 LOC change:

PROBLEM

The controller multiplexed worker liveness, task dispatch, task state collection,
and reconciliation through a single HeartbeatRequest/HeartbeatResponse RPC. This
made the provider loop (_sync_all_execution_units) very complex and gated task
dispatch latency on the heartbeat interval (2-5s). Key files:
controller/controller.py:2080 (_sync_all_execution_units), controller/transitions.py:1690
(apply_heartbeats_batch), controller/worker_provider.py:132 (_heartbeat_one).

APPROACH

Five new RPCs replace the monolithic heartbeat:

  1. Worker.Ping - liveness only, 10s cadence, updates workers table
  2. Worker.StartTasks - immediate dispatch from scheduling loop
  3. Worker.StopTasks - direct kill RPCs
  4. Worker.PollTasks - 30s reconciliation safety net
  5. Controller.UpdateTaskStatus - worker pushes state transitions
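The split surface can be sketched as plain Python service stubs. This is illustrative only: the real definitions live in the iris proto/service files, and the request/response shapes here are assumptions.

```python
class WorkerService:
    """Worker-side RPCs after the split (sketch, not the real proto surface)."""

    def Ping(self, request):
        # Liveness only: respond cheaply so the controller can track health.
        return {"healthy": True}

    def StartTasks(self, request):
        # Immediate dispatch: accept the tasks named in the request.
        return {"accepted": list(request.get("tasks", []))}

    def StopTasks(self, request):
        # Direct kill of the named tasks.
        return {"stopped": list(request.get("tasks", []))}

    def PollTasks(self, request):
        # Reconciliation: report local task state against the expected set.
        return {"known": []}


class ControllerService:
    """Controller-side RPC for worker pushes (sketch)."""

    def UpdateTaskStatus(self, request):
        # Apply a batch of worker-reported task state transitions.
        return {"applied": len(request.get("updates", []))}
```

Each RPC now does exactly one thing, so latency-sensitive paths (dispatch, kill) no longer wait on the liveness cadence.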

Four controller threads replace the single provider loop:

  • scheduling-loop: matches tasks to workers, sends StartTasks immediately
  • ping-loop: pings workers, in-memory failure tracking, worker death cascade
  • task-updater: 1s batch drain of task state transitions from all sources
  • poll-loop: PollTasks every 30s as safety net
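The thread wiring above can be sketched as four periodic loops. The interval values come from the PR text; the controller method names and the scheduling interval are assumptions for illustration.

```python
import threading

# Intervals from the PR description; the scheduling cadence is a guess.
PING_INTERVAL_S = 10.0
POLL_INTERVAL_S = 30.0
UPDATER_INTERVAL_S = 1.0
SCHED_INTERVAL_S = 0.5

def run_periodically(fn, interval_s, stop):
    """Call fn immediately, then every interval_s until stop is set."""
    while not stop.is_set():
        fn()
        stop.wait(interval_s)  # wakes early when stop is set

def start_controller_threads(controller, stop):
    loops = {
        "scheduling-loop": (controller.schedule_and_dispatch, SCHED_INTERVAL_S),
        "ping-loop": (controller.ping_workers, PING_INTERVAL_S),
        "task-updater": (controller.drain_task_updates, UPDATER_INTERVAL_S),
        "poll-loop": (controller.poll_workers, POLL_INTERVAL_S),
    }
    threads = []
    for name, (fn, interval) in loops.items():
        t = threading.Thread(target=run_periodically, args=(fn, interval, stop),
                             name=name, daemon=True)
        t.start()
        threads.append(t)
    return threads
```

Because each concern runs on its own cadence, a slow poll cycle no longer delays dispatch, and ping failures surface on the 10s clock regardless of scheduling load.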

The dispatch_queue table is no longer written to (kept for K8s provider).
Worker adds TaskStateReporter thread that pushes state changes via
UpdateTaskStatus with bounded queue and exponential backoff retry.

KEY CODE

Assignment dispatch (controller.py _dispatch_assignments_direct):

  • Calls queue_assignments(direct_dispatch=True) which returns RunTaskRequest
    protos instead of writing dispatch_queue
  • Groups by worker, sends StartTasks RPCs
  • Failed RPCs feed WORKER_FAILED into task-updater queue
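A minimal sketch of that dispatch step, assuming an assignment is a (worker address, request) pair and that `send_start_tasks` / `report_failure` are injected callables (the real iris helpers have different signatures):

```python
from collections import defaultdict

def dispatch_assignments_direct(assignments, send_start_tasks, report_failure):
    """Group committed assignments by worker and send one StartTasks each.

    assignments: iterable of (worker_addr, run_task_request) pairs.
    """
    by_worker = defaultdict(list)
    for worker_addr, request in assignments:
        by_worker[worker_addr].append(request)

    for worker_addr, requests in by_worker.items():
        try:
            # One batched StartTasks RPC per worker, right after commit.
            send_start_tasks(worker_addr, requests)
        except ConnectionError:
            # A failed RPC feeds WORKER_FAILED into the task-updater queue.
            for request in requests:
                report_failure(worker_addr, request)
```

Grouping by worker keeps the RPC count proportional to workers, not tasks, while failures flow through the same task-updater path as every other transition.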

Worker push (worker.py TaskStateReporter):

  • TaskAttempt.on_state_change callback fires on every transition
  • Bounded queue (1000 items), drops oldest on overflow
  • Batches at 0.5s interval, retries with exponential backoff
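The bounded-queue-with-drop-oldest behavior can be sketched with a `deque`; the batching thread and exponential-backoff retry are elided, and the class name is illustrative rather than the actual iris implementation.

```python
from collections import deque

class BoundedUpdateQueue:
    """Drop-oldest bounded queue for task state updates (sketch)."""

    def __init__(self, maxlen=1000):
        # A deque with maxlen evicts from the left on append when full,
        # which gives drop-oldest semantics for free.
        self._q = deque(maxlen=maxlen)

    def enqueue(self, update):
        self._q.append(update)  # silently drops the oldest item on overflow

    def drain_batch(self, max_items=100):
        # Called on the flush interval; returns up to max_items updates.
        batch = []
        while self._q and len(batch) < max_items:
            batch.append(self._q.popleft())
        return batch
```

Dropping the oldest updates under overflow is safe here because the 30s PollTasks reconciliation acts as the safety net for any state the push path loses.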

Ping loop (controller.py _run_ping_loop):

  • In-memory ping_failures dict (separate from DB consecutive_failures)
  • On threshold (10 failures): fail_workers_batch -> sibling cascade
  • update_worker_ping_success updates last_heartbeat_ms + resource snapshot
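The in-memory failure counter can be sketched as below; the threshold comes from the PR text, while the class and method names are hypothetical.

```python
PING_FAILURE_THRESHOLD = 10  # consecutive failures before the death cascade

class PingTracker:
    """In-memory ping failure tracking, separate from DB consecutive_failures."""

    def __init__(self):
        self.ping_failures = {}  # worker_id -> consecutive failed pings

    def record_success(self, worker_id):
        # Any successful ping resets the in-memory counter.
        self.ping_failures.pop(worker_id, None)

    def record_failure(self, worker_id):
        """Return True when the worker crosses the failure threshold."""
        count = self.ping_failures.get(worker_id, 0) + 1
        self.ping_failures[worker_id] = count
        return count >= PING_FAILURE_THRESHOLD
```

Keeping the counter in memory avoids a DB write per failed ping; only the threshold crossing (fail_workers_batch and the sibling cascade) touches persistent state.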

TESTS

1861 unit tests pass. 19 old heartbeat-specific tests were removed (they tested removed
code paths like drain_dispatch_all, _sync_all_execution_units, heartbeat
reconciliation). Remaining tests cover: scheduling, task transitions, worker
registration, task lifecycle, reservation handling, autoscaler, K8s provider.
The split heartbeat path is exercised by existing controller integration tests
since it is now the default and only path.


@chatgpt-codex-connector (bot) left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7d9b010b5c


Comment on lines +597 to +599
def _stop_state_reporter(self) -> None:
    """Stop the TaskStateReporter thread."""
    self._state_reporter = None

P1: Stop reporter thread between worker lifecycle iterations

_start_state_reporter() creates a new managed thread on every register/serve cycle, but _stop_state_reporter() only nulls the reference and never stops that thread. When _serve() exits on heartbeat timeout and the worker re-registers, the previous reporter keeps running, so reconnect flaps accumulate background threads and retry loops indefinitely. This can leak resources and eventually destabilize long-lived workers.
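One way to address this, sketched below under the assumption that the reporter loop can take a stop event (the handle class and names are hypothetical, not the iris code):

```python
import threading

class StateReporterHandle:
    """Owns the reporter thread plus a stop event so teardown can join it."""

    def __init__(self, target):
        self.stop_event = threading.Event()
        self.thread = threading.Thread(
            target=target, args=(self.stop_event,), daemon=True)
        self.thread.start()

    def stop(self, timeout=5.0):
        # Signal the loop, then join so a re-register cannot leak the
        # previous thread and its retry loop.
        self.stop_event.set()
        self.thread.join(timeout=timeout)
        return not self.thread.is_alive()
```

With this shape, `_stop_state_reporter()` would call `handle.stop()` before dropping the reference, so reconnect flaps cannot accumulate background threads.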


Comment on lines +2186 to +2188
running, addresses = self._transitions.get_running_tasks_for_poll()
if not running:
    return

P1: Poll every active worker to reconcile unexpected tasks

This polling path only runs for workers that already have controller-known active tasks, so workers with zero expected tasks are skipped entirely. In that case, if a worker is running orphan/unexpected tasks (for example after a failed stop or controller state divergence), it never receives PollTasks(expected=[]), so its handle_poll_tasks cleanup path is never triggered and stray tasks can continue running indefinitely.
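A sketch of a reconciliation that covers idle workers too, assuming the controller can enumerate active workers (the function name and data shapes are illustrative):

```python
from collections import defaultdict

def build_poll_targets(active_workers, running_tasks):
    """Map every active worker to its expected task list.

    running_tasks: iterable of (worker_id, task_id) for controller-known tasks.
    Workers with no known tasks still appear, with an empty expected list,
    so their handle_poll_tasks cleanup path runs and orphan tasks get reaped.
    """
    expected = defaultdict(list)
    for worker_id, task_id in running_tasks:
        expected[worker_id].append(task_id)
    return {w: expected.get(w, []) for w in active_workers}
```

Sending `PollTasks(expected=[])` to an idle worker is cheap, and it is exactly the case where controller/worker state has diverged that the 30s safety net exists to catch.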

