Skip to content

Commit c28ae61

Browse files
yoblinclaude authored and committed
[iris/zephyr] Propagate KubectlError; preserve LocalClient closure semantics
- Provider catches KubectlError during pod apply and returns it as a TASK_STATE_FAILED update with the real error, instead of masking it as "Pod not found". Includes transition test for ASSIGNED->FAILED. - Config-to-disk only on distributed backends. LocalClient passes the config object inline to preserve closure semantics for callers that mutate enclosing-scope state (e.g. _load_fuzzy_dupe_map_shard). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cb15073 commit c28ae61

4 files changed

Lines changed: 86 additions & 12 deletions

File tree

lib/iris/src/iris/cluster/k8s/provider.py

Lines changed: 15 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@
2222
from iris.cluster.controller.transitions import ClusterCapacity, DirectProviderSyncResult, SchedulingEvent
2323
from iris.cluster.controller.transitions import DirectProviderBatch, RunningTaskEntry, TaskUpdate
2424
from iris.cluster.k8s.constants import CW_INTERRUPTABLE_TOLERATION, NVIDIA_GPU_TOLERATION
25-
from iris.cluster.k8s.kubectl import Kubectl, KubectlLogLine
25+
from iris.cluster.k8s.kubectl import Kubectl, KubectlError, KubectlLogLine
2626
from iris.cluster.runtime.env import build_common_iris_env, normalize_workdir_relative_path
2727
from iris.cluster.types import JobName, get_gpu_count
2828
from iris.logging import parse_log_level, str_to_log_level
@@ -619,11 +619,23 @@ class KubernetesProvider:
619619

620620
def sync(self, batch: DirectProviderBatch) -> DirectProviderSyncResult:
621621
"""Sync task state: apply new pods, delete killed pods, poll running pods."""
622+
apply_failures: list[TaskUpdate] = []
622623
for run_req in batch.tasks_to_run:
623-
self._apply_pod(run_req)
624+
try:
625+
self._apply_pod(run_req)
626+
except KubectlError as exc:
627+
logger.error("Failed to apply pod for task %s: %s", run_req.task_id, exc)
628+
apply_failures.append(
629+
TaskUpdate(
630+
task_id=JobName.from_wire(run_req.task_id),
631+
attempt_id=run_req.attempt_id,
632+
new_state=cluster_pb2.TASK_STATE_FAILED,
633+
error=str(exc),
634+
)
635+
)
624636
for task_id in batch.tasks_to_kill:
625637
self._delete_pods_by_task_id(task_id)
626-
updates = self._poll_pods(batch.running_tasks)
638+
updates = apply_failures + self._poll_pods(batch.running_tasks)
627639
capacity = self._query_capacity()
628640
scheduling_events = self._fetch_scheduling_events()
629641
return DirectProviderSyncResult(updates=updates, scheduling_events=scheduling_events, capacity=capacity)

lib/iris/tests/cluster/controller/test_direct_controller.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -219,6 +219,30 @@ def test_apply_failed_no_retry():
219219
assert task.failure_count == 1
220220

221221

222+
def test_apply_failed_directly_from_assigned():
223+
"""ASSIGNED -> FAILED without going through RUNNING (e.g. ConfigMap too large)."""
224+
state = make_controller_state()
225+
[task_id] = submit_direct_job(state, "fail-on-apply")
226+
batch = state.drain_for_direct_provider()
227+
attempt_id = batch.tasks_to_run[0].attempt_id
228+
229+
# Skip RUNNING -- fail immediately from ASSIGNED.
230+
state.apply_direct_provider_updates(
231+
[
232+
TaskUpdate(
233+
task_id=task_id,
234+
attempt_id=attempt_id,
235+
new_state=cluster_pb2.TASK_STATE_FAILED,
236+
error="kubectl apply failed: RequestEntityTooLarge",
237+
),
238+
]
239+
)
240+
241+
task = query_task(state, task_id)
242+
assert task.state == cluster_pb2.TASK_STATE_FAILED
243+
assert task.error == "kubectl apply failed: RequestEntityTooLarge"
244+
245+
222246
def test_apply_worker_failed_from_running_retries():
223247
"""WORKER_FAILED from RUNNING with retries remaining returns to PENDING."""
224248
state = make_controller_state()

lib/iris/tests/kubernetes/test_provider.py

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ def test_sync_applies_pods_for_tasks_to_run(provider, mock_kubectl):
4242
assert result.updates == []
4343

4444

45-
def test_sync_propagates_kubectl_failure(provider, mock_kubectl):
45+
def test_sync_propagates_non_kubectl_failure(provider, mock_kubectl):
4646
mock_kubectl.apply_json.side_effect = RuntimeError("kubectl down")
4747
req = make_run_req("/test-job/0")
4848
batch = make_batch(tasks_to_run=[req])
@@ -51,6 +51,23 @@ def test_sync_propagates_kubectl_failure(provider, mock_kubectl):
5151
provider.sync(batch)
5252

5353

54+
def test_sync_catches_kubectl_error_and_returns_task_failure(provider, mock_kubectl):
55+
from iris.cluster.k8s.kubectl import KubectlError
56+
57+
mock_kubectl.apply_json.side_effect = KubectlError(
58+
"kubectl apply failed: Error from server (RequestEntityTooLarge): limit is 3145728"
59+
)
60+
req = make_run_req("/test-job/0")
61+
batch = make_batch(tasks_to_run=[req])
62+
63+
result = provider.sync(batch)
64+
65+
assert len(result.updates) == 1
66+
update = result.updates[0]
67+
assert update.new_state == cluster_pb2.TASK_STATE_FAILED
68+
assert "RequestEntityTooLarge" in update.error
69+
70+
5471
# ---------------------------------------------------------------------------
5572
# sync(): tasks_to_kill
5673
# ---------------------------------------------------------------------------

lib/zephyr/src/zephyr/execution.py

Lines changed: 29 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1309,19 +1309,26 @@ class _CoordinatorJobConfig:
13091309
pipeline_id: int
13101310

13111311

1312-
def _run_coordinator_job(config_path: str, result_path: str) -> None:
1312+
def _run_coordinator_job(config_or_path: _CoordinatorJobConfig | str, result_path: str) -> None:
13131313
"""Entrypoint for the coordinator job.
13141314
13151315
Hosts the coordinator actor in-process via host_actor(), creates
13161316
worker actors as child jobs, runs the pipeline, and writes results
13171317
to disk. The coordinator monitors worker job health directly in its
13181318
maintenance loop (no separate watchdog thread).
1319+
1320+
``config_or_path`` is either the config object directly (LocalClient,
1321+
no serialization boundary) or a storage URL to load from (distributed
1322+
backends, avoids K8s ConfigMap 3 MiB limit).
13191323
"""
13201324
from fray.v2.client import current_client
13211325

1322-
logger.info("Loading coordinator config from %s", config_path)
1323-
with open_url(config_path, "rb") as f:
1324-
config: _CoordinatorJobConfig = cloudpickle.loads(f.read())
1326+
if isinstance(config_or_path, str):
1327+
logger.info("Loading coordinator config from %s", config_or_path)
1328+
with open_url(config_or_path, "rb") as f:
1329+
config = cloudpickle.loads(f.read())
1330+
else:
1331+
config = config_or_path
13251332

13261333
logger.info(
13271334
"Coordinator job starting: name=%s, execution_id=%s, pipeline=%d",
@@ -1560,9 +1567,23 @@ def execute(
15601567
name=self.name,
15611568
pipeline_id=self._pipeline_id,
15621569
)
1563-
ensure_parent_dir(config_path)
1564-
with open_url(config_path, "wb") as f:
1565-
f.write(cloudpickle.dumps(config))
1570+
1571+
# Distributed backends serialize the entrypoint into a K8s
1572+
# ConfigMap (3 MiB hard limit). Upload the config to shared
1573+
# storage and pass only the URL to keep the pickle small.
1574+
# LocalClient runs in-process with no serialization boundary,
1575+
# so pass the config object directly — this preserves closure
1576+
# semantics for callers that rely on mutating enclosing-scope
1577+
# state (e.g. _load_fuzzy_dupe_map_shard).
1578+
from fray.v2.local_backend import LocalClient
1579+
1580+
if isinstance(self.client, LocalClient):
1581+
entrypoint_args: tuple = (config, result_path)
1582+
else:
1583+
ensure_parent_dir(config_path)
1584+
with open_url(config_path, "wb") as f:
1585+
f.write(cloudpickle.dumps(config))
1586+
entrypoint_args = (config_path, result_path)
15661587

15671588
job_name = f"zephyr-{self.name}-p{self._pipeline_id}-a{attempt}"
15681589
# The wrapper job just blocks on child actors; real
@@ -1577,7 +1598,7 @@ def execute(
15771598
name=job_name,
15781599
entrypoint=Entrypoint.from_callable(
15791600
_run_coordinator_job,
1580-
args=(config_path, result_path),
1601+
args=entrypoint_args,
15811602
),
15821603
resources=self.coordinator_resources,
15831604
)

0 commit comments

Comments (0)