Commit 854383c

yoblin and claude committed
[iris] Address review: bump node overcommit, delete pods by name
- Bump DIRECT_PROVIDER_NODE_OVERCOMMIT from 2 to 16 to match the worker provider's tasks-per-worker ratio.
- Replace delete_by_labels with delete_many: deletes pods by name instead of by label selector. Simpler, more precise, and no new protocol method is needed beyond what kubectl natively supports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 660ee82 commit 854383c
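To make the second bullet concrete, here is a tiny self-contained sketch (not code from this repository) of the name-based deletion contract that delete_many adopts: the caller decides exactly which resources go away, rather than relying on a label selector to match the right set.

    class TinyFakeKubectl:
        """Hypothetical stand-in for the fake client changed below; stores pods by name."""

        def __init__(self, pod_names: list[str]) -> None:
            self.pods = set(pod_names)

        def delete(self, resource: str, name: str) -> None:
            self.pods.discard(name)

        def delete_many(self, resource: str, names: list[str], *, wait: bool = False) -> None:
            # Same shape as the new method in fake.py: delete each named resource.
            for name in names:
                self.delete(resource, name)


    kubectl = TinyFakeKubectl(["task-a-0", "task-a-1", "task-b-0"])
    kubectl.delete_many("pods", ["task-a-0", "task-a-1"], wait=False)
    assert kubectl.pods == {"task-b-0"}  # only the explicitly named pods were removed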

5 files changed

Lines changed: 58 additions & 75 deletions


lib/iris/src/iris/cluster/controller/transitions.py

Lines changed: 6 additions & 2 deletions
@@ -84,8 +84,12 @@ class ReservationClaim:
 DIRECT_PROVIDER_BOOTSTRAP_BATCH = 64
 """Max tasks promoted per sync cycle when no capacity info is available yet."""

-DIRECT_PROVIDER_NODE_OVERCOMMIT = 2
-"""Pods per schedulable node allowed for the direct provider scheduler."""
+DIRECT_PROVIDER_NODE_OVERCOMMIT = 16
+"""Pods per schedulable node allowed for the direct provider scheduler.
+
+Matches the worker provider's tasks-per-worker ratio so that the direct
+provider can keep a similar number of tasks in-flight relative to cluster size.
+"""


 @dataclass(frozen=True)
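For context on the new value: per the test updated at the bottom of this diff, the direct-provider promotion budget is schedulable_nodes * DIRECT_PROVIDER_NODE_OVERCOMMIT, reduced by tasks already in flight. A back-of-the-envelope sketch (promotion_budget is an illustrative helper; the real accounting lives in drain_for_direct_provider, which is not shown in this commit):

    DIRECT_PROVIDER_NODE_OVERCOMMIT = 16  # new value from this commit

    def promotion_budget(schedulable_nodes: int, active_tasks: int) -> int:
        # Illustrative only: budget is nodes * overcommit, reduced by tasks already running.
        return max(schedulable_nodes * DIRECT_PROVIDER_NODE_OVERCOMMIT - active_tasks, 0)

    # Matches the updated test: 10 nodes -> 160 promotions, then 0 once 160 are active.
    assert promotion_budget(10, 0) == 160
    assert promotion_budget(10, 160) == 0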

lib/iris/src/iris/cluster/providers/k8s/fake.py

Lines changed: 4 additions & 22 deletions
@@ -688,28 +688,10 @@ def delete(
         if normalized == "pod":
             self._release_pod_resources(name)

-    def delete_by_labels(
-        self,
-        resource: str,
-        labels: dict[str, str],
-        *,
-        cluster_scoped: bool = False,
-        wait: bool = False,
-    ) -> int:
-        self._check_failure("delete_by_labels")
-        normalized = _normalize_resource(resource)
-        to_delete = []
-        for (kind, name), manifest in self._resources.items():
-            if kind != normalized:
-                continue
-            res_labels = manifest.get("metadata", {}).get("labels", {})
-            if all(res_labels.get(k) == v for k, v in labels.items()):
-                to_delete.append(name)
-        for name in to_delete:
-            self._resources.pop((normalized, name), None)
-            if normalized == "pod":
-                self._release_pod_resources(name)
-        return len(to_delete)
+    def delete_many(self, resource: str, names: list[str], *, cluster_scoped: bool = False, wait: bool = False) -> None:
+        """Delete multiple resources by name."""
+        for name in names:
+            self.delete(resource, name)

     def logs(self, pod_name: str, *, container: str | None = None, tail: int = 50, previous: bool = False) -> str:
         self._check_failure("logs")

lib/iris/src/iris/cluster/providers/k8s/service.py

Lines changed: 8 additions & 23 deletions
@@ -57,15 +57,8 @@ def delete(
         self, resource: str, name: str, *, cluster_scoped: bool = False, force: bool = False, wait: bool = True
     ) -> None: ...

-    def delete_by_labels(
-        self,
-        resource: str,
-        labels: dict[str, str],
-        *,
-        cluster_scoped: bool = False,
-        wait: bool = False,
-    ) -> int:
-        """Delete all resources matching label selector. Returns count deleted."""
+    def delete_many(self, resource: str, names: list[str], *, cluster_scoped: bool = False, wait: bool = False) -> None:
+        """Delete multiple resources by name in a single kubectl call."""
         ...

     def logs(self, pod_name: str, *, container: str | None = None, tail: int = 50, previous: bool = False) -> str: ...

@@ -240,24 +233,16 @@ def delete(
         if result.returncode != 0:
             raise KubectlError(f"kubectl delete {resource}/{name} failed: {result.stderr.strip()}")

-    def delete_by_labels(
-        self,
-        resource: str,
-        labels: dict[str, str],
-        *,
-        cluster_scoped: bool = False,
-        wait: bool = False,
-    ) -> int:
-        """Delete all resources matching label selector in a single kubectl call."""
-        selector = ",".join(f"{k}={v}" for k, v in labels.items())
-        args = ["delete", resource, "-l", selector, "--ignore-not-found"]
+    def delete_many(self, resource: str, names: list[str], *, cluster_scoped: bool = False, wait: bool = False) -> None:
+        """Delete multiple resources by name in a single kubectl call."""
+        if not names:
+            return
+        args = ["delete", resource, *names, "--ignore-not-found"]
         if not wait:
             args.append("--wait=false")
         result = self._run(args, namespaced=not cluster_scoped)
         if result.returncode != 0:
-            raise KubectlError(f"kubectl delete {resource} -l {selector} failed: {result.stderr.strip()}")
-        # kubectl prints one "deleted" line per resource
-        return result.stdout.strip().count("deleted")
+            raise KubectlError(f"kubectl delete {resource} failed: {result.stderr.strip()}")

     def set_image(self, resource: str, name: str, container: str, image: str, *, namespaced: bool = False) -> None:
         """Set the container image on a resource via ``kubectl set image``."""
lib/iris/src/iris/cluster/providers/k8s/tasks.py

Lines changed: 33 additions & 21 deletions
@@ -960,29 +960,41 @@ def _apply_pod(self, run_req: cluster_pb2.Worker.RunTaskRequest) -> None:
         )

     def _bulk_delete_task_pods(self, task_ids: list[str]) -> None:
-        """Delete pods and configmaps for multiple tasks via one kubectl call per job."""
-        # Group by job_id (task_id format: /{user}/{job}/{task_index})
-        jobs: dict[str, list[str]] = {}
-        for task_id in task_ids:
-            job_id, _, _ = task_id.rpartition("/")
-            assert job_id, f"malformed task_id (expected /<user>/<job>/<index>): {task_id}"
-            jobs.setdefault(job_id, []).append(task_id)
-
-        for job_id, job_task_ids in jobs.items():
-            labels = {
+        """Delete pods and configmaps for multiple tasks in one kubectl call per resource type."""
+        task_hashes = {_task_hash(tid) for tid in task_ids}
+
+        all_pods = self.kubectl.list_json(
+            "pods",
+            labels={
                 _LABEL_MANAGED: "true",
                 _LABEL_RUNTIME: _RUNTIME_LABEL_VALUE,
-                _LABEL_JOB_ID: _sanitize_label_value(job_id),
-            }
-            pod_count = self.kubectl.delete_by_labels("pods", labels, wait=False)
-            cm_count = self.kubectl.delete_by_labels("configmaps", labels, wait=False)
-            logger.info(
-                "Bulk deleted %d pods, %d configmaps for job %s (%d tasks)",
-                pod_count,
-                cm_count,
-                job_id,
-                len(job_task_ids),
-            )
+            },
+        )
+        pod_names = [
+            p["metadata"]["name"]
+            for p in all_pods
+            if p.get("metadata", {}).get("labels", {}).get(_LABEL_TASK_HASH) in task_hashes
+        ]
+
+        all_cms = self.kubectl.list_json(
+            "configmaps",
+            labels={
+                _LABEL_MANAGED: "true",
+                _LABEL_RUNTIME: _RUNTIME_LABEL_VALUE,
+            },
+        )
+        cm_names = [
+            c["metadata"]["name"]
+            for c in all_cms
+            if c.get("metadata", {}).get("labels", {}).get(_LABEL_TASK_HASH) in task_hashes
+        ]
+
+        if pod_names:
+            self.kubectl.delete_many("pods", pod_names, wait=False)
+        if cm_names:
+            self.kubectl.delete_many("configmaps", cm_names, wait=False)
+
+        logger.info("Deleted %d pods, %d configmaps for %d tasks", len(pod_names), len(cm_names), len(task_ids))

     def _delete_pods_by_task_id(self, task_id: str) -> None:
         """Delete all pods for a given task_id (any attempt.

lib/iris/tests/cluster/controller/test_transitions.py

Lines changed: 7 additions & 7 deletions
@@ -3259,18 +3259,18 @@ def test_drain_caps_promotions_and_refills_after_completion(state):
 def test_drain_capacity_limits_promotions(state):
     """With capacity, budget is schedulable_nodes * OVERCOMMIT; active tasks reduce it."""
     capacity = ClusterCapacity(
-        schedulable_nodes=100,
-        total_cpu_millicores=400000,
-        available_cpu_millicores=200000,
-        total_memory_bytes=800 * 1024**3,
-        available_memory_bytes=400 * 1024**3,
+        schedulable_nodes=10,
+        total_cpu_millicores=40000,
+        available_cpu_millicores=20000,
+        total_memory_bytes=80 * 1024**3,
+        available_memory_bytes=40 * 1024**3,
     )
     _submit_job_direct(state, "/user/cap-job", replicas=250)

     batch1 = state.drain_for_direct_provider(capacity=capacity)
-    assert len(batch1.tasks_to_run) == 200  # 100 nodes * 2
+    assert len(batch1.tasks_to_run) == 160  # 10 nodes * 16

-    # 200 active → budget exhausted.
+    # 160 active → budget exhausted.
     assert len(state.drain_for_direct_provider(capacity=capacity).tasks_to_run) == 0