Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions lib/iris/src/iris/cluster/controller/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,10 +842,20 @@ def launch_job(
f"{priority_band_name(user_budget.max_band)})",
)

# Reject submissions if the parent job has already terminated
# Reject submissions whose parent is absent or already terminated.
# Absent parents can appear after a controller restart restores from a
# checkpoint that did not capture the parent row; accepting the child
# anyway would insert an orphan with `parent_job_id = NULL` and a
# `depth` computed from the name path, which the dashboard `WHERE
# depth = 1` query never surfaces.
if job_id.parent:
parent_state = _job_state(self._db, job_id.parent)
if parent_state is not None and parent_state in TERMINAL_JOB_STATES:
if parent_state is None:
raise ConnectError(
Code.FAILED_PRECONDITION,
f"Cannot submit job: parent job {job_id.parent} is absent from the database",
)
if parent_state in TERMINAL_JOB_STATES:
raise ConnectError(
Code.FAILED_PRECONDITION,
f"Cannot submit job: parent job {job_id.parent} has terminated "
Expand Down
11 changes: 5 additions & 6 deletions lib/iris/src/iris/cluster/controller/transitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,18 +978,17 @@ def submit_job(
cur.execute("UPDATE meta SET value = ? WHERE key = 'last_submission_ms'", (effective_submission_ms,))

parent_job_id = job_id.parent.to_wire() if job_id.parent is not None else None
if parent_job_id is not None:
parent_exists = cur.execute("SELECT 1 FROM jobs WHERE job_id = ?", (parent_job_id,)).fetchone()
if parent_exists is None:
parent_job_id = None
root_submitted_ms = effective_submission_ms
if parent_job_id is not None:
parent = cur.execute(
"SELECT root_submitted_at_ms FROM jobs WHERE job_id = ?",
(parent_job_id,),
).fetchone()
if parent is not None:
root_submitted_ms = int(parent["root_submitted_at_ms"])
# `launch_job` is responsible for rejecting submissions with a
# missing parent; if we reach here the parent row must exist.
if parent is None:
raise ValueError(f"Cannot submit job {job_id}: parent {parent_job_id} is absent from the database")
root_submitted_ms = int(parent["root_submitted_at_ms"])

deadline_epoch_ms: int | None = None
if request.HasField("scheduling_timeout") and request.scheduling_timeout.milliseconds > 0:
Expand Down
15 changes: 15 additions & 0 deletions lib/iris/tests/cluster/controller/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,21 @@ def test_launch_job_rejects_child_of_failed_parent(service, state):
assert "terminated" in exc_info.value.message.lower() or "failed" in exc_info.value.message.lower()


def test_launch_job_rejects_child_of_absent_parent(service):
"""Reject child submissions when the parent row is missing from the DB.

Simulates a controller restart where the checkpoint did not capture the
parent row but running processes keep submitting descendants. Previously
the guard only rejected terminated parents, leaving absent-parent children
inserted with `parent_job_id = NULL` and an orphaned `depth`.
"""
with pytest.raises(ConnectError) as exc_info:
service.launch_job(make_job_request("/test-user/absent-parent/new-child"), None)

assert exc_info.value.code == Code.FAILED_PRECONDITION
assert "absent" in exc_info.value.message.lower() or "not found" in exc_info.value.message.lower()


# =============================================================================
# Job List Tests
# =============================================================================
Expand Down
8 changes: 5 additions & 3 deletions lib/iris/tests/cluster/controller/test_transitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1964,15 +1964,17 @@ def test_requeued_task_maintains_priority_position(state):

worker_id = register_worker(state, "w1", "host:8080", make_worker_metadata())

# Submit a deep job and a shallow job
# Submit a deep job (under an explicit parent tree) and a shallow job
submit_job(state, "tree", make_job_request("tree"), timestamp_ms=500)
submit_job(state, "/test-user/tree/deep", make_job_request("deep"), timestamp_ms=1000)
submit_job(state, "shallow", make_job_request("shallow"), timestamp_ms=2000)

# Initially: deep job comes first
pending = _schedulable_tasks(state)
assert len(pending) == 2
assert len(pending) == 3
assert pending[0].job_id == JobName.from_string("/test-user/tree/deep")
assert pending[1].job_id == JobName.root("test-user", "shallow")
assert pending[1].job_id == JobName.root("test-user", "tree")
assert pending[2].job_id == JobName.root("test-user", "shallow")

# Dispatch and fail the deep job's task (with retries enabled)
deep_req = make_job_request("deep")
Expand Down
36 changes: 25 additions & 11 deletions lib/iris/tests/e2e/test_env_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,40 @@ def dummy_entrypoint():
pass


def _sleep_entrypoint():
import time

time.sleep(300)


@pytest.mark.timeout(60)
def test_child_job_inherits_parent_env(cluster):
"""Child jobs inherit the parent's explicit env vars from JobInfo.env."""
entrypoint = Entrypoint.from_callable(dummy_entrypoint)
resources = ResourceSpec(cpu=1, memory="1g")
parent_env = {"MY_CUSTOM_VAR": "hello", "WANDB_API_KEY": "secret"}

parent_context = IrisContext(
job_id=JobName.root("test-user", "parent-job"),
client=cluster.client,
)
# Submit a long-running parent so the controller has a live row for its
# hierarchy. Child submissions are rejected with FAILED_PRECONDITION when
# the parent row is missing or terminated, so the parent must stay alive
# until the child has been submitted.
parent_job = cluster.client.submit(Entrypoint.from_callable(_sleep_entrypoint), "parent-job", resources)
try:
parent_context = IrisContext(
job_id=parent_job.job_id,
client=cluster.client,
)

with (
iris_ctx_scope(parent_context),
patch("iris.client.client.get_job_info", return_value=_parent_job_info(parent_env)),
):
job = cluster.client.submit(entrypoint, "child-job", resources)
with (
iris_ctx_scope(parent_context),
patch("iris.client.client.get_job_info", return_value=_parent_job_info(parent_env)),
):
job = cluster.client.submit(entrypoint, "child-job", resources)

job.wait(timeout=30)
assert job.job_id == JobName.root("test-user", "parent-job").child("child-job")
job.wait(timeout=30)
assert job.job_id == parent_job.job_id.child("child-job")
finally:
cluster.kill(parent_job)


def _chain_job(output_file: str, child_spec: dict | None = None):
Expand Down
29 changes: 24 additions & 5 deletions lib/zephyr/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ def zephyr_ctx(local_client, tmp_path_factory):
# --- Multi-backend fixtures (integration tests) ---


def _parent_holder_entrypoint():
"""Long-running no-op that keeps the integration-test parent job alive."""
import time

time.sleep(3600)


@pytest.fixture(params=["local", "iris", "ray"], scope="session")
def integration_client(request):
"""Parametrized fixture providing Local, Iris, and Ray clients.
Expand All @@ -108,16 +115,28 @@ def integration_client(request):
client.shutdown(wait=True)
elif request.param == "iris":
from iris.client.client import IrisClient, IrisContext, iris_ctx_scope
from iris.cluster.types import JobName
from iris.cluster.types import Entrypoint, ResourceSpec

iris_cluster = request.getfixturevalue("iris_cluster")
iris_client = IrisClient.remote(iris_cluster, workspace=ZEPHYR_ROOT)
client = FrayIrisClient.from_iris_client(iris_client)

ctx = IrisContext(job_id=JobName.root("test-user", "test"), client=iris_client)
with iris_ctx_scope(ctx):
yield client
client.shutdown(wait=True)
# Submit a long-running parent job so child submissions have a live
# parent row in the controller DB. Absent parents are rejected with
# FAILED_PRECONDITION, so simulating a parent context without a real
# parent no longer works.
parent_job = iris_client.submit(
entrypoint=Entrypoint.from_callable(_parent_holder_entrypoint),
name="test",
resources=ResourceSpec(cpu=1, memory="512m"),
)
try:
ctx = IrisContext(job_id=parent_job.job_id, client=iris_client)
with iris_ctx_scope(ctx):
yield client
finally:
iris_client.terminate(parent_job.job_id)
client.shutdown(wait=True)
elif request.param == "ray":
request.getfixturevalue("ray_cluster")
client = RayClient()
Expand Down
Loading