# Patch summary (text before the first "diff --git" header is expected to be
# skipped by patch tools):
#   * lib/iris/src/iris/cli/job.py: adds build_job_constraints() and replaces
#     the inline constraint assembly in run_iris_job() with a call to it.
#   * Adds a --preemptible/--no-preemptible flag to the `run` command and
#     threads it through run_iris_job(preemptible=...) into
#     build_job_constraints().
#   * lib/iris/tests/cli/test_job.py: adds four tests for
#     build_job_constraints().
#
# NOTE(review): line breaks were restored from the hunk line counts, and blank
#   context lines are written as empty lines. Leading indentation was
#   reconstructed; the nesting depth of context lines in the -632 and -907
#   hunks is assumed. Confirm against the target tree (or apply with
#   whitespace-insensitive matching) before relying on this patch.
# NOTE(review): test_build_job_constraints_preemptible_true_emits_true_constraint
#   and test_build_job_constraints_preemptible_true_overrides_heuristic pass
#   identical arguments and make the same assertion.
# NOTE(review): _preemptible_values uses `Constraint` and `WellKnownAttribute`;
#   the visible test-file hunks only add `build_job_constraints` to the
#   imports. Confirm both names are already imported in test_job.py.
diff --git a/lib/iris/src/iris/cli/job.py b/lib/iris/src/iris/cli/job.py
index 1f2e1da4e8..c7d602ab10 100644
--- a/lib/iris/src/iris/cli/job.py
+++ b/lib/iris/src/iris/cli/job.py
@@ -30,6 +30,7 @@
     WellKnownAttribute,
     device_variant_constraint,
     infer_preemptible_constraint,
+    preemptible_constraint,
     region_constraint,
     zone_constraint,
 )
@@ -532,6 +533,41 @@ def resolve_multinode_defaults(
     return replicas, coscheduling


+def build_job_constraints(
+    resources_proto: job_pb2.ResourceSpecProto,
+    tpu_variants: list[str],
+    replicas: int,
+    regions: tuple[str, ...] | None = None,
+    zone: str | None = None,
+    preemptible: bool | None = None,
+) -> list[Constraint]:
+    """Assemble the constraint list for a submitted job.
+
+    An explicit ``preemptible`` value wins over the executor heuristic:
+    ``infer_preemptible_constraint`` short-circuits when any preemptible
+    constraint is already present, so we append the user's choice first.
+    """
+    constraints: list[Constraint] = []
+    if regions:
+        constraints.append(region_constraint(list(regions)))
+    if zone:
+        constraints.append(zone_constraint(zone))
+    if len(tpu_variants) > 1:
+        constraints.append(device_variant_constraint(tpu_variants))
+    if preemptible is not None:
+        constraints.append(preemptible_constraint(preemptible))
+
+    # Executor heuristic: small CPU-only CLI jobs (no accelerators, 1 replica,
+    # CPU ≤ 0.5 cores, RAM ≤ 4 GiB) are auto-tagged as non-preemptible so
+    # coordinators survive spot reclamation. Skipped when the user supplied
+    # --preemptible / --no-preemptible.
+    inferred = infer_preemptible_constraint(resources_proto, replicas, constraints)
+    if inferred is not None:
+        constraints.append(inferred)
+        logger.info("Executor heuristic: auto-tagging job as non-preemptible")
+    return constraints
+
+
 def run_iris_job(
     command: list[str],
     env_vars: dict[str, str],
@@ -553,6 +589,7 @@ def run_iris_job(
     user: str | None = None,
     reserve: tuple[str, ...] | None = None,
     priority: str | None = None,
+    preemptible: bool | None = None,
     token_provider: TokenProvider | None = None,
     submit_argv: list[str] | None = None,
 ) -> int:
@@ -565,6 +602,8 @@ def run_iris_job(
         regions: If provided, restrict the job to workers in these regions.
         zone: If provided, restrict the job to workers in this zone.
         reserve: Reservation specs (e.g., ("4:H100x8", "v5litepod-16")).
+        preemptible: If True/False, force scheduling on (non-)preemptible workers
+            and bypass the executor heuristic. If None (default), the heuristic runs.

     Returns:
         Exit code: 0 for success, 1 for failure
@@ -579,22 +618,15 @@ def run_iris_job(

     replicas, coscheduling = resolve_multinode_defaults(primary_tpu, gpu, replicas)

-    constraints: list[Constraint] = []
-    if regions:
-        constraints.append(region_constraint(list(regions)))
-    if zone:
-        constraints.append(zone_constraint(zone))
-    if len(tpu_variants) > 1:
-        constraints.append(device_variant_constraint(tpu_variants))
-
-    # Executor heuristic: small CPU-only CLI jobs (no accelerators, 1 replica,
-    # CPU ≤ 0.5 cores, RAM ≤ 4 GiB) are auto-tagged as non-preemptible so
-    # coordinators survive spot reclamation.
     resources_proto = resources.to_proto()
-    preemptible = infer_preemptible_constraint(resources_proto, replicas, constraints)
-    if preemptible is not None:
-        constraints.append(preemptible)
-        logger.info("Executor heuristic: auto-tagging job as non-preemptible")
+    constraints = build_job_constraints(
+        resources_proto=resources_proto,
+        tpu_variants=tpu_variants,
+        replicas=replicas,
+        regions=regions,
+        zone=zone,
+        preemptible=preemptible,
+    )

     reservation: list[ReservationEntry] | None = None
     if reserve:
@@ -632,6 +664,8 @@ def run_iris_job(
         logger.info(f"Region constraint: {', '.join(regions)}")
     if zone:
         logger.info(f"Zone constraint: {zone}")
+    if preemptible is not None:
+        logger.info(f"Preemptible constraint: {preemptible}")
     if reservation:
         logger.info(f"Reservation: {len(reservation)} entries")

@@ -831,6 +865,16 @@ def job() -> None:
     default=None,
     help="Priority band for scheduling (default: interactive). Lower bands run first; batch jobs yield to interactive.",
 )
+@click.option(
+    "--preemptible/--no-preemptible",
+    "preemptible",
+    default=None,
+    help=(
+        "Force scheduling on preemptible (--preemptible) or non-preemptible "
+        "(--no-preemptible) workers. Overrides the executor heuristic. "
+        "Default: heuristic-based (small CPU-only jobs pinned to non-preemptible)."
+    ),
+)
 @click.option(
     "--terminate-on-exit/--no-terminate-on-exit",
     default=True,
@@ -858,6 +902,7 @@ def run(
     extra: tuple[str, ...],
     reserve: tuple[str, ...],
     priority: str | None,
+    preemptible: bool | None,
     terminate_on_exit: bool,
     cmd: tuple[str, ...],
 ):
@@ -907,6 +952,7 @@ def run(
         zone=zone,
         reserve=reserve or None,
         priority=priority,
+        preemptible=preemptible,
         token_provider=ctx.obj.get("token_provider"),
         submit_argv=submit_argv,
     )
diff --git a/lib/iris/tests/cli/test_job.py b/lib/iris/tests/cli/test_job.py
index 7ce01f42b9..4686ae3f32 100644
--- a/lib/iris/tests/cli/test_job.py
+++ b/lib/iris/tests/cli/test_job.py
@@ -9,6 +9,7 @@
 from iris.cli.job import (
     _parse_tpu_alternatives,
     _render_job_summary_text,
+    build_job_constraints,
     build_job_summary,
     build_resources,
     build_tpu_alternatives,
@@ -138,6 +139,52 @@ def test_executor_heuristic_with_region_constraint():
     assert preemptible.values[0].value == "false"


+# ---------------------------------------------------------------------------
+# build_job_constraints — --preemptible / --no-preemptible wiring (#4540)
+# ---------------------------------------------------------------------------
+
+
+def _preemptible_values(constraints: list[Constraint]) -> list[str]:
+    return [c.values[0].value for c in constraints if c.key == WellKnownAttribute.PREEMPTIBLE]
+
+
+def test_build_job_constraints_preemptible_true_emits_true_constraint():
+    """--preemptible forces a preemptible=true constraint and bypasses the heuristic."""
+    resources_proto = build_resources(tpu=None, gpu=None, cpu=0.5, memory="1GB", disk="5GB").to_proto()
+
+    constraints = build_job_constraints(resources_proto, tpu_variants=[], replicas=1, preemptible=True)
+
+    assert _preemptible_values(constraints) == ["true"]
+
+
+def test_build_job_constraints_preemptible_false_emits_false_constraint():
+    """--no-preemptible forces a preemptible=false constraint even for non-executor jobs."""
+    resources_proto = build_resources(tpu=None, gpu=None, cpu=4.0, memory="16GB", disk="5GB").to_proto()
+
+    constraints = build_job_constraints(resources_proto, tpu_variants=[], replicas=1, preemptible=False)
+
+    assert _preemptible_values(constraints) == ["false"]
+
+
+def test_build_job_constraints_preemptible_none_runs_heuristic():
+    """Default (None) preserves the executor heuristic on small CPU jobs."""
+    resources_proto = build_resources(tpu=None, gpu=None, cpu=0.5, memory="1GB", disk="5GB").to_proto()
+
+    constraints = build_job_constraints(resources_proto, tpu_variants=[], replicas=1, preemptible=None)
+
+    assert _preemptible_values(constraints) == ["false"]
+
+
+def test_build_job_constraints_preemptible_true_overrides_heuristic():
+    """Small CPU jobs normally auto-tag non-preemptible; --preemptible wins."""
+    resources_proto = build_resources(tpu=None, gpu=None, cpu=0.5, memory="1GB", disk="5GB").to_proto()
+
+    constraints = build_job_constraints(resources_proto, tpu_variants=[], replicas=1, preemptible=True)
+
+    # Exactly one preemptible constraint, and it reflects the user's choice.
+    assert _preemptible_values(constraints) == ["true"]
+
+
 # --tpu multi-variant parsing
 # ---------------------------------------------------------------------------
