[iris] Low-risk simplification: decompose controller, fix leaks, consolidate mutations #4412
Claude finished @rjpower's task in 5m 36s. Review of PR #4412: [iris] Low-risk simplification
Bug:
Specification Problem
Approach

- Checkpoint fix (controller.py): Swap the self._checkpoint_in_progress bool for self._checkpoint_paused = threading.Event(). Update 2 set/clear sites and 5 is_set() check sites.
- RPC leaks (cli/): Add an rpc_client() context manager in cli/main.py. Replace all bare client construction across cluster.py (7 sites), process_status.py (3 sites), task.py, query.py, and rpc.py with: with rpc_client(...) as client:
- PREEMPTED docs+tests (docs/task-states.md, test_preemption.py): Add PREEMPTED to the state table, transition diagram, retry semantics, and terminal state summary. Add test_preempt_task_retries_when_budget_remains, test_preempt_task_terminal_when_budget_exhausted, and test_preempt_task_cascades_coscheduled_siblings.
- Controller decomposition (controller.py):
  - Extract _run_scheduling into _refresh_reservation_claims, _read_scheduling_state, _apply_scheduling_gates, _compute_scheduling_order, _run_scheduler_pass, and _apply_preemptions.
  - Extract _sync_all_execution_units into _separate_sync_results, _apply_successful_heartbeats, _handle_failed_heartbeats, _handle_sibling_worker_failures, and _log_sync_health_summary.
  - Frozen dataclasses (_SchedulingStateRead, _GatedCandidates, _SchedulingOrder) and a mutable _SyncFailureAccumulator carry data between phases.
- Mutation helpers (transitions.py):
  - _terminate_task(cur, task_id, attempt_id, state, error, now_ms, *, attempt_state, worker_id, resources, failure_count, preemption_count) centralizes attempt finalization, task state update, worker column clearing, endpoint deletion, and resource decommit.
  - _assign_task(cur, task_id, worker_id, worker_address, attempt_id, now_ms) centralizes attempt creation and denormalized column sync.
  - _remove_worker(cur, worker_id) centralizes the 4-statement worker teardown.
  - SQL boilerplate (delete_task_endpoints, enqueue_run_dispatch, enqueue_kill_dispatch, insert_task_attempt) moved to queries.py.

Tests

3 new preemption tests exercise the PREEMPTED state through the existing ControllerTestHarness. All 1776 existing tests pass. No mocks added; tests use the real DB and transition engine.
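For readers unfamiliar with the checkpoint fix in the description, here is a minimal sketch of a pause flag built on threading.Event. The CheckpointGate class and its method names are hypothetical and are not taken from controller.py; only the _checkpoint_paused attribute name and the set/clear/is_set calls come from the PR description.

```python
import threading


class CheckpointGate:
    """Hypothetical stand-in for the controller's checkpoint pause flag."""

    def __init__(self) -> None:
        # Event.set(), clear(), and is_set() are safe to call from any thread.
        self._checkpoint_paused = threading.Event()

    def begin_checkpoint(self) -> None:
        # One of the "set" sites: signal other loops to pause.
        self._checkpoint_paused.set()

    def end_checkpoint(self) -> None:
        # One of the "clear" sites: let other loops resume.
        self._checkpoint_paused.clear()

    def should_skip_scheduling(self) -> bool:
        # One of the is_set() check sites.
        return self._checkpoint_paused.is_set()


gate = CheckpointGate()
observed: list[bool] = []

gate.begin_checkpoint()
# A second thread reads the flag while the checkpoint is in progress.
worker = threading.Thread(
    target=lambda: observed.append(gate.should_skip_scheduling())
)
worker.start()
worker.join()
gate.end_checkpoint()
observed.append(gate.should_skip_scheduling())
```

An Event also offers wait(), which a loop could use to block instead of polling; whether the controller does that is not stated in this PR.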
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1149afa299
preemptions = self._apply_preemptions(
    order.ordered_task_ids,
    gated.jobs,
    order.task_band_map,
Pass taint-injected requirements to preemption pass
_run_scheduler_pass() now applies reservation taint constraints into a local modified_jobs, but _apply_preemptions() is still called with gated.jobs (the unmodified requirements). In clusters using direct reservations, this allows preemption candidates to be evaluated without the required reservation EQ constraint, so a reserved task can evict lower-priority work on unclaimed workers even though it can never run there. This introduces avoidable task churn and incorrect preemption decisions compared to the previous behavior where preemption used taint-injected requirements.
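The failure mode in this comment can be illustrated with a small, self-contained sketch. Everything here is hypothetical: the JobRequirements shape, the string-based constraint matching, and both function bodies are simplified stand-ins, not the iris scheduler. The point is only that a preemption pass fed the untainted requirements sees more candidate workers than one fed the taint-injected copy.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class JobRequirements:
    # Hypothetical shape: each constraint is a "key=value" label
    # that a worker must carry.
    constraints: tuple[str, ...] = ()


def run_scheduler_pass(jobs, reserved):
    """Inject a reservation constraint and return the modified copy."""
    modified_jobs = dict(jobs)
    for job_id, reservation in reserved.items():
        req = modified_jobs[job_id]
        modified_jobs[job_id] = replace(
            req, constraints=req.constraints + (f"reservation={reservation}",)
        )
    return modified_jobs


def preemption_candidates(jobs, worker_labels):
    """Return (job_id, worker_id) pairs whose worker meets every constraint."""
    pairs = []
    for job_id, req in jobs.items():
        for worker_id, labels in worker_labels.items():
            if all(c in labels for c in req.constraints):
                pairs.append((job_id, worker_id))
    return pairs


gated_jobs = {"job-a": JobRequirements()}
workers = {"w-claimed": {"reservation=r1"}, "w-unclaimed": set()}

modified_jobs = run_scheduler_pass(gated_jobs, {"job-a": "r1"})

# Untainted requirements: the unclaimed worker looks like a valid target.
untainted = preemption_candidates(gated_jobs, workers)
# Taint-injected requirements: only the claimed worker qualifies.
tainted = preemption_candidates(modified_jobs, workers)
```

In this toy model, passing gated_jobs downstream would let job-a preempt work on w-unclaimed even though it could never be placed there.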
Code review

Found 2 issues:
- marin/lib/iris/src/iris/cluster/controller/transitions.py, lines 392 to 394 in 1149afa
- marin/lib/iris/src/iris/cluster/controller/controller.py, lines 1607 to 1620 in 1149afa

Generated with Claude Code.
…tests

- Replace _checkpoint_in_progress bool with threading.Event to eliminate cross-thread visibility race on checkpoint pause flag.
- Fix RPC client resource leaks across 6 CLI files (12+ sites). Add rpc_client() context manager in cli/main.py that guarantees cleanup.
- Document TASK_STATE_PREEMPTED in task-states.md (was orphaned from docs) and add 3 tests covering retry-with-budget, terminal-at-exhaustion, and coscheduled sibling cascade.
- Decompose _run_scheduling() (190 lines) into 6 named phase methods with typed intermediate dataclasses for inter-phase data flow.
- Decompose _sync_all_execution_units() (108 lines) into 5 helpers with a mutable accumulator for failure tracking across phases.
- Extract 4 repeated SQL mutation helpers (delete_task_endpoints, enqueue_run_dispatch, enqueue_kill_dispatch, insert_task_attempt) into new queries.py module, replacing 20 inline call sites in transitions.py.
- Add kind-based K8s local test fixtures (session-scoped kind cluster) and 3 pod lifecycle smoke tests behind @pytest.mark.kind marker.
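As a rough illustration of phase methods that pass frozen dataclasses between them, consider the sketch below. The class names echo the PR but drop the leading underscore, and every field, function body, and the priority rule are assumptions made up for this example rather than the controller's real logic.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class SchedulingStateRead:
    # Hypothetical fields. Note that frozen only blocks attribute
    # reassignment; the dict's contents could still be mutated.
    pending_task_ids: tuple[str, ...]
    priorities: dict[str, int]


@dataclass(frozen=True)
class SchedulingOrder:
    ordered_task_ids: tuple[str, ...]


def read_scheduling_state() -> SchedulingStateRead:
    # Phase 1: snapshot the inputs (hard-coded here).
    return SchedulingStateRead(("t1", "t2", "t3"), {"t1": 1, "t2": 3, "t3": 2})


def compute_scheduling_order(state: SchedulingStateRead) -> SchedulingOrder:
    # Phase 2: highest priority first (an assumed ordering rule).
    ordered = sorted(state.pending_task_ids, key=lambda t: -state.priorities[t])
    return SchedulingOrder(tuple(ordered))


def run_scheduling() -> SchedulingOrder:
    # The top-level method becomes a short pipeline of named phases.
    state = read_scheduling_state()
    return compute_scheduling_order(state)


order = run_scheduling()

try:
    order.ordered_task_ids = ()  # later phases cannot rebind fields
    reassigned = True
except FrozenInstanceError:
    reassigned = False
```

Each phase can then be tested with a hand-built input object, without running the phases before it.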
…, rename test file

- Add _terminate_task(), _assign_task(), _remove_worker() as top-level helpers in transitions.py. These enforce multi-table invariants (worker column clearing, endpoint cleanup, resource decommit) that were previously copy-pasted across 6-9 call sites each.
- Remove lib/iris/tests/e2e/kind_fixtures.py and test_k8s_local.py, which duplicated the existing tests/integration/iris/test_kind_gpu_canary.py with weaker coverage.
- Revert kind marker from pyproject.toml and TESTING.md (not needed).
- Rename test_kind_gpu_canary.py to test_iris_kind.py.
Fix two bugs introduced by the decomposition:

1. _terminate_task set finished_at_ms = now_ms for tasks retrying to PENDING (not in ACTIVE_TASK_STATES). The old code explicitly checked for PENDING and set finished_at_ms = None. Add PENDING to the condition so retried tasks do not get a spurious finished timestamp.
2. _run_scheduler_pass created modified_jobs with taint constraints but did not return it. _apply_preemptions and _cache_scheduling_diagnostics received the un-tainted gated.jobs, dropping reservation NOT_EXISTS constraints from preemption evaluation. Return modified_jobs from _run_scheduler_pass and pass it downstream.
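The first bug can be reduced to a single condition. The sketch below is an assumption-laden reconstruction: the TaskState enum, the membership of ACTIVE_TASK_STATES, and both helper functions are hypothetical and only mirror the behavior described in the commit message.

```python
from enum import Enum
from typing import Optional


class TaskState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    FAILED = "failed"
    PREEMPTED = "preempted"


# Assumed membership; the commit message only says PENDING is not in this set.
ACTIVE_TASK_STATES = frozenset({TaskState.RUNNING})


def finished_at_ms_buggy(state: TaskState, now_ms: int) -> Optional[int]:
    # Treats every non-active state as finished, including a retry to PENDING.
    return None if state in ACTIVE_TASK_STATES else now_ms


def finished_at_ms_fixed(state: TaskState, now_ms: int) -> Optional[int]:
    # A task going back to PENDING is being retried, so it has not finished.
    if state in ACTIVE_TASK_STATES or state is TaskState.PENDING:
        return None
    return now_ms


retry_buggy = finished_at_ms_buggy(TaskState.PENDING, 1000)
retry_fixed = finished_at_ms_fixed(TaskState.PENDING, 1000)
terminal_fixed = finished_at_ms_fixed(TaskState.FAILED, 1000)
```

With the fixed condition, only a genuinely terminal transition stamps a finish time.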
The separate queries.py module added indirection without meaningful reuse outside transitions.py. Move the 4 helpers (delete_task_endpoints, enqueue_run_dispatch, enqueue_kill_dispatch, insert_task_attempt) directly into transitions.py alongside the compound mutation helpers that call them.
f5d7b11 to a1fbcba
- rpc_client(): drop redundant @contextmanager wrapper since ConnectClientSync already implements __enter__/__exit__
- _run_scheduler_pass / _apply_preemptions: accept dataclass objects instead of destructured fields (8 params → 5 each)
- Add thread-safety comment on _checkpoint_paused Event
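To show why the @contextmanager wrapper is unnecessary when the client is already a context manager, here is a minimal sketch. FakeClient is a hypothetical stand-in for ConnectClientSync, whose real constructor and methods are not shown in this PR; the address argument is likewise an assumption.

```python
class FakeClient:
    """Hypothetical client that is its own context manager."""

    def __init__(self, address: str) -> None:
        self.address = address
        self.closed = False

    def __enter__(self) -> "FakeClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and when the body raises.
        self.close()

    def close(self) -> None:
        self.closed = True


def rpc_client(address: str) -> FakeClient:
    # Plain factory: no @contextmanager needed, because the returned
    # object already supports the with-statement protocol.
    return FakeClient(address)


with rpc_client("localhost:1234") as client:
    was_open_inside = not client.closed

try:
    with rpc_client("localhost:1234") as failing:
        raise RuntimeError("simulated RPC failure")
except RuntimeError:
    pass
```

Call sites keep the same with-statement shape either way, so dropping the wrapper does not change how the CLI code reads.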
a1fbcba to 106a290
yonromai left a comment
Approving the current head.
I re-reviewed the latest revision after the earlier automated findings. The two prior blocking issues around retry finished_at_ms handling and reservation taint propagation are fixed on 106a29058d9a099f3d40f493bba8abd883fb4419, and I did not find a new blocker on top of that. I also ran the targeted controller test slice locally and it passed (144 passed).
Generated with Codex.
…olidate mutations (#4412)

Replace _checkpoint_in_progress bool with threading.Event to fix a
cross-thread visibility race on the checkpoint pause flag. Fix 12+ RPC
client resource leaks across 6 CLI files by introducing an rpc_client()
context manager. Document the orphaned TASK_STATE_PREEMPTED in
task-states.md and add 3 preemption tests.

Decompose _run_scheduling() (190 lines) into 6 phase methods and
_sync_all_execution_units() (108 lines) into 5 helpers. Extract
_terminate_task(), _assign_task(), _remove_worker() as top-level helpers
in transitions.py to centralize multi-table invariants (worker column
clearing, endpoint cleanup, resource decommit) previously copy-pasted
across 6-9 call sites. Move 4 SQL boilerplate helpers into queries.py.
Rename test_kind_gpu_canary.py to test_iris_kind.py.