Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions lib/iris/src/iris/cluster/providers/k8s/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from iris.cluster.controller.transitions import DirectProviderBatch, RunningTaskEntry, TaskUpdate
from iris.cluster.providers.k8s.constants import CW_INTERRUPTABLE_TOLERATION, NVIDIA_GPU_TOLERATION
from iris.cluster.providers.k8s.service import K8sService
from iris.cluster.providers.k8s.types import KubectlLogLine, parse_k8s_quantity
from iris.cluster.providers.k8s.types import KubectlError, KubectlLogLine, parse_k8s_quantity
from iris.cluster.runtime.env import build_common_iris_env, normalize_workdir_relative_path
from iris.cluster.types import JobName, get_gpu_count
from iris.logging import parse_log_level, str_to_log_level
Expand Down Expand Up @@ -607,11 +607,23 @@ class K8sTaskProvider:

def sync(self, batch: DirectProviderBatch) -> DirectProviderSyncResult:
"""Sync task state: apply new pods, delete killed pods, poll running pods."""
apply_failures: list[TaskUpdate] = []
for run_req in batch.tasks_to_run:
self._apply_pod(run_req)
try:
self._apply_pod(run_req)
except KubectlError as exc:
logger.error("Failed to apply pod for task %s: %s", run_req.task_id, exc)
apply_failures.append(
TaskUpdate(
task_id=JobName.from_wire(run_req.task_id),
attempt_id=run_req.attempt_id,
new_state=cluster_pb2.TASK_STATE_FAILED,
error=str(exc),
)
)
for task_id in batch.tasks_to_kill:
self._delete_pods_by_task_id(task_id)
updates = self._poll_pods(batch.running_tasks)
updates = apply_failures + self._poll_pods(batch.running_tasks)
capacity = self._query_capacity()
scheduling_events = self._fetch_scheduling_events()
return DirectProviderSyncResult(updates=updates, scheduling_events=scheduling_events, capacity=capacity)
Expand Down
22 changes: 22 additions & 0 deletions lib/iris/tests/cluster/controller/test_direct_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,28 @@ def test_apply_failed_no_retry(state):
assert task.failure_count == 1


def test_apply_failed_directly_from_assigned(state):
    """ASSIGNED -> FAILED without going through RUNNING (e.g. ConfigMap too large)."""
    # Submit a single-task job and drain it so the task is handed to the provider.
    (tid,) = submit_direct_job(state, "fail-on-apply")
    drained = state.drain_for_direct_provider()

    # Simulate the provider reporting an apply-time failure for that attempt.
    failure = TaskUpdate(
        task_id=tid,
        attempt_id=drained.tasks_to_run[0].attempt_id,
        new_state=cluster_pb2.TASK_STATE_FAILED,
        error="kubectl apply failed: RequestEntityTooLarge",
    )
    state.apply_direct_provider_updates([failure])

    # The task must land in FAILED with the provider's error message intact.
    failed = query_task(state, tid)
    assert failed.state == cluster_pb2.TASK_STATE_FAILED
    assert failed.error == "kubectl apply failed: RequestEntityTooLarge"

def test_apply_worker_failed_from_running_retries(state):
"""WORKER_FAILED from RUNNING with retries remaining returns to PENDING."""
jid = JobName.root("test-user", "wf-retry")
Expand Down
20 changes: 19 additions & 1 deletion lib/iris/tests/cluster/providers/k8s/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_sync_applies_pods_for_tasks_to_run(provider, k8s):
assert result.updates == []


def test_sync_propagates_kubectl_failure(provider, k8s):
def test_sync_propagates_non_kubectl_failure(provider, k8s):
k8s.inject_failure("apply_json", RuntimeError("kubectl down"))
req = make_run_req("/test-job/0")
batch = make_batch(tasks_to_run=[req])
Expand All @@ -51,6 +51,24 @@ def test_sync_propagates_kubectl_failure(provider, k8s):
provider.sync(batch)


def test_sync_catches_kubectl_error_and_returns_task_failure(provider, k8s):
    """A KubectlError raised during apply becomes a TASK_STATE_FAILED update."""
    from iris.cluster.providers.k8s.types import KubectlError

    # Arrange: make the fake kubectl layer raise on apply_json.
    err = KubectlError("kubectl apply failed: Error from server (RequestEntityTooLarge): limit is 3145728")
    k8s.inject_failure("apply_json", err)
    run_req = make_run_req("/test-job/0")

    # Act: sync a batch containing only the doomed task.
    outcome = provider.sync(make_batch(tasks_to_run=[run_req]))

    # Assert: exactly one failure update carrying the kubectl error text.
    assert len(outcome.updates) == 1
    failure = outcome.updates[0]
    assert failure.new_state == cluster_pb2.TASK_STATE_FAILED
    assert "RequestEntityTooLarge" in failure.error


# ---------------------------------------------------------------------------
# sync(): tasks_to_kill
# ---------------------------------------------------------------------------
Expand Down
12 changes: 10 additions & 2 deletions lib/zephyr/src/zephyr/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ class _CoordinatorJobConfig:
pipeline_id: int


def _run_coordinator_job(config: _CoordinatorJobConfig, result_path: str) -> None:
def _run_coordinator_job(config_path: str, result_path: str) -> None:
"""Entrypoint for the coordinator job.

Hosts the coordinator actor in-process via host_actor(), creates
Expand All @@ -1154,6 +1154,10 @@ def _run_coordinator_job(config: _CoordinatorJobConfig, result_path: str) -> Non
"""
from fray.v2.client import current_client

logger.info("Loading coordinator config from %s", config_path)
with open_url(config_path, "rb") as f:
config: _CoordinatorJobConfig = cloudpickle.loads(f.read())

logger.info(
"Coordinator job starting: name=%s, execution_id=%s, pipeline=%d",
config.name,
Expand Down Expand Up @@ -1370,6 +1374,7 @@ def execute(
"Starting zephyr pipeline: %s (pipeline %d, attempt %d)", execution_id, self._pipeline_id, attempt
)

config_path = f"{self.chunk_storage_prefix}/{execution_id}/job-config.pkl"
result_path = f"{self.chunk_storage_prefix}/{execution_id}/results.pkl"

try:
Expand All @@ -1385,6 +1390,9 @@ def execute(
name=self.name,
pipeline_id=self._pipeline_id,
)
ensure_parent_dir(config_path)
with open_url(config_path, "wb") as f:
f.write(cloudpickle.dumps(config))

job_name = f"zephyr-{self.name}-p{self._pipeline_id}-a{attempt}"
# The wrapper job just blocks on child actors; real
Expand All @@ -1399,7 +1407,7 @@ def execute(
name=job_name,
entrypoint=Entrypoint.from_callable(
_run_coordinator_job,
args=(config, result_path),
args=(config_path, result_path),
),
resources=ResourceConfig(cpu=1, ram="1g"),
)
Expand Down
Loading