Skip to content

Commit e7fd726

Browse files
yoblinclaude
authored and committed
[zephyr/iris] Fix ConfigMap size limit for large pipelines (#3908)
Two fixes for coordinator-as-job on KubernetesProvider: 1. Provider now catches KubectlError during pod apply and returns it as a TaskUpdate with the real error message, instead of letting it propagate and later surface as misleading "Pod not found". 2. Coordinator config is uploaded to object storage as job-config.pkl. The Entrypoint pickle contains only two string URLs, keeping the K8s ConfigMap payload trivially small regardless of dataset size. Depends on #3919 which fixes closure-mutation semantics in fuzzy dedup so that the cloudpickle round-trip through storage is safe. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cbd1540 commit e7fd726

3 files changed

Lines changed: 43 additions & 6 deletions

File tree

lib/iris/src/iris/cluster/k8s/provider.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from iris.cluster.controller.transitions import ClusterCapacity, DirectProviderSyncResult, SchedulingEvent
2323
from iris.cluster.controller.transitions import DirectProviderBatch, RunningTaskEntry, TaskUpdate
2424
from iris.cluster.k8s.constants import CW_INTERRUPTABLE_TOLERATION, NVIDIA_GPU_TOLERATION
25-
from iris.cluster.k8s.kubectl import Kubectl, KubectlLogLine
25+
from iris.cluster.k8s.kubectl import Kubectl, KubectlError, KubectlLogLine
2626
from iris.cluster.runtime.env import build_common_iris_env, normalize_workdir_relative_path
2727
from iris.cluster.types import JobName, get_gpu_count
2828
from iris.rpc import cluster_pb2, logging_pb2
@@ -608,11 +608,23 @@ class KubernetesProvider:
608608

609609
def sync(self, batch: DirectProviderBatch) -> DirectProviderSyncResult:
610610
"""Sync task state: apply new pods, delete killed pods, poll running pods."""
611+
apply_failures: list[TaskUpdate] = []
611612
for run_req in batch.tasks_to_run:
612-
self._apply_pod(run_req)
613+
try:
614+
self._apply_pod(run_req)
615+
except KubectlError as exc:
616+
logger.error("Failed to apply pod for task %s: %s", run_req.task_id, exc)
617+
apply_failures.append(
618+
TaskUpdate(
619+
task_id=JobName.from_wire(run_req.task_id),
620+
attempt_id=run_req.attempt_id,
621+
new_state=cluster_pb2.TASK_STATE_FAILED,
622+
error=str(exc),
623+
)
624+
)
613625
for task_id in batch.tasks_to_kill:
614626
self._delete_pods_by_task_id(task_id)
615-
updates = self._poll_pods(batch.running_tasks)
627+
updates = apply_failures + self._poll_pods(batch.running_tasks)
616628
capacity = self._query_capacity()
617629
scheduling_events = self._fetch_scheduling_events()
618630
return DirectProviderSyncResult(updates=updates, scheduling_events=scheduling_events, capacity=capacity)

lib/iris/tests/kubernetes/test_provider.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def test_sync_applies_pods_for_tasks_to_run(provider, mock_kubectl):
4141
assert result.updates == []
4242

4343

44-
def test_sync_propagates_kubectl_failure(provider, mock_kubectl):
44+
def test_sync_propagates_non_kubectl_failure(provider, mock_kubectl):
4545
mock_kubectl.apply_json.side_effect = RuntimeError("kubectl down")
4646
req = make_run_req("/test-job/0")
4747
batch = make_batch(tasks_to_run=[req])
@@ -50,6 +50,23 @@ def test_sync_propagates_kubectl_failure(provider, mock_kubectl):
5050
provider.sync(batch)
5151

5252

53+
def test_sync_catches_kubectl_error_and_returns_task_failure(provider, mock_kubectl):
54+
from iris.cluster.k8s.kubectl import KubectlError
55+
56+
mock_kubectl.apply_json.side_effect = KubectlError(
57+
"kubectl apply failed: Error from server (RequestEntityTooLarge): limit is 3145728"
58+
)
59+
req = make_run_req("/test-job/0")
60+
batch = make_batch(tasks_to_run=[req])
61+
62+
result = provider.sync(batch)
63+
64+
assert len(result.updates) == 1
65+
update = result.updates[0]
66+
assert update.new_state == cluster_pb2.TASK_STATE_FAILED
67+
assert "RequestEntityTooLarge" in update.error
68+
69+
5370
# ---------------------------------------------------------------------------
5471
# sync(): tasks_to_kill
5572
# ---------------------------------------------------------------------------

lib/zephyr/src/zephyr/execution.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1301,7 +1301,7 @@ class _CoordinatorJobConfig:
13011301
pipeline_id: int
13021302

13031303

1304-
def _run_coordinator_job(config: _CoordinatorJobConfig, result_path: str) -> None:
1304+
def _run_coordinator_job(config_path: str, result_path: str) -> None:
13051305
"""Entrypoint for the coordinator job.
13061306
13071307
Hosts the coordinator actor in-process via host_actor(), creates
@@ -1311,6 +1311,10 @@ def _run_coordinator_job(config: _CoordinatorJobConfig, result_path: str) -> Non
13111311
"""
13121312
from fray.v2.client import current_client
13131313

1314+
logger.info("Loading coordinator config from %s", config_path)
1315+
with open_url(config_path, "rb") as f:
1316+
config: _CoordinatorJobConfig = cloudpickle.loads(f.read())
1317+
13141318
logger.info(
13151319
"Coordinator job starting: name=%s, execution_id=%s, pipeline=%d",
13161320
config.name,
@@ -1525,6 +1529,7 @@ def execute(
15251529
"Starting zephyr pipeline: %s (pipeline %d, attempt %d)", execution_id, self._pipeline_id, attempt
15261530
)
15271531

1532+
config_path = f"{self.chunk_storage_prefix}/{execution_id}/job-config.pkl"
15281533
result_path = f"{self.chunk_storage_prefix}/{execution_id}/results.pkl"
15291534

15301535
try:
@@ -1541,6 +1546,9 @@ def execute(
15411546
name=self.name,
15421547
pipeline_id=self._pipeline_id,
15431548
)
1549+
ensure_parent_dir(config_path)
1550+
with open_url(config_path, "wb") as f:
1551+
f.write(cloudpickle.dumps(config))
15441552

15451553
job_name = f"zephyr-{self.name}-p{self._pipeline_id}-a{attempt}"
15461554
# The wrapper job just blocks on child actors; real
@@ -1550,7 +1558,7 @@ def execute(
15501558
name=job_name,
15511559
entrypoint=Entrypoint.from_callable(
15521560
_run_coordinator_job,
1553-
args=(config, result_path),
1561+
args=(config_path, result_path),
15541562
),
15551563
resources=ResourceConfig(cpu=1, ram="1g"),
15561564
)

0 commit comments

Comments (0)