Skip to content

Commit dfdc21f

Browse files
yoblinclaude
and committed
[iris/zephyr] Propagate KubectlError; preserve LocalClient closure semantics
- Provider catches KubectlError during pod apply and returns it as a TASK_STATE_FAILED update with the real error, instead of masking it as "Pod not found". Includes transition test for ASSIGNED->FAILED. - Config-to-disk only on distributed backends. LocalClient passes the config object inline to preserve closure semantics for callers that mutate enclosing-scope state (e.g. _load_fuzzy_dupe_map_shard). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 55c83da commit dfdc21f

4 files changed

Lines changed: 86 additions & 12 deletions

File tree

lib/iris/src/iris/cluster/k8s/provider.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from iris.cluster.controller.transitions import ClusterCapacity, DirectProviderSyncResult, SchedulingEvent
2323
from iris.cluster.controller.transitions import DirectProviderBatch, RunningTaskEntry, TaskUpdate
2424
from iris.cluster.k8s.constants import CW_INTERRUPTABLE_TOLERATION, NVIDIA_GPU_TOLERATION
25-
from iris.cluster.k8s.kubectl import Kubectl, KubectlLogLine
25+
from iris.cluster.k8s.kubectl import Kubectl, KubectlError, KubectlLogLine
2626
from iris.cluster.runtime.env import build_common_iris_env, normalize_workdir_relative_path
2727
from iris.cluster.types import JobName, get_gpu_count
2828
from iris.rpc import cluster_pb2, logging_pb2
@@ -608,11 +608,23 @@ class KubernetesProvider:
608608

609609
def sync(self, batch: DirectProviderBatch) -> DirectProviderSyncResult:
610610
"""Sync task state: apply new pods, delete killed pods, poll running pods."""
611+
apply_failures: list[TaskUpdate] = []
611612
for run_req in batch.tasks_to_run:
612-
self._apply_pod(run_req)
613+
try:
614+
self._apply_pod(run_req)
615+
except KubectlError as exc:
616+
logger.error("Failed to apply pod for task %s: %s", run_req.task_id, exc)
617+
apply_failures.append(
618+
TaskUpdate(
619+
task_id=JobName.from_wire(run_req.task_id),
620+
attempt_id=run_req.attempt_id,
621+
new_state=cluster_pb2.TASK_STATE_FAILED,
622+
error=str(exc),
623+
)
624+
)
613625
for task_id in batch.tasks_to_kill:
614626
self._delete_pods_by_task_id(task_id)
615-
updates = self._poll_pods(batch.running_tasks)
627+
updates = apply_failures + self._poll_pods(batch.running_tasks)
616628
capacity = self._query_capacity()
617629
scheduling_events = self._fetch_scheduling_events()
618630
return DirectProviderSyncResult(updates=updates, scheduling_events=scheduling_events, capacity=capacity)

lib/iris/tests/cluster/controller/test_direct_controller.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,30 @@ def test_apply_failed_no_retry():
219219
assert task.failure_count == 1
220220

221221

222+
def test_apply_failed_directly_from_assigned():
223+
"""ASSIGNED -> FAILED without going through RUNNING (e.g. ConfigMap too large)."""
224+
state = make_controller_state()
225+
[task_id] = submit_direct_job(state, "fail-on-apply")
226+
batch = state.drain_for_direct_provider()
227+
attempt_id = batch.tasks_to_run[0].attempt_id
228+
229+
# Skip RUNNING -- fail immediately from ASSIGNED.
230+
state.apply_direct_provider_updates(
231+
[
232+
TaskUpdate(
233+
task_id=task_id,
234+
attempt_id=attempt_id,
235+
new_state=cluster_pb2.TASK_STATE_FAILED,
236+
error="kubectl apply failed: RequestEntityTooLarge",
237+
),
238+
]
239+
)
240+
241+
task = query_task(state, task_id)
242+
assert task.state == cluster_pb2.TASK_STATE_FAILED
243+
assert task.error == "kubectl apply failed: RequestEntityTooLarge"
244+
245+
222246
def test_apply_worker_failed_from_running_retries():
223247
"""WORKER_FAILED from RUNNING with retries remaining returns to PENDING."""
224248
state = make_controller_state()

lib/iris/tests/kubernetes/test_provider.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def test_sync_applies_pods_for_tasks_to_run(provider, mock_kubectl):
4141
assert result.updates == []
4242

4343

44-
def test_sync_propagates_kubectl_failure(provider, mock_kubectl):
44+
def test_sync_propagates_non_kubectl_failure(provider, mock_kubectl):
4545
mock_kubectl.apply_json.side_effect = RuntimeError("kubectl down")
4646
req = make_run_req("/test-job/0")
4747
batch = make_batch(tasks_to_run=[req])
@@ -50,6 +50,23 @@ def test_sync_propagates_kubectl_failure(provider, mock_kubectl):
5050
provider.sync(batch)
5151

5252

53+
def test_sync_catches_kubectl_error_and_returns_task_failure(provider, mock_kubectl):
54+
from iris.cluster.k8s.kubectl import KubectlError
55+
56+
mock_kubectl.apply_json.side_effect = KubectlError(
57+
"kubectl apply failed: Error from server (RequestEntityTooLarge): limit is 3145728"
58+
)
59+
req = make_run_req("/test-job/0")
60+
batch = make_batch(tasks_to_run=[req])
61+
62+
result = provider.sync(batch)
63+
64+
assert len(result.updates) == 1
65+
update = result.updates[0]
66+
assert update.new_state == cluster_pb2.TASK_STATE_FAILED
67+
assert "RequestEntityTooLarge" in update.error
68+
69+
5370
# ---------------------------------------------------------------------------
5471
# sync(): tasks_to_kill
5572
# ---------------------------------------------------------------------------

lib/zephyr/src/zephyr/execution.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,19 +1302,26 @@ class _CoordinatorJobConfig:
13021302
pipeline_id: int
13031303

13041304

1305-
def _run_coordinator_job(config_path: str, result_path: str) -> None:
1305+
def _run_coordinator_job(config_or_path: _CoordinatorJobConfig | str, result_path: str) -> None:
13061306
"""Entrypoint for the coordinator job.
13071307
13081308
Hosts the coordinator actor in-process via host_actor(), creates
13091309
worker actors as child jobs, runs the pipeline, and writes results
13101310
to disk. The coordinator monitors worker job health directly in its
13111311
maintenance loop (no separate watchdog thread).
1312+
1313+
``config_or_path`` is either the config object directly (LocalClient,
1314+
no serialization boundary) or a storage URL to load from (distributed
1315+
backends, avoids K8s ConfigMap 3 MiB limit).
13121316
"""
13131317
from fray.v2.client import current_client
13141318

1315-
logger.info("Loading coordinator config from %s", config_path)
1316-
with open_url(config_path, "rb") as f:
1317-
config: _CoordinatorJobConfig = cloudpickle.loads(f.read())
1319+
if isinstance(config_or_path, str):
1320+
logger.info("Loading coordinator config from %s", config_or_path)
1321+
with open_url(config_or_path, "rb") as f:
1322+
config = cloudpickle.loads(f.read())
1323+
else:
1324+
config = config_or_path
13181325

13191326
logger.info(
13201327
"Coordinator job starting: name=%s, execution_id=%s, pipeline=%d",
@@ -1547,9 +1554,23 @@ def execute(
15471554
name=self.name,
15481555
pipeline_id=self._pipeline_id,
15491556
)
1550-
ensure_parent_dir(config_path)
1551-
with open_url(config_path, "wb") as f:
1552-
f.write(cloudpickle.dumps(config))
1557+
1558+
# Distributed backends serialize the entrypoint into a K8s
1559+
# ConfigMap (3 MiB hard limit). Upload the config to shared
1560+
# storage and pass only the URL to keep the pickle small.
1561+
# LocalClient runs in-process with no serialization boundary,
1562+
# so pass the config object directly — this preserves closure
1563+
# semantics for callers that rely on mutating enclosing-scope
1564+
# state (e.g. _load_fuzzy_dupe_map_shard).
1565+
from fray.v2.local_backend import LocalClient
1566+
1567+
if isinstance(self.client, LocalClient):
1568+
entrypoint_args: tuple = (config, result_path)
1569+
else:
1570+
ensure_parent_dir(config_path)
1571+
with open_url(config_path, "wb") as f:
1572+
f.write(cloudpickle.dumps(config))
1573+
entrypoint_args = (config_path, result_path)
15531574

15541575
job_name = f"zephyr-{self.name}-p{self._pipeline_id}-a{attempt}"
15551576
# The wrapper job just blocks on child actors; real
@@ -1564,7 +1585,7 @@ def execute(
15641585
name=job_name,
15651586
entrypoint=Entrypoint.from_callable(
15661587
_run_coordinator_job,
1567-
args=(config_path, result_path),
1588+
args=entrypoint_args,
15681589
),
15691590
resources=ResourceConfig(cpu=1, ram="1g"),
15701591
)

0 commit comments

Comments
 (0)