Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 61 additions & 15 deletions lib/iris/src/iris/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
WellKnownAttribute,
device_variant_constraint,
infer_preemptible_constraint,
preemptible_constraint,
region_constraint,
zone_constraint,
)
Expand Down Expand Up @@ -532,6 +533,41 @@ def resolve_multinode_defaults(
return replicas, coscheduling


def build_job_constraints(
    resources_proto: job_pb2.ResourceSpecProto,
    tpu_variants: list[str],
    replicas: int,
    regions: tuple[str, ...] | None = None,
    zone: str | None = None,
    preemptible: bool | None = None,
) -> list[Constraint]:
    """Build the scheduling constraints attached to a submitted job.

    Constraints are emitted in a fixed order: region, zone, device variant,
    the caller's explicit preemptible choice, and finally whatever the
    executor heuristic infers.

    The explicit ``preemptible`` value is appended *before*
    ``infer_preemptible_constraint`` is consulted. That helper receives the
    constraints gathered so far and short-circuits when a preemptible
    constraint is already among them, so the user's choice always wins.

    Args:
        resources_proto: Resource spec passed to the executor heuristic.
        tpu_variants: TPU variant names; a device-variant constraint is added
            only when more than one is listed.
        replicas: Replica count passed to the executor heuristic.
        regions: If non-empty, adds a region constraint over these regions.
        zone: If non-empty, adds a zone constraint for this zone.
        preemptible: ``True``/``False`` adds an explicit preemptible
            constraint; ``None`` leaves the decision to the heuristic.

    Returns:
        The assembled list of constraints.
    """
    assembled: list[Constraint] = []

    # Placement constraints the caller asked for explicitly.
    if regions:
        assembled.append(region_constraint(list(regions)))
    if zone:
        assembled.append(zone_constraint(zone))

    # A single variant (or none) needs no device-variant constraint.
    has_variant_alternatives = len(tpu_variants) > 1
    if has_variant_alternatives:
        assembled.append(device_variant_constraint(tpu_variants))

    # Must be recorded before the heuristic runs so that it can see the
    # user's --preemptible / --no-preemptible choice and stand down.
    user_specified = preemptible is not None
    if user_specified:
        assembled.append(preemptible_constraint(preemptible))

    # Executor heuristic: small CPU-only CLI jobs (no accelerators, 1 replica,
    # CPU ≤ 0.5 cores, RAM ≤ 4 GiB) get pinned to non-preemptible workers so
    # coordinators survive spot reclamation.
    heuristic_choice = infer_preemptible_constraint(resources_proto, replicas, assembled)
    if heuristic_choice is None:
        return assembled

    assembled.append(heuristic_choice)
    logger.info("Executor heuristic: auto-tagging job as non-preemptible")
    return assembled


def run_iris_job(
command: list[str],
env_vars: dict[str, str],
Expand All @@ -553,6 +589,7 @@ def run_iris_job(
user: str | None = None,
reserve: tuple[str, ...] | None = None,
priority: str | None = None,
preemptible: bool | None = None,
token_provider: TokenProvider | None = None,
submit_argv: list[str] | None = None,
) -> int:
Expand All @@ -565,6 +602,8 @@ def run_iris_job(
regions: If provided, restrict the job to workers in these regions.
zone: If provided, restrict the job to workers in this zone.
reserve: Reservation specs (e.g., ("4:H100x8", "v5litepod-16")).
preemptible: If True/False, force scheduling on (non-)preemptible workers
and bypass the executor heuristic. If None (default), the heuristic runs.

Returns:
Exit code: 0 for success, 1 for failure
Expand All @@ -579,22 +618,15 @@ def run_iris_job(

replicas, coscheduling = resolve_multinode_defaults(primary_tpu, gpu, replicas)

constraints: list[Constraint] = []
if regions:
constraints.append(region_constraint(list(regions)))
if zone:
constraints.append(zone_constraint(zone))
if len(tpu_variants) > 1:
constraints.append(device_variant_constraint(tpu_variants))

# Executor heuristic: small CPU-only CLI jobs (no accelerators, 1 replica,
# CPU ≤ 0.5 cores, RAM ≤ 4 GiB) are auto-tagged as non-preemptible so
# coordinators survive spot reclamation.
resources_proto = resources.to_proto()
preemptible = infer_preemptible_constraint(resources_proto, replicas, constraints)
if preemptible is not None:
constraints.append(preemptible)
logger.info("Executor heuristic: auto-tagging job as non-preemptible")
constraints = build_job_constraints(
resources_proto=resources_proto,
tpu_variants=tpu_variants,
replicas=replicas,
regions=regions,
zone=zone,
preemptible=preemptible,
)

reservation: list[ReservationEntry] | None = None
if reserve:
Expand Down Expand Up @@ -632,6 +664,8 @@ def run_iris_job(
logger.info(f"Region constraint: {', '.join(regions)}")
if zone:
logger.info(f"Zone constraint: {zone}")
if preemptible is not None:
logger.info(f"Preemptible constraint: {preemptible}")
if reservation:
logger.info(f"Reservation: {len(reservation)} entries")

Expand Down Expand Up @@ -831,6 +865,16 @@ def job() -> None:
default=None,
help="Priority band for scheduling (default: interactive). Lower bands run first; batch jobs yield to interactive.",
)
@click.option(
"--preemptible/--no-preemptible",
"preemptible",
default=None,
help=(
"Force scheduling on preemptible (--preemptible) or non-preemptible "
"(--no-preemptible) workers. Overrides the executor heuristic. "
"Default: heuristic-based (small CPU-only jobs pinned to non-preemptible)."
),
)
@click.option(
"--terminate-on-exit/--no-terminate-on-exit",
default=True,
Expand Down Expand Up @@ -858,6 +902,7 @@ def run(
extra: tuple[str, ...],
reserve: tuple[str, ...],
priority: str | None,
preemptible: bool | None,
terminate_on_exit: bool,
cmd: tuple[str, ...],
):
Expand Down Expand Up @@ -907,6 +952,7 @@ def run(
zone=zone,
reserve=reserve or None,
priority=priority,
preemptible=preemptible,
token_provider=ctx.obj.get("token_provider"),
submit_argv=submit_argv,
)
Expand Down
47 changes: 47 additions & 0 deletions lib/iris/tests/cli/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from iris.cli.job import (
_parse_tpu_alternatives,
_render_job_summary_text,
build_job_constraints,
build_job_summary,
build_resources,
build_tpu_alternatives,
Expand Down Expand Up @@ -138,6 +139,52 @@ def test_executor_heuristic_with_region_constraint():
assert preemptible.values[0].value == "false"


# ---------------------------------------------------------------------------
# build_job_constraints — --preemptible / --no-preemptible wiring (#4540)
# ---------------------------------------------------------------------------


def _preemptible_values(constraints: list[Constraint]) -> list[str]:
    """Return the first value of each preemptible-keyed constraint, in order."""
    collected: list[str] = []
    for constraint in constraints:
        if constraint.key == WellKnownAttribute.PREEMPTIBLE:
            collected.append(constraint.values[0].value)
    return collected


def test_build_job_constraints_preemptible_true_emits_true_constraint():
    """An explicit preemptible=True yields exactly one preemptible=true constraint."""
    small_cpu_job = build_resources(tpu=None, gpu=None, cpu=0.5, memory="1GB", disk="5GB")
    proto = small_cpu_job.to_proto()

    result = build_job_constraints(proto, tpu_variants=[], replicas=1, preemptible=True)

    # The heuristic must not add a second preemptible constraint.
    assert _preemptible_values(result) == ["true"]


def test_build_job_constraints_preemptible_false_emits_false_constraint():
    """An explicit preemptible=False yields preemptible=false on a 4-CPU / 16GB job."""
    larger_cpu_job = build_resources(tpu=None, gpu=None, cpu=4.0, memory="16GB", disk="5GB")
    proto = larger_cpu_job.to_proto()

    result = build_job_constraints(proto, tpu_variants=[], replicas=1, preemptible=False)

    assert _preemptible_values(result) == ["false"]


def test_build_job_constraints_preemptible_none_runs_heuristic():
    """With preemptible=None the executor heuristic still tags small CPU jobs."""
    small_cpu_job = build_resources(tpu=None, gpu=None, cpu=0.5, memory="1GB", disk="5GB")
    proto = small_cpu_job.to_proto()

    result = build_job_constraints(proto, tpu_variants=[], replicas=1, preemptible=None)

    # No explicit choice was given, so the heuristic's non-preemptible tag is expected.
    assert _preemptible_values(result) == ["false"]


def test_build_job_constraints_preemptible_true_overrides_heuristic():
    """On a small CPU job that would be auto-tagged non-preemptible, preemptible=True wins."""
    small_cpu_job = build_resources(tpu=None, gpu=None, cpu=0.5, memory="1GB", disk="5GB")
    proto = small_cpu_job.to_proto()

    result = build_job_constraints(proto, tpu_variants=[], replicas=1, preemptible=True)

    # Exactly one preemptible constraint, carrying the user's value rather
    # than the heuristic's.
    assert _preemptible_values(result) == ["true"]


# --tpu multi-variant parsing
# ---------------------------------------------------------------------------

Expand Down
Loading