[iris] Batch heartbeat-path SQL writes #5774

Open
rjpower wants to merge 1 commit into main from worktree-batch-heartbeats

Conversation

@rjpower
Collaborator

@rjpower rjpower commented May 15, 2026

The heartbeat-apply loop issued one SELECT (job_config) plus two UPDATEs (task_attempts, tasks) per task per worker, plus an unconditional endpoints DELETE per terminal task. A profile of /system/controller showed this as the dominant cost on the polling thread.

Split _apply_task_transitions into a pure _plan_task_transitions that buffers writes into a _TransitionPlan, then a dispatcher that flushes one executemany per table before running cascade and recompute work (which read post-flush state). Add bulk_get_job_configs, bulk_apply_attempt_updates, bulk_apply_task_state_updates, and EndpointsProjection.remove_by_tasks. Replace the per-row writers entirely. Refactor apply_direct_provider_updates through the same dispatcher; extract the shared state machine into _compute_transition.

@rjpower rjpower added the agent-generated Created by automation/agent label May 15, 2026
@claude
Contributor

claude Bot commented May 15, 2026

Claude finished @rjpower's task in 9m 17s


Code review

  • Check PR state (open, not draft, no prior review)
  • Identify relevant CLAUDE.md / AGENTS.md files
  • Summarize the changes
  • Parallel CLAUDE.md compliance review (×2) + bug review (×2)
  • Validate flagged issues
  • Post review

1 issue posted as inline comment:

  • reads.py:512: get_job_config is now dead after the conversion to bulk_get_job_configs; AGENTS.md asks for dead code to be removed.

Considered but not flagged (low signal / out of scope):

  • update: "TaskUpdate" string forward reference at transitions.py:950 — not a literal TYPE_CHECKING guard and TaskUpdate is in the same module, so this is borderline stylistic.
  • for sa in plan.stranded_attempt_finalizations loop-variable abbreviation — a tight local idiom, not a public-name violation.
  • "direct" literal repeated as worker_log_id at transitions.py:3173,3187 — self-documenting sentinel confined to one function.
  • Stranded-attempt finalizations still issuing per-row UPDATEs (transitions.py:2046) — the docstring acknowledges these are an explicit carve-out from the bulk path; this is a design choice, not a regression.
  • seen.add() order-preserving dedup at transitions.py:2044 — known Python idiom, correct.

Refactor correctness (verified, no issues found):

  • Bulk helpers (bulk_get_job_configs, bulk_apply_attempt_updates, bulk_apply_task_state_updates, EndpointsProjection.remove_by_tasks) all guard against empty inputs before issuing SQL.
  • _compute_transition mirrors the prior per-row state machine; emit_build_failed=False correctly preserves the direct-provider path's "no worker health attribution" semantic.
  • Active/non-active partition in bulk_apply_task_state_updates preserves the old behavior of clearing current_worker_id/current_worker_address for non-active transitions.
  • Cascade and recompute actions are deliberately deferred until after the bulk flush, matching the prior code's "writes-before-reads" invariant.

@rjpower
Collaborator Author

rjpower commented May 15, 2026

🤖 Spec for this PR.

Problem

ControllerTransitions._apply_task_transitions (heartbeat path) and apply_direct_provider_updates (k8s path) ran a per-update loop that issued, for every task update:

  • 1 SELECT against job_config (cached only within one worker request — re-queried across workers reporting tasks in the same job).
  • 1 UPDATE against task_attempts via writes.apply_attempt_update.
  • 1 UPDATE against tasks via writes.apply_task_state_update.
  • 1 DELETE against endpoints (via EndpointsProjection.remove_by_task) per terminal task — issued unconditionally even when no endpoints existed.

A py-spy capture of /system/controller showed the polling thread spending most of its samples inside these per-row SQL paths (apply_task_state_update 13 samples, get_job_config 11, apply_attempt_update 6, all under _run_polling_loop -> _reconcile_worker_batch -> _process_heartbeat_updates -> apply_heartbeats_batch).
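The N-SELECTs-to-one shape of the fix can be sketched in isolation. This is a hypothetical illustration using stdlib sqlite3 standing in for the project's SQLAlchemy layer; the table name, column names, and the empty-input guard mirror the prose above but are assumptions, not the PR's actual code.

```python
import sqlite3

# In-memory fixture standing in for the controller DB.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job_config (job_id TEXT PRIMARY KEY, config TEXT)")
conn.executemany(
    "INSERT INTO job_config VALUES (?, ?)",
    [("job-a", "cfg-a"), ("job-b", "cfg-b")],
)

def bulk_get_job_configs(conn, job_ids):
    """One SELECT ... WHERE job_id IN (...) instead of one SELECT per task.

    Missing ids are simply absent from the result dict.
    """
    if not job_ids:  # guard against an empty IN (), as the PR's bulk helpers do
        return {}
    placeholders = ",".join("?" * len(job_ids))
    rows = conn.execute(
        f"SELECT job_id, config FROM job_config WHERE job_id IN ({placeholders})",
        list(job_ids),
    )
    return {job_id: config for job_id, config in rows}

configs = bulk_get_job_configs(conn, ["job-a", "job-b", "job-missing"])
# → {"job-a": "cfg-a", "job-b": "cfg-b"}
```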

Approach

Split decision from emission so the steady-state writes can be flushed with executemany.

  • reads.bulk_get_job_configs(tx, job_ids) — one SELECT WHERE job_id IN (…) instead of N.
  • writes.bulk_apply_attempt_updates(tx, rows) — one executemany via bindparams. Sticky-coalesce semantics preserved (started_at_ms / finished_at_ms pin once set; exit_code / error overwrite when the new value is non-null).
  • writes.bulk_apply_task_state_updates(tx, rows) — partitions rows by active/non-active state so each partition issues a single UPDATE shape; non-active rows additionally clear current_worker_id / current_worker_address.
  • EndpointsProjection.remove_by_tasks(cur, task_ids) — one DELETE … WHERE task_id IN (…).
  • apply_attempt_update and apply_task_state_update removed; all callers migrated.
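The sticky-coalesce semantics described for bulk_apply_attempt_updates hinge on COALESCE argument order, which a single executemany flush can encode per column. A minimal sketch with stdlib sqlite3 (schema and bind names are assumptions; the real helpers go through SQLAlchemy bindparams):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_attempts (attempt_id TEXT PRIMARY KEY,"
    " started_at_ms INTEGER, exit_code INTEGER)"
)
conn.execute("INSERT INTO task_attempts VALUES ('a1', 100, NULL)")
conn.execute("INSERT INTO task_attempts VALUES ('a2', NULL, NULL)")

# One executemany flush; COALESCE order encodes the two semantics:
#   started_at_ms pins once set (existing value wins),
#   exit_code overwrites when the incoming value is non-null (new value wins).
conn.executemany(
    "UPDATE task_attempts SET"
    " started_at_ms = COALESCE(started_at_ms, :v_started_at_ms),"
    " exit_code = COALESCE(:v_exit_code, exit_code)"
    " WHERE attempt_id = :b_attempt_id",
    [
        {"b_attempt_id": "a1", "v_started_at_ms": 200, "v_exit_code": 0},
        {"b_attempt_id": "a2", "v_started_at_ms": 300, "v_exit_code": None},
    ],
)
rows = {
    aid: (started, exit_code)
    for aid, started, exit_code in conn.execute(
        "SELECT attempt_id, started_at_ms, exit_code FROM task_attempts"
    )
}
# → a1 keeps started_at_ms=100 and gains exit_code=0;
#   a2 gains started_at_ms=300 and keeps exit_code NULL.
```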

_apply_task_transitions becomes _plan_task_transitions, which mutates a _TransitionPlan instead of executing SQL. The plan buffers attempt/task writes, container_id writes, endpoint-delete task_ids, stranded-attempt finalizations, in-memory health events, and queues _CascadeAction / _RecomputeAction entries for post-flush work. _apply_transition_plan flushes the bulk writes, then runs cascades and per-job recompute (which need to see the new state).
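The plan-then-flush split described above amounts to an append-only write buffer. A hypothetical shape for it (field names follow the prose; the real _TransitionPlan may differ):

```python
from dataclasses import dataclass, field

@dataclass
class TransitionPlan:
    """Buffers writes during planning; a dispatcher flushes the bulk writes
    first, then runs cascade/recompute actions against post-flush state."""
    attempt_writes: list = field(default_factory=list)
    task_writes: list = field(default_factory=list)
    container_id_writes: list = field(default_factory=list)
    endpoints_to_delete: list = field(default_factory=list)
    stranded_attempt_finalizations: list = field(default_factory=list)
    build_failed_workers: list = field(default_factory=list)
    cascade_actions: list = field(default_factory=list)
    recompute_actions: list = field(default_factory=list)

# Planning only appends; no SQL is issued until the dispatcher runs.
plan = TransitionPlan()
plan.task_writes.append({"b_task_id": "t1", "v_state": "FAILED"})
plan.endpoints_to_delete.append("t1")
```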

The state machine extracted into _compute_transition(update, task, attempt, now_ms, *, emit_build_failed) is shared by the heartbeat and direct-provider planners. The direct-provider path passes emit_build_failed=False (no worker-health attribution) and adds a container_id_writes executemany when update.container_id is not None.

apply_task_updates (single-request RPC path) collapses to a one-element call into the same dispatcher.

Key code

The dispatcher: bulk writes flushed before cascade/recompute reads. lib/iris/src/iris/cluster/controller/transitions.py

def _apply_transition_plan(self, cur, plan, now_ms, results):
    writes.bulk_apply_attempt_updates(cur, plan.attempt_writes)
    writes.bulk_apply_task_state_updates(cur, plan.task_writes)
    if plan.container_id_writes:
        cur.execute(
            sa_update(tasks_table)
            .where(tasks_table.c.task_id == bindparam("b_task_id"))
            .values(container_id=bindparam("v_container_id")),
            plan.container_id_writes,
        )
    if plan.endpoints_to_delete:
        seen: set[JobName] = set()
        unique = [t for t in plan.endpoints_to_delete if not (t in seen or seen.add(t))]
        self._endpoints.remove_by_tasks(cur, unique)
    for sa in plan.stranded_attempt_finalizations:
        cur.execute(sa_update(task_attempts_table).where(...).values(finished_at_ms=sa["v_finished_at_ms"]))
    for wid in plan.build_failed_workers:
        self._health.build_failed(wid)
    # cascades and per-job recompute observe post-flush state
    for cascade in plan.cascade_actions: ...
    for recompute in plan.recompute_actions: ...

The bulk task UPDATE partitions on active vs non-active so each batch has one SQL shape. lib/iris/src/iris/cluster/controller/writes.py

def bulk_apply_task_state_updates(tx, rows):
    active_rows = [r for r in rows if r["v_state"] in ACTIVE_TASK_STATES]
    non_active_rows = [r for r in rows if r["v_state"] not in ACTIVE_TASK_STATES]
    base_values = dict(
        state=bindparam("v_state"),
        error=func.coalesce(bindparam("v_error"), tasks_table.c.error),
        exit_code=func.coalesce(bindparam("v_exit_code"), tasks_table.c.exit_code),
        started_at_ms=func.coalesce(tasks_table.c.started_at_ms, bindparam("v_started_at_ms")),
        finished_at_ms=bindparam("v_finished_at_ms"),
        failure_count=bindparam("v_failure_count"),
        preemption_count=bindparam("v_preemption_count"),
    )
    if active_rows:
        tx.execute(update(tasks_table).where(...).values(**base_values), active_rows)
    if non_active_rows:
        tx.execute(
            update(tasks_table)
            .where(...)
            .values(current_worker_id=None, current_worker_address=None, **base_values),
            non_active_rows,
        )

Stranded-attempt finalization is kept as a separate buffered branch (not folded into attempt_writes) so its no-state-overwrite semantics are preserved. The cascade helpers (_terminate_coscheduled_siblings, _requeue_coscheduled_siblings, _finalize_terminal_job) are unchanged — they're cold-path and didn't show up in the profile.

Tests

896 controller unit tests pass, 2097 iris unit tests pass. No tolerances relaxed, no mocks added.

Coverage already exercised the behavior we needed to preserve: state-machine branches (test_transitions.py), coscheduled cascade and bounce (test_coscheduled_*), terminal-job finalization, retry budgets, ASSIGNED → WORKER_FAILED build-failed attribution, direct-provider path (test_*_direct_provider_*), endpoint cleanup on terminal (test_terminal_states_clean_up_endpoints), test_thread_safety, test_namespace_isolation. All pass.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 72d2a17983


attempt_keys.append((update.task_id, task.current_attempt_id))
attempt_map = reads.bulk_get_attempts(cur, attempt_keys)
return self._apply_task_transitions(cur, req, now_ms, task_map, attempt_map)
results = self._plan_and_apply([req], now_ms, cur)


P1 Badge Preserve terminal repeat updates in apply_task_updates

apply_task_updates now routes through _plan_and_apply, which prefilters updates to only state changes or ones carrying error/exit payloads. That drops a crucial case used by the stranded-attempt recovery path: when a producer-side transition (e.g. cancel/preempt) already moved the task to a terminal state, the worker’s follow-up heartbeat often reports the same terminal state with no extra payload, and that report is what stamps task_attempts.finished_at_ms. With this call path, those updates are discarded before _plan_task_transitions can run its stranded finalization branch, so attempts can remain unfinished and keep scheduler capacity artificially occupied.


return dict(row) if row is not None else None


def bulk_get_job_configs(tx: Tx, job_ids: Iterable[JobName]) -> dict[JobName, dict]:


🤖 With bulk_get_job_configs introduced here, the per-row get_job_config (reads.py:499-509) is no longer called from anywhere in the repository — both its prior call sites (_apply_task_transitions and apply_direct_provider_updates) were converted in this PR, and the only remaining reference is the back-pointer in this docstring.

Per AGENTS.md: "Delete dead code: unused parameters, stale options, old experiments." Worth dropping get_job_config and rewording this docstring to stand on its own (e.g. "Missing ids are absent from the result.").
