Skip to content

Commit 81ed3a5

Browse files
committed
[iris] k8s: bulk pod-metrics query to cut controller load
The k8s ResourceCollector polled one pod's metrics at a time: every tick it fanned out top_pod(pod_name) -> get_namespaced_custom_object across a 32-thread pool, so an N-pod cluster issued N metrics-API requests per tick. At ~1000 pods this is the dominant controller load. Replace top_pod with a bulk top_pods(): a single list_namespaced_custom_object on metrics.k8s.io (PodMetricsList), scoped to the managed-pod label selector, returning pod_name -> usage for the whole namespace in one request. The collector now does one API call per tick regardless of pod count, looks up each tracked pod in the result, and writes the rows in a single batched Table.write. The per-pod thread pool is gone.

Also align the resource poll cadence with the metrics-server scrape resolution: the collector ran every 5s, but metrics-server only refreshes samples roughly every 15s, so faster polling re-read identical values. Add a resource_poll_interval (default 15s) separate from log_poll_interval.

Net effect at 1000 pods: from 1000 requests every 5s to 1 request every 15s.
1 parent 3c5199d commit 81ed3a5

5 files changed

Lines changed: 175 additions & 87 deletions

File tree

lib/iris/src/iris/cluster/backends/k8s/fake.py

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -275,12 +275,14 @@ def __init__(
275275
)
276276
self._resources: dict[tuple[str, str], dict] = {} # (kind, name) -> manifest
277277
self._injected_failures: dict[str, Exception] = {}
278+
self._persistent_failures: dict[str, Exception] = {}
278279
self._logs: dict[str, str] = {} # pod_name -> log text
279280
self._events: list[dict] = []
280281
self._exec_responses: dict[str, list[ExecResult]] = {}
281282
self._file_contents: dict[tuple[str, str], bytes] = {} # (pod_name, path) -> data
282283
self._rm_files_calls: list[tuple[str, list[str]]] = []
283284
self._top_pod_overrides: dict[str, PodResourceUsage | None] = {}
285+
self.top_pods_call_count = 0
284286
self._log_watermarks: dict[str, int] = {} # pod_name -> bytes consumed
285287

286288
# Pods living outside the service's own namespace, keyed by
@@ -472,12 +474,21 @@ def close(self) -> None:
472474

473475
# -- Failure injection --
474476

475-
def inject_failure(self, operation: str, error: Exception) -> None:
476-
"""Inject a one-shot failure for the next call to *operation*."""
477-
self._injected_failures[operation] = error
477+
def inject_failure(self, operation: str, error: Exception, *, persistent: bool = False) -> None:
478+
"""Inject a failure for *operation*.
479+
480+
One-shot by default (consumed by the next call); ``persistent=True``
481+
raises on every call until cleared — needed for operations a background
482+
loop retries on its own cadence.
483+
"""
484+
if persistent:
485+
self._persistent_failures[operation] = error
486+
else:
487+
self._injected_failures[operation] = error
478488

479489
def clear_failure(self, operation: str) -> None:
480490
self._injected_failures.pop(operation, None)
491+
self._persistent_failures.pop(operation, None)
481492

482493
# -- Node pool management --
483494

@@ -610,7 +621,7 @@ def set_file_content(self, pod_name: str, path: str, data: bytes) -> None:
610621
self._file_contents[(pod_name, path)] = data
611622

612623
def set_top_pod(self, pod_name: str, result: PodResourceUsage | None) -> None:
613-
"""Configure a specific top_pod result for a pod."""
624+
"""Configure a pod's reported resource usage (None = metrics absent)."""
614625
self._top_pod_overrides[pod_name] = result
615626

616627
def seed_resource(self, resource: K8sResource, name: str, manifest: dict) -> None:
@@ -628,6 +639,8 @@ def seed_namespaced_pod(self, namespace: str, name: str, manifest: dict) -> None
628639
# -- Protocol methods --
629640

630641
def _check_failure(self, operation: str) -> None:
642+
if err := self._persistent_failures.get(operation):
643+
raise err
631644
if err := self._injected_failures.pop(operation, None):
632645
raise err
633646

@@ -852,13 +865,27 @@ def get_events(self, field_selector: str | None = None) -> list[dict]:
852865
results.append(event)
853866
return results
854867

855-
def top_pod(self, pod_name: str) -> PodResourceUsage | None:
856-
self._check_failure("top_pod")
857-
if pod_name in self._top_pod_overrides:
858-
return self._top_pod_overrides[pod_name]
859-
if any(name == pod_name for (_, name) in self._resources):
860-
return PodResourceUsage(cpu_millicores=100, memory_bytes=256 * 1024 * 1024)
861-
return None
868+
def top_pods(self, *, labels: dict[str, str] | None = None) -> dict[str, PodResourceUsage]:
869+
self._check_failure("top_pods")
870+
self.top_pods_call_count += 1
871+
plural = K8sResource.PODS.plural
872+
usage: dict[str, PodResourceUsage] = {}
873+
for (stored_plural, name), manifest in self._resources.items():
874+
if stored_plural != plural:
875+
continue
876+
if labels:
877+
res_labels = manifest.get("metadata", {}).get("labels", {})
878+
if not all(res_labels.get(k) == v for k, v in labels.items()):
879+
continue
880+
usage[name] = PodResourceUsage(cpu_millicores=100, memory_bytes=256 * 1024 * 1024)
881+
# Per-pod overrides win regardless of the label scope; a None override
882+
# means "metrics absent" and drops the pod from the result.
883+
for name, override in self._top_pod_overrides.items():
884+
if override is None:
885+
usage.pop(name, None)
886+
else:
887+
usage[name] = override
888+
return usage
862889

863890
def read_file(
864891
self,

lib/iris/src/iris/cluster/backends/k8s/service.py

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def get_events(
134134
field_selector: str | None = None,
135135
) -> list[dict]: ...
136136

137-
def top_pod(self, pod_name: str) -> PodResourceUsage | None: ...
137+
def top_pods(self, *, labels: dict[str, str] | None = None) -> dict[str, PodResourceUsage]: ...
138138

139139
def read_file(
140140
self,
@@ -684,39 +684,53 @@ def rm_files(self, pod_name: str, paths: list[str], *, container: str | None = N
684684
"""Remove files inside a Pod container. Ignores missing files."""
685685
self.exec(pod_name, ["rm", "-f", *paths], container=container, timeout=10)
686686

687-
# -- top_pod -------------------------------------------------------------
687+
# -- top_pods ------------------------------------------------------------
688688

689-
def top_pod(self, pod_name: str) -> PodResourceUsage | None:
690-
"""Get CPU/memory usage for a pod via metrics.k8s.io API."""
691-
logger.info("k8s: top_pod %s", pod_name)
692-
with slow_log(logger, f"top_pod {pod_name}", threshold_ms=_SLOW_THRESHOLD_MS):
689+
def top_pods(self, *, labels: dict[str, str] | None = None) -> dict[str, PodResourceUsage]:
690+
"""Bulk pod CPU/memory usage via a single metrics.k8s.io list call.
691+
692+
Lists ``PodMetrics`` for the namespace (optionally scoped by ``labels``)
693+
in one request and returns a ``pod_name -> PodResourceUsage`` map. One
694+
request covers every pod, so resource collection over N pods costs a
695+
single API round-trip instead of N per-pod GETs.
696+
697+
A 404 means the metrics API is unavailable (metrics-server absent);
698+
returns an empty map rather than raising so collection degrades quietly.
699+
"""
700+
logger.info("k8s: top_pods labels=%s", labels)
701+
kwargs = self._request_timeout_kwargs()
702+
if labels:
703+
kwargs["label_selector"] = _label_selector(labels)
704+
with slow_log(logger, "top_pods", threshold_ms=_SLOW_THRESHOLD_MS):
693705
try:
694-
result = self._custom.get_namespaced_custom_object(
706+
result = self._custom.list_namespaced_custom_object(
695707
group="metrics.k8s.io",
696708
version="v1beta1",
697709
namespace=self.namespace,
698710
plural="pods",
699-
name=pod_name,
700-
**self._request_timeout_kwargs(),
711+
**kwargs,
701712
)
702713
except ApiException as e:
703714
if e.status == 404:
704-
return None
715+
return {}
705716
raise
706717

707-
containers = result.get("containers", [])
708-
if not containers:
709-
return None
710-
711-
total_cpu = 0
712-
total_mem = 0
713-
for c in containers:
714-
usage = c.get("usage", {})
715-
if "cpu" in usage:
716-
total_cpu += parse_k8s_cpu(usage["cpu"])
717-
if "memory" in usage:
718-
total_mem += parse_k8s_quantity(usage["memory"])
719-
return PodResourceUsage(cpu_millicores=total_cpu, memory_bytes=total_mem)
718+
usage_by_pod: dict[str, PodResourceUsage] = {}
719+
for item in result.get("items", []):
720+
name = item.get("metadata", {}).get("name", "")
721+
containers = item.get("containers", [])
722+
if not name or not containers:
723+
continue
724+
total_cpu = 0
725+
total_mem = 0
726+
for c in containers:
727+
usage = c.get("usage", {})
728+
if "cpu" in usage:
729+
total_cpu += parse_k8s_cpu(usage["cpu"])
730+
if "memory" in usage:
731+
total_mem += parse_k8s_quantity(usage["memory"])
732+
usage_by_pod[name] = PodResourceUsage(cpu_millicores=total_cpu, memory_bytes=total_mem)
733+
return usage_by_pod
720734

721735
# -- port_forward (subprocess-based) -------------------------------------
722736

lib/iris/src/iris/cluster/backends/k8s/tasks.py

Lines changed: 62 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from __future__ import annotations
1111

1212
import base64
13-
import concurrent.futures
1413
import hashlib
1514
import json
1615
import logging
@@ -69,7 +68,7 @@
6968
wrap_with_kill_watchdog,
7069
)
7170
from iris.cluster.types import JobName, TaskAttempt, WorkerId, get_gpu_count
72-
from iris.cluster.worker.stats import build_task_stat
71+
from iris.cluster.worker.stats import IrisTaskStat, build_task_stat
7372
from iris.rpc import controller_pb2, job_pb2, worker_pb2
7473
from iris.time_proto import timestamp_to_proto
7574

@@ -1138,22 +1137,35 @@ class ResourceCollector:
11381137
"""Background resource usage collector that writes to ``iris.task`` stats.
11391138
11401139
Same set_pods() pattern as LogCollector: the sync loop declares the
1141-
authoritative set of running pods once per cycle. Each tick, the collector
1142-
fans out to ``kubectl top`` per pod and appends one ``IrisTaskStat`` row
1143-
per successful read to the supplied stats Table — the same table the
1144-
worker daemon writes to on the GCE/TPU path, so the dashboard's
1145-
``iris.task`` queries cover both runtimes uniformly.
1140+
authoritative set of running pods once per cycle. Each tick the collector
1141+
issues a single bulk metrics list (``kubectl top`` equivalent) scoped to the
1142+
managed-pod labels, then appends one ``IrisTaskStat`` row per tracked pod
1143+
that has a sample — to the same table the worker daemon writes to on the
1144+
GCE/TPU path, so the dashboard's ``iris.task`` queries cover both runtimes
1145+
uniformly. One API round-trip covers every pod, so cost is independent of
1146+
pod count and no per-pod thread fan-out is needed.
1147+
1148+
``poll_interval`` defaults to the metrics-server scrape resolution (15s);
1149+
polling faster only re-reads the same sample.
11461150
"""
11471151

1148-
def __init__(self, kubectl: K8sService, task_stats_table: Table, *, concurrency: int = 8):
1152+
def __init__(
1153+
self,
1154+
kubectl: K8sService,
1155+
task_stats_table: Table,
1156+
*,
1157+
labels: dict[str, str] | None = None,
1158+
poll_interval: float = 15.0,
1159+
):
11491160
self._kubectl = kubectl
11501161
self._table = task_stats_table
1162+
self._labels = labels
1163+
self._poll_interval = poll_interval
11511164
# (task_id_wire, attempt_id) -> pod_name. Tuple keys carry the
11521165
# identity needed to build IrisTaskStat without parsing strings.
11531166
self._pods: dict[tuple[str, int], str] = {}
11541167
self._lock = threading.Lock()
11551168
self._stop = threading.Event()
1156-
self._executor = ThreadPoolExecutor(max_workers=concurrency, thread_name_prefix="resource-collect")
11571169
self._thread = threading.Thread(target=self._run, daemon=True, name="resource-collector")
11581170
self._thread.start()
11591171

@@ -1164,47 +1176,47 @@ def set_pods(self, pods: dict[tuple[str, int], str]) -> None:
11641176

11651177
def _run(self) -> None:
11661178
while not self._stop.is_set():
1167-
with self._lock:
1168-
snapshot = list(self._pods.items())
1169-
if snapshot:
1170-
futures = [self._executor.submit(self._fetch_one, key, pod_name) for key, pod_name in snapshot]
1171-
for f in concurrent.futures.as_completed(futures):
1172-
try:
1173-
f.result()
1174-
except Exception:
1175-
pass
1176-
self._stop.wait(timeout=5.0)
1177-
1178-
def _fetch_one(self, key: tuple[str, int], pod_name: str) -> None:
1179+
self._collect_once()
1180+
self._stop.wait(timeout=self._poll_interval)
1181+
1182+
def _collect_once(self) -> None:
1183+
with self._lock:
1184+
snapshot = list(self._pods.items())
1185+
if not snapshot:
1186+
return
11791187
try:
1180-
top = self._kubectl.top_pod(pod_name)
1188+
usage_by_pod = self._kubectl.top_pods(labels=self._labels)
11811189
except Exception as e:
1182-
logger.debug("ResourceCollector: top_pod raised for pod %s: %s", pod_name, e)
1183-
return
1184-
if top is None:
1190+
logger.debug("ResourceCollector: top_pods raised: %s", e)
11851191
return
11861192

1187-
task_id_wire, attempt_id = key
1188-
usage = job_pb2.ResourceUsage(
1189-
cpu_millicores=top.cpu_millicores,
1190-
memory_mb=top.memory_bytes // (1024 * 1024),
1191-
)
1192-
stat = build_task_stat(
1193-
task_id=task_id_wire,
1194-
attempt_id=attempt_id,
1195-
# Pod name is the per-attempt platform identity on k8s, mirroring
1196-
# worker_id on the GCE/TPU path.
1197-
worker_id=pod_name,
1198-
usage=usage,
1199-
)
1193+
stats: list[IrisTaskStat] = []
1194+
for (task_id_wire, attempt_id), pod_name in snapshot:
1195+
top = usage_by_pod.get(pod_name)
1196+
if top is None:
1197+
continue
1198+
stats.append(
1199+
build_task_stat(
1200+
task_id=task_id_wire,
1201+
attempt_id=attempt_id,
1202+
# Pod name is the per-attempt platform identity on k8s,
1203+
# mirroring worker_id on the GCE/TPU path.
1204+
worker_id=pod_name,
1205+
usage=job_pb2.ResourceUsage(
1206+
cpu_millicores=top.cpu_millicores,
1207+
memory_mb=top.memory_bytes // (1024 * 1024),
1208+
),
1209+
)
1210+
)
1211+
if not stats:
1212+
return
12001213
try:
1201-
self._table.write([stat])
1214+
self._table.write(stats)
12021215
except Exception:
12031216
logger.debug("ResourceCollector: write to iris.task failed", exc_info=True)
12041217

12051218
def close(self) -> None:
12061219
self._stop.set()
1207-
self._executor.shutdown(wait=False)
12081220
self._thread.join(timeout=5)
12091221

12101222

@@ -1310,8 +1322,14 @@ class K8sTaskProvider:
13101322
# Pre-resolved iris.profile Table handle injected by the controller
13111323
# alongside task_stats_table. None in test mode.
13121324
profile_table: Table | None = None
1325+
# Log fetch fan-out: logs have no bulk API, so each pod is streamed on its
1326+
# own worker thread.
13131327
poll_concurrency: int = 32
13141328
log_poll_interval: float = 15.0
1329+
# Resource-usage poll cadence. Defaults to the metrics-server scrape
1330+
# resolution (15s) — sampling faster only re-reads the same value. One bulk
1331+
# metrics list per tick covers every managed pod (see ResourceCollector).
1332+
resource_poll_interval: float = 15.0
13151333
# Cluster-wide kubectl scans (pod list, stray-pod GC, pod poll, node refresh)
13161334
# are coarse-grained: the controller ticks reconcile at poll_interval (1s),
13171335
# but these LISTs run at most once per cluster_scan_interval to bound kubectl
@@ -1335,7 +1353,10 @@ def _ensure_resource_collector(self) -> ResourceCollector | None:
13351353
return None
13361354
if self._resource_collector is None:
13371355
self._resource_collector = ResourceCollector(
1338-
self.kubectl, self.task_stats_table, concurrency=self.poll_concurrency
1356+
self.kubectl,
1357+
self.task_stats_table,
1358+
labels=_MANAGED_POD_LABELS,
1359+
poll_interval=self.resource_poll_interval,
13391360
)
13401361
return self._resource_collector
13411362

lib/iris/tests/cluster/backends/k8s/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def provider(k8s, log_client, task_stats_table):
5454
log_client=log_client,
5555
task_stats_table=task_stats_table,
5656
log_poll_interval=1.0,
57+
resource_poll_interval=0.5,
5758
cluster_scan_interval=0.0,
5859
)
5960
yield p

0 commit comments

Comments
 (0)