Skip to content

Commit 9f9f294

Browse files
committed
[iris] Consolidate feasibility + drop worker.attributes region/zone override
Fixes a regression in marin-itest where child jobs submitted by zephyr coordinators were rejected with "no groups in region local" and the integration pipeline hung. The fake GCP provider synthesized region=local on workers while groups derived region from the GCP slice_template zone (europe-west4) — two sources of truth that the new feasibility gate caught disagreeing. Removes the synthetic region entirely. Group region now always derives from slice_template.gcp.zone / coreweave.region; the worker.attributes override path in ScalingGroup.region/zone and the duplicate writes in _expand_tpu_pools / _expand_multi_zone_groups were 100% redundant. _validate_worker_settings now explicitly rejects REGION/ZONE in worker.attributes so stale configs fail loudly. Collapses the feasibility logic that PR 4681 introduced into a single predicate: job_feasibility() + _diagnose() in routing.py, exposed on Autoscaler as one job_feasibility(constraints, replicas=None). service.launch_job calls it once instead of chaining two checks. The two near-identical diagnose functions become one. Net -206 LOC.
1 parent a9109a1 commit 9f9f294

File tree

12 files changed

+162
-368
lines changed

12 files changed

+162
-368
lines changed

lib/iris/examples/coreweave.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ scale_groups:
8383
capacity_type: on-demand
8484
worker:
8585
attributes:
86-
region: US-WEST-04A
8786
pool: cpu-erapids
8887
buffer_slices: 1
8988
max_slices: 4
@@ -106,7 +105,6 @@ scale_groups:
106105
capacity_type: on-demand
107106
worker:
108107
attributes:
109-
region: US-WEST-04A
110108
pool: h100-8x
111109
buffer_slices: 0
112110
max_slices: 8

lib/iris/src/iris/cluster/config.py

Lines changed: 8 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -224,44 +224,26 @@ def _validate_worker_settings(config: config_pb2.IrisClusterConfig) -> None:
224224
WellKnownAttribute.PREEMPTIBLE,
225225
WellKnownAttribute.DEVICE_TYPE,
226226
WellKnownAttribute.DEVICE_VARIANT,
227+
WellKnownAttribute.REGION,
228+
WellKnownAttribute.ZONE,
227229
}
228230
for name, sg_config in config.scale_groups.items():
229231
if not sg_config.HasField("worker"):
230232
continue
231233

232234
attributes = sg_config.worker.attributes
233235

234-
# Reject well-known keys that are now derived from resources
236+
# Reject well-known keys that are now derived from resources or slice_template.
237+
# region/zone are derived from slice_template.gcp.zone; device fields from resources.
235238
for attr_key in _well_known_resource_attrs:
236239
if attr_key in attributes:
237240
raise ValueError(
238-
f"Scale group '{name}': worker.attributes.{attr_key} is now derived from "
239-
f"resources and must not be set explicitly. Remove it from worker.attributes."
240-
)
241-
242-
region = attributes.get(WellKnownAttribute.REGION, "").strip()
243-
if WellKnownAttribute.REGION in attributes and not region:
244-
raise ValueError(f"Scale group '{name}': worker.attributes.region must be non-empty.")
245-
246-
zone_attr = attributes.get(WellKnownAttribute.ZONE, "").strip()
247-
if WellKnownAttribute.ZONE in attributes and not zone_attr:
248-
raise ValueError(f"Scale group '{name}': worker.attributes.zone must be non-empty.")
249-
if zone_attr and sg_config.slice_template.HasField("gcp") and sg_config.slice_template.gcp.zone:
250-
if zone_attr != sg_config.slice_template.gcp.zone:
251-
raise ValueError(
252-
f"Scale group '{name}': worker.attributes.zone={zone_attr!r} must match "
253-
f"slice_template.gcp.zone={sg_config.slice_template.gcp.zone!r}."
241+
f"Scale group '{name}': worker.attributes.{attr_key} is derived automatically "
242+
f"(from resources or slice_template.gcp.zone) and must not be set explicitly. "
243+
f"Remove it from worker.attributes."
254244
)
255245

256246
template = sg_config.slice_template
257-
if region and template.HasField("gcp") and template.gcp.zone:
258-
zone_region = template.gcp.zone.rsplit("-", 1)[0]
259-
if region != zone_region:
260-
raise ValueError(
261-
f"Scale group '{name}': worker.attributes.region={region!r} must match "
262-
f"slice_template.gcp.zone region {zone_region!r}."
263-
)
264-
265247
if (
266248
template.HasField("coreweave")
267249
and sg_config.resources.device_type == config_pb2.ACCELERATOR_TYPE_GPU
@@ -676,7 +658,6 @@ def _expand_tpu_pools(data: dict) -> None:
676658
topo = get_tpu_topology(variant)
677659

678660
for zone in zones:
679-
region = zone.rsplit("-", 1)[0]
680661
sg_name = f"tpu_{pool_name}_{size_int}-{zone}"
681662

682663
if sg_name in scale_groups:
@@ -705,12 +686,6 @@ def _expand_tpu_pools(data: dict) -> None:
705686
"buffer_slices": size_overrides.get("buffer_slices", 0),
706687
"max_slices": size_overrides["max_slices"],
707688
"slice_template": st,
708-
"worker": {
709-
"attributes": {
710-
WellKnownAttribute.ZONE: zone,
711-
WellKnownAttribute.REGION: region,
712-
}
713-
},
714689
}
715690

716691
scale_groups[sg_name] = sg
@@ -737,15 +712,14 @@ def _expand_multi_zone_groups(data: dict) -> None:
737712
creates a copy of the scale group with:
738713
- name suffixed with -{zone} (e.g. tpu_v5e_16-europe-west4-b)
739714
- slice_template.gcp.zone set to the zone
740-
- worker.attributes.zone and worker.attributes.region set automatically
741715
- buffer_slices defaulted to 0 if not explicitly set
742716
743717
Also merges all expanded zones into platform.gcp.zones.
744718
745719
Raises:
746720
ValueError: If zones is not a non-empty list of unique non-empty strings,
747721
if an expanded name collides with an existing scale group, or if
748-
user-provided zone/region fields conflict with the expansion.
722+
slice_template.gcp.zone is set while zones is also specified.
749723
"""
750724
scale_groups = data.get("scale_groups")
751725
if not isinstance(scale_groups, dict):
@@ -785,28 +759,14 @@ def _expand_multi_zone_groups(data: dict) -> None:
785759

786760
# Detect conflicts with user-provided fields that expansion will set
787761
existing_gcp_zone = (sg.get("slice_template") or {}).get("gcp", {}).get("zone")
788-
existing_worker_attrs = (sg.get("worker") or {}).get("attributes", {})
789-
existing_zone_attr = existing_worker_attrs.get(WellKnownAttribute.ZONE)
790-
existing_region_attr = existing_worker_attrs.get(WellKnownAttribute.REGION)
791762

792763
if existing_gcp_zone:
793764
raise ValueError(
794765
f"Scale group '{name}': cannot set both 'zones' and 'slice_template.gcp.zone'. "
795766
f"Remove slice_template.gcp.zone — it is set automatically by zone expansion."
796767
)
797-
if existing_zone_attr:
798-
raise ValueError(
799-
f"Scale group '{name}': cannot set both 'zones' and 'worker.attributes.zone'. "
800-
f"Remove worker.attributes.zone — it is set automatically by zone expansion."
801-
)
802-
if existing_region_attr:
803-
raise ValueError(
804-
f"Scale group '{name}': cannot set both 'zones' and 'worker.attributes.region'. "
805-
f"Remove worker.attributes.region — it is set automatically by zone expansion."
806-
)
807768

808769
for zone in zones:
809-
region = zone.rsplit("-", 1)[0]
810770
expanded_name = f"{name}-{zone}"
811771

812772
if expanded_name in scale_groups:
@@ -826,12 +786,6 @@ def _expand_multi_zone_groups(data: dict) -> None:
826786
gcp = st.setdefault("gcp", {})
827787
gcp["zone"] = zone
828788

829-
# Set worker.attributes.zone and .region
830-
worker = expanded_sg.setdefault("worker", {})
831-
attrs = worker.setdefault("attributes", {})
832-
attrs[WellKnownAttribute.ZONE] = zone
833-
attrs[WellKnownAttribute.REGION] = region
834-
835789
if "buffer_slices" not in expanded_sg:
836790
expanded_sg["buffer_slices"] = 0
837791

lib/iris/src/iris/cluster/controller/autoscaler/routing.py

Lines changed: 83 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from iris.cluster.constraints import (
1515
ConstraintIndex,
1616
DeviceType,
17+
PlacementRequirements,
1718
extract_placement_requirements,
1819
get_device_type_enum,
1920
routing_constraints,
@@ -228,52 +229,6 @@ def _format_variants(variants: frozenset[str] | None) -> str:
228229
return ",".join(sorted(variants))
229230

230231

231-
def _diagnose_no_matching_group(entry: DemandEntry, groups: list[ScalingGroup]) -> str:
232-
"""Produce a concise, actionable reason when no group matches a demand entry."""
233-
234-
normalized = entry.normalized
235-
device_type = normalized.device_type or DeviceType.CPU
236-
device_matches = []
237-
for group in groups:
238-
if group.matches_device_requirement(device_type, normalized.device_variants):
239-
device_matches.append(group)
240-
241-
variants_str = _format_variants(normalized.device_variants)
242-
if not device_matches:
243-
return f"no_matching_group: no groups with device {device_type.value}:{variants_str}"
244-
245-
if normalized.preemptible is not None:
246-
preempt_matches = [
247-
group
248-
for group in device_matches
249-
if (group.config.resources.capacity_type == config_pb2.CAPACITY_TYPE_PREEMPTIBLE) == normalized.preemptible
250-
]
251-
if not preempt_matches:
252-
want = "preemptible" if normalized.preemptible else "non-preemptible"
253-
return f"no_matching_group: no {want} groups for device {device_type.value}:{variants_str}"
254-
device_matches = preempt_matches
255-
256-
if normalized.required_zones:
257-
available_zones = {group.zone for group in device_matches} - {None}
258-
requested = sorted(normalized.required_zones)
259-
message = f"no_matching_group: no groups in zone {', '.join(requested)}"
260-
for requested_zone in requested:
261-
close = difflib.get_close_matches(requested_zone, available_zones, n=1, cutoff=0.7)
262-
if close:
263-
message += f" (did you mean {close[0]}?)"
264-
return message
265-
266-
if normalized.required_regions:
267-
requested = sorted(normalized.required_regions)
268-
region_message = f"no_matching_group: no groups in region {', '.join(requested)}"
269-
return region_message
270-
271-
return (
272-
"no_matching_group: no groups match device="
273-
f"{device_type.value}:{_format_variants(normalized.device_variants)}"
274-
)
275-
276-
277232
# GCP zones end with -{single letter}, e.g. us-central1-a.
278233
_ZONE_PATTERN = re.compile(r".+-[a-z]$")
279234

@@ -282,42 +237,41 @@ def _looks_like_zone(value: str) -> bool:
282237
return bool(_ZONE_PATTERN.fullmatch(value))
283238

284239

285-
def diagnose_unsatisfiable_constraints(
286-
constraints: Sequence[job_pb2.Constraint],
287-
groups: list[ScalingGroup],
240+
def _diagnose(
241+
placement: PlacementRequirements,
242+
groups: Sequence[ScalingGroup],
288243
) -> str:
289-
"""Produce a user-facing error when no scaling group can satisfy constraints.
244+
"""Explain why no scaling group satisfies a placement requirement.
290245
291-
Performs layered diagnosis (device, preemptible, zone, region) and
292-
detects zone/region value confusion.
246+
Layered analysis (device → preemptible → zone → region) with zone/region
247+
confusion heuristics and fuzzy-match hints. Returned string has no prefix;
248+
callers prepend their own (e.g. "no_matching_group: ") when needed.
293249
"""
294-
normalized = extract_placement_requirements(constraints)
295-
device_type = normalized.device_type or DeviceType.CPU
250+
device_type = placement.device_type or DeviceType.CPU
251+
device_matches = [g for g in groups if g.matches_device_requirement(device_type, placement.device_variants)]
252+
variants_str = _format_variants(placement.device_variants)
296253

297-
device_matches = [g for g in groups if g.matches_device_requirement(device_type, normalized.device_variants)]
298-
variants_str = _format_variants(normalized.device_variants)
299254
if not device_matches:
300255
available = ", ".join(g.name for g in groups)
301256
return f"no scaling group provides device {device_type.value}:{variants_str} (available: {available})"
302257

303-
if normalized.preemptible is not None:
258+
if placement.preemptible is not None:
304259
preempt_matches = [
305260
g
306261
for g in device_matches
307-
if (g.config.resources.capacity_type == config_pb2.CAPACITY_TYPE_PREEMPTIBLE) == normalized.preemptible
262+
if (g.config.resources.capacity_type == config_pb2.CAPACITY_TYPE_PREEMPTIBLE) == placement.preemptible
308263
]
309264
if not preempt_matches:
310-
want = "preemptible" if normalized.preemptible else "non-preemptible"
265+
want = "preemptible" if placement.preemptible else "non-preemptible"
311266
return f"no {want} group provides device {device_type.value}:{variants_str}"
312267
device_matches = preempt_matches
313268

314-
if normalized.required_zones:
269+
if placement.required_zones:
315270
available_zones = {g.zone for g in device_matches} - {None}
316271
available_regions = {g.region for g in device_matches} - {None}
317-
requested = sorted(normalized.required_zones)
272+
requested = sorted(placement.required_zones)
318273
parts = [f"no groups in zone {', '.join(requested)}"]
319274
for z in requested:
320-
# Prioritize zone/region confusion over fuzzy match
321275
if not _looks_like_zone(z) and z in available_regions:
322276
parts.append(f"'{z}' looks like a region, not a zone; use a region constraint instead")
323277
else:
@@ -326,13 +280,12 @@ def diagnose_unsatisfiable_constraints(
326280
parts.append(f"did you mean {close[0]}?")
327281
return "; ".join(parts)
328282

329-
if normalized.required_regions:
283+
if placement.required_regions:
330284
available_regions = {g.region for g in device_matches} - {None}
331285
available_zones = {g.zone for g in device_matches} - {None}
332-
requested = sorted(normalized.required_regions)
286+
requested = sorted(placement.required_regions)
333287
parts = [f"no groups in region {', '.join(requested)}"]
334288
for r in requested:
335-
# Prioritize zone/region confusion over fuzzy match
336289
if _looks_like_zone(r) and r in available_zones:
337290
parts.append(f"'{r}' looks like a zone, not a region; use a zone constraint instead")
338291
else:
@@ -345,6 +298,70 @@ def diagnose_unsatisfiable_constraints(
345298
return f"no scaling group matches constraints (available: {available})"
346299

347300

301+
@dataclass(frozen=True)
302+
class GroupFeasibility:
303+
"""Result of the job_feasibility predicate.
304+
305+
`feasible` is the subset of groups whose hard routing constraints match
306+
and (if coscheduled) have a compatible num_vms. Non-empty means the job
307+
can, in principle, be scheduled; an autoscaler tick may still need to
308+
grow a group before capacity appears.
309+
310+
`reason` is populated iff `feasible` is empty, with a user-facing
311+
explanation suitable for rejecting the job at submit time.
312+
"""
313+
314+
feasible: list[ScalingGroup]
315+
reason: str | None
316+
317+
318+
def job_feasibility(
319+
groups: Sequence[ScalingGroup],
320+
constraints: Sequence[job_pb2.Constraint],
321+
replicas: int | None = None,
322+
) -> GroupFeasibility:
323+
"""Answer: can any scaling group ever host this job shape?
324+
325+
Ignores runtime availability (quota, cooldown, in-flight capacity) — that
326+
is the autoscaler's job on each tick. This predicate gates LaunchJob at
327+
submit time so jobs that can never be scheduled fail fast.
328+
329+
Args:
330+
groups: scaling groups to consider.
331+
constraints: the job's hard + soft routing constraints.
332+
replicas: for coscheduled jobs, the required replica count; None for
333+
non-coscheduled jobs. When set, groups must also have num_vms that
334+
divides replicas evenly.
335+
"""
336+
groups_list = list(groups)
337+
if not groups_list:
338+
return GroupFeasibility(feasible=[], reason=None)
339+
340+
group_attrs = {g.name: g.to_attributes() for g in groups_list}
341+
group_index = ConstraintIndex.build(group_attrs)
342+
hard_cs, _ = split_hard_soft(routing_constraints(constraints))
343+
matching_names = group_index.matching_entities(hard_cs)
344+
matching = [g for g in groups_list if g.name in matching_names]
345+
346+
if not matching:
347+
placement = extract_placement_requirements(constraints)
348+
return GroupFeasibility(feasible=[], reason=_diagnose(placement, groups_list))
349+
350+
if replicas is not None:
351+
compatible = [g for g in matching if g.num_vms > 0 and replicas % g.num_vms == 0]
352+
if not compatible:
353+
sizes = {g.name: g.num_vms for g in matching}
354+
reason = (
355+
f"job requires {replicas} coscheduled replicas but no matching scaling group "
356+
f"has a compatible size (replicas must be an exact multiple of num_vms); "
357+
f"matching group sizes: {sizes}"
358+
)
359+
return GroupFeasibility(feasible=[], reason=reason)
360+
matching = compatible
361+
362+
return GroupFeasibility(feasible=matching, reason=None)
363+
364+
348365
def _diagnose_no_capacity(
349366
entry: DemandEntry,
350367
matching_groups: list[ScalingGroup],
@@ -493,7 +510,7 @@ def route_demand(
493510
reason = (
494511
f"tier_blocked: {pre_tier_count} matching group(s) blocked by quota-pool tier monotonicity"
495512
if pre_tier_count > 0
496-
else _diagnose_no_matching_group(entry, sorted_groups)
513+
else f"no_matching_group: {_diagnose(entry.normalized, sorted_groups)}"
497514
)
498515
unmet.append(UnmetDemand(entry=entry, reason=reason))
499516
continue

0 commit comments

Comments (0)