[iris] Batch heartbeat-path SQL writes#5774
Conversation
The heartbeat-apply loop issued one SELECT (job_config) plus two UPDATEs (task_attempts, tasks) per task per worker, plus an unconditional endpoints DELETE per terminal task. A profile of /system/controller showed this as the dominant cost on the polling thread. Split _apply_task_transitions into a pure _plan_task_transitions that buffers writes into a _TransitionPlan, then a dispatcher that flushes one executemany per table before running cascade and recompute work (which read post-flush state). Add bulk_get_job_configs, bulk_apply_attempt_updates, bulk_apply_task_state_updates, and EndpointsProjection.remove_by_tasks. Replace the per-row writers entirely. Refactor apply_direct_provider_updates through the same dispatcher and extract the shared state machine into _compute_transition.
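The plan object itself is not shown in this PR excerpt. As a rough illustration only, a buffered-write container along these lines would fit the dispatcher shown in the spec below; the field names come from that dispatcher, but the dataclass layout and types are assumptions:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class _TransitionPlan:
    # Rows destined for one executemany per table. Field names match the
    # dispatcher in the spec below; the types here are assumptions.
    attempt_writes: list[dict[str, Any]] = field(default_factory=list)
    task_writes: list[dict[str, Any]] = field(default_factory=list)
    container_id_writes: list[dict[str, Any]] = field(default_factory=list)
    endpoints_to_delete: list[Any] = field(default_factory=list)
    stranded_attempt_finalizations: list[dict[str, Any]] = field(default_factory=list)
    build_failed_workers: list[Any] = field(default_factory=list)
    # Deferred work that must observe post-flush state.
    cascade_actions: list[Any] = field(default_factory=list)
    recompute_actions: list[Any] = field(default_factory=list)
```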
Claude finished @rjpower's task in 9m 17s. Code review:
- 1 issue posted as inline comment.
- Considered but not flagged: low signal / out of scope.
- Refactor correctness: verified, no issues found.
🤖 Spec for this PR.

**Problem**
The heartbeat-apply loop issued one SELECT plus two UPDATEs per task per worker, plus an endpoints DELETE per terminal task. A profile of /system/controller showed this as the dominant cost on the polling thread.

**Approach**
Split decision from emission so the steady-state writes can be flushed with executemany. The state machine is extracted into _compute_transition and shared with apply_direct_provider_updates.
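_compute_transition itself is not shown in the excerpt; as a hedged sketch of what "split decision from emission" means here, something shaped like the following would do (the _Decision fields and the signature are hypothetical):

```python
from typing import NamedTuple

class _Decision(NamedTuple):
    # Hypothetical output shape; the real fields are not shown in the PR.
    new_state: str
    finalize_attempt: bool
    clear_endpoints: bool

def _compute_transition(task_row, attempt_row, report) -> _Decision:
    """Pure function of the current rows plus the worker's report.

    No cursor, no writes: the heartbeat and direct-provider paths can
    share it, and _plan_task_transitions can buffer the emission.
    """
    ...
```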
**Key code**

The dispatcher: bulk writes flushed before cascade/recompute reads.

```python
def _apply_transition_plan(self, cur, plan, now_ms, results):
    writes.bulk_apply_attempt_updates(cur, plan.attempt_writes)
    writes.bulk_apply_task_state_updates(cur, plan.task_writes)
    if plan.container_id_writes:
        cur.execute(
            sa_update(tasks_table)
            .where(tasks_table.c.task_id == bindparam("b_task_id"))
            .values(container_id=bindparam("v_container_id")),
            plan.container_id_writes,
        )
    if plan.endpoints_to_delete:
        # Order-preserving dedup: set.add returns None, so the filter
        # keeps only the first occurrence of each task id.
        seen: set[JobName] = set()
        unique = [t for t in plan.endpoints_to_delete if not (t in seen or seen.add(t))]
        self._endpoints.remove_by_tasks(cur, unique)
    for sa in plan.stranded_attempt_finalizations:
        cur.execute(sa_update(task_attempts_table).where(...).values(finished_at_ms=sa["v_finished_at_ms"]))
    for wid in plan.build_failed_workers:
        self._health.build_failed(wid)
    # cascades and per-job recompute observe post-flush state
    for cascade in plan.cascade_actions: ...
    for recompute in plan.recompute_actions: ...
```

The bulk task UPDATE partitions on active vs non-active so each batch has one SQL shape.

```python
def bulk_apply_task_state_updates(tx, rows):
    active_rows = [r for r in rows if r["v_state"] in ACTIVE_TASK_STATES]
    non_active_rows = [r for r in rows if r["v_state"] not in ACTIVE_TASK_STATES]
    base_values = dict(
        state=bindparam("v_state"),
        error=func.coalesce(bindparam("v_error"), tasks_table.c.error),
        exit_code=func.coalesce(bindparam("v_exit_code"), tasks_table.c.exit_code),
        started_at_ms=func.coalesce(tasks_table.c.started_at_ms, bindparam("v_started_at_ms")),
        finished_at_ms=bindparam("v_finished_at_ms"),
        failure_count=bindparam("v_failure_count"),
        preemption_count=bindparam("v_preemption_count"),
    )
    if active_rows:
        tx.execute(update(tasks_table).where(...).values(**base_values), active_rows)
    if non_active_rows:
        tx.execute(
            update(tasks_table).where(...).values(
                current_worker_id=None, current_worker_address=None, **base_values
            ),
            non_active_rows,
        )
```

Stranded-attempt finalization is kept as a separate buffered branch (not folded into the bulk executemany batches).

**Tests**
896 controller unit tests pass, 2097 iris unit tests pass. No tolerances relaxed, no mocks added. Coverage already exercised the behavior we needed to preserve: state-machine branches (…).
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 72d2a17983
```diff
-        attempt_keys.append((update.task_id, task.current_attempt_id))
-    attempt_map = reads.bulk_get_attempts(cur, attempt_keys)
-    return self._apply_task_transitions(cur, req, now_ms, task_map, attempt_map)
+    results = self._plan_and_apply([req], now_ms, cur)
```
Preserve terminal repeat updates in apply_task_updates
apply_task_updates now routes through _plan_and_apply, which prefilters updates to only state changes or ones carrying error/exit payloads. That drops a crucial case used by the stranded-attempt recovery path: when a producer-side transition (e.g. cancel/preempt) already moved the task to a terminal state, the worker’s follow-up heartbeat often reports the same terminal state with no extra payload, and that report is what stamps task_attempts.finished_at_ms. With this call path, those updates are discarded before _plan_task_transitions can run its stranded finalization branch, so attempts can remain unfinished and keep scheduler capacity artificially occupied.
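If the review is right, one hypothetical shape for a fix is to let payload-free repeats of terminal states through the prefilter. The predicate below is a sketch, not code from this PR; the field names and the TERMINAL_TASK_STATES constant are assumptions:

```python
# Illustrative only; the real state names are not shown in this PR.
TERMINAL_TASK_STATES = frozenset({"succeeded", "failed", "killed"})

def _should_plan(update, task) -> bool:
    # Hypothetical prefilter predicate; field names are assumptions.
    if update.state != task.state:
        return True  # genuine state change
    if update.error is not None or update.exit_code is not None:
        return True  # repeat report carrying a payload worth persisting
    # A payload-free repeat of a terminal state may still need to stamp
    # task_attempts.finished_at_ms (the stranded-attempt recovery path),
    # so it must survive the prefilter.
    return update.state in TERMINAL_TASK_STATES
```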
```python
    return dict(row) if row is not None else None


def bulk_get_job_configs(tx: Tx, job_ids: Iterable[JobName]) -> dict[JobName, dict]:
```
🤖 With bulk_get_job_configs introduced here, the per-row get_job_config (reads.py:499-509) is no longer called from anywhere in the repository — both its prior call sites (_apply_task_transitions and apply_direct_provider_updates) were converted in this PR, and the only remaining reference is the back-pointer in this docstring.
Per AGENTS.md: "Delete dead code: unused parameters, stale options, old experiments." Worth dropping get_job_config and rewording this docstring to stand on its own (e.g. "Missing ids are absent from the result.").
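For reference, the usual SQLAlchemy Core shape for a bulk read like this, sketched under assumed table and column names (the real body in reads.py is not shown here):

```python
from sqlalchemy import select

def bulk_get_job_configs(tx, job_ids):
    # Sketch only: jobs_table and its columns are assumed names.
    ids = list(job_ids)
    if not ids:
        return {}
    rows = tx.execute(
        select(jobs_table.c.job_id, jobs_table.c.config).where(
            jobs_table.c.job_id.in_(ids)
        )
    )
    # Missing ids are simply absent from the result.
    return {row.job_id: row.config for row in rows}
```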