Skip to content

Commit ffa2acb

Browse files
committed
[iris] k8s: address lint review on resource collector
Drive resource-collection tests synchronously via a single _collect_once() pass instead of sleeping on the background poll thread; raise the test collector's poll interval so the thread never races the assertions. Split the fake's persistent-failure injection into inject_persistent_failure() rather than a boolean flag, and trim implementation narration from the top_pods / ResourceCollector docstrings.
1 parent: 81ed3a5 · commit: ffa2acb

5 files changed

Lines changed: 48 additions & 48 deletions

File tree

lib/iris/src/iris/cluster/backends/k8s/fake.py

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -474,17 +474,17 @@ def close(self) -> None:
474474

475475
# -- Failure injection --
476476

477-
def inject_failure(self, operation: str, error: Exception, *, persistent: bool = False) -> None:
478-
"""Inject a failure for *operation*.
477+
def inject_failure(self, operation: str, error: Exception) -> None:
478+
"""Inject a one-shot failure consumed by the next call to *operation*."""
479+
self._injected_failures[operation] = error
479480

480-
One-shot by default (consumed by the next call); ``persistent=True``
481-
raises on every call until cleared — needed for operations a background
482-
loop retries on its own cadence.
481+
def inject_persistent_failure(self, operation: str, error: Exception) -> None:
482+
"""Fail every call to *operation* until cleared.
483+
484+
Needed for operations a background loop retries on its own cadence,
485+
where a one-shot failure would be consumed by the first poll.
483486
"""
484-
if persistent:
485-
self._persistent_failures[operation] = error
486-
else:
487-
self._injected_failures[operation] = error
487+
self._persistent_failures[operation] = error
488488

489489
def clear_failure(self, operation: str) -> None:
490490
self._injected_failures.pop(operation, None)

lib/iris/src/iris/cluster/backends/k8s/service.py

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -687,15 +687,12 @@ def rm_files(self, pod_name: str, paths: list[str], *, container: str | None = N
687687
# -- top_pods ------------------------------------------------------------
688688

689689
def top_pods(self, *, labels: dict[str, str] | None = None) -> dict[str, PodResourceUsage]:
690-
"""Bulk pod CPU/memory usage via a single metrics.k8s.io list call.
690+
"""Return CPU/memory usage for every pod, keyed by pod name.
691691
692692
Lists ``PodMetrics`` for the namespace (optionally scoped by ``labels``)
693-
in one request and returns a ``pod_name -> PodResourceUsage`` map. One
694-
request covers every pod, so resource collection over N pods costs a
695-
single API round-trip instead of N per-pod GETs.
696-
697-
A 404 means the metrics API is unavailable (metrics-server absent);
698-
returns an empty map rather than raising so collection degrades quietly.
693+
via the metrics.k8s.io list endpoint. A 404 means the metrics API is
694+
unavailable (metrics-server absent); returns an empty map rather than
695+
raising so callers degrade quietly.
699696
"""
700697
logger.info("k8s: top_pods labels=%s", labels)
701698
kwargs = self._request_timeout_kwargs()

lib/iris/src/iris/cluster/backends/k8s/tasks.py

Lines changed: 8 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -1134,16 +1134,14 @@ def close(self) -> None:
11341134

11351135

11361136
class ResourceCollector:
1137-
"""Background resource usage collector that writes to ``iris.task`` stats.
1138-
1139-
Same set_pods() pattern as LogCollector: the sync loop declares the
1140-
authoritative set of running pods once per cycle. Each tick the collector
1141-
issues a single bulk metrics list (``kubectl top`` equivalent) scoped to the
1142-
managed-pod labels, then appends one ``IrisTaskStat`` row per tracked pod
1143-
that has a sample — to the same table the worker daemon writes to on the
1144-
GCE/TPU path, so the dashboard's ``iris.task`` queries cover both runtimes
1145-
uniformly. One API round-trip covers every pod, so cost is independent of
1146-
pod count and no per-pod thread fan-out is needed.
1137+
"""Background thread that samples running pods' CPU/memory usage.
1138+
1139+
The reconcile loop declares the authoritative set of running pods via
1140+
``set_pods()`` once per cycle. Each ``poll_interval`` the collector samples
1141+
those pods via one bulk metrics query and appends an ``IrisTaskStat`` row
1142+
per pod to the ``iris.task`` table — the same table the worker daemon writes
1143+
to on the GCE/TPU path, so the dashboard's ``iris.task`` queries cover both
1144+
runtimes uniformly.
11471145
11481146
``poll_interval`` defaults to the metrics-server scrape resolution (15s);
11491147
polling faster only re-reads the same sample.

lib/iris/tests/cluster/backends/k8s/conftest.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,9 @@ def provider(k8s, log_client, task_stats_table):
5454
log_client=log_client,
5555
task_stats_table=task_stats_table,
5656
log_poll_interval=1.0,
57-
resource_poll_interval=0.5,
57+
# Long enough that the background collector never fires mid-test; resource
58+
# tests drive a collection pass synchronously instead of racing the thread.
59+
resource_poll_interval=3600.0,
5860
cluster_scan_interval=0.0,
5961
)
6062
yield p

lib/iris/tests/cluster/backends/k8s/test_provider.py

Lines changed: 24 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -532,8 +532,19 @@ def test_sync_survives_node_list_failure(provider, k8s):
532532
# ---------------------------------------------------------------------------
533533

534534

535+
def _collect_resources_once(provider) -> None:
536+
"""Drive one synchronous resource-collection pass.
537+
538+
reconcile() registers the running-pod set with the background collector;
539+
this runs a single collection against that set without waiting on (or
540+
racing) the collector's poll thread.
541+
"""
542+
assert provider._resource_collector is not None, "reconcile should have started the collector"
543+
provider._resource_collector._collect_once()
544+
545+
535546
def test_resource_stats_from_kubectl_top(provider, k8s, task_stats_table):
536-
"""Running pods emit IrisTaskStat rows via the background ResourceCollector."""
547+
"""Running pods emit IrisTaskStat rows via the ResourceCollector."""
537548

538549
task_id = JobName.from_wire("/job/0")
539550
attempt_id = 0
@@ -543,12 +554,9 @@ def test_resource_stats_from_kubectl_top(provider, k8s, task_stats_table):
543554
populate_pod(k8s, pod_name, "Running")
544555
k8s.set_top_pod(pod_name, PodResourceUsage(cpu_millicores=500, memory_bytes=1024 * 1024 * 1024))
545556

546-
batch = make_batch(running_tasks=[entry])
547-
# First sync registers the pod with the ResourceCollector.
548-
provider.reconcile(batch)
549-
# Wait for background collector to fetch and write.
550-
time.sleep(2)
551-
# No more sync needed — the row has already been written to the table.
557+
# reconcile registers the pod; then collect once.
558+
provider.reconcile(make_batch(running_tasks=[entry]))
559+
_collect_resources_once(provider)
552560

553561
rows = [row for batch_rows in task_stats_table.writes for row in batch_rows]
554562
assert rows, "ResourceCollector did not write any IrisTaskStat rows"
@@ -562,7 +570,7 @@ def test_resource_stats_from_kubectl_top(provider, k8s, task_stats_table):
562570

563571

564572
def test_resource_stats_skipped_when_metrics_unavailable(provider, k8s, task_stats_table):
565-
"""No IrisTaskStat row is written when kubectl top returns None."""
573+
"""No IrisTaskStat row is written when a pod has no metrics sample."""
566574
task_id = JobName.from_wire("/job/0")
567575
attempt_id = 0
568576
pod_name = _pod_name(task_id, attempt_id)
@@ -571,28 +579,24 @@ def test_resource_stats_skipped_when_metrics_unavailable(provider, k8s, task_sta
571579
populate_pod(k8s, pod_name, "Running")
572580
k8s.set_top_pod(pod_name, None)
573581

574-
batch = make_batch(running_tasks=[entry])
575-
provider.reconcile(batch)
576-
time.sleep(2)
582+
provider.reconcile(make_batch(running_tasks=[entry]))
583+
_collect_resources_once(provider)
577584

578585
assert task_stats_table.writes == []
579586

580587

581588
def test_resource_stats_skipped_when_top_pods_raises(provider, k8s, task_stats_table):
582-
"""No IrisTaskStat row is written when the bulk metrics query raises."""
589+
"""A raising bulk metrics query is swallowed; no IrisTaskStat row is written."""
583590
task_id = JobName.from_wire("/job/0")
584591
attempt_id = 0
585592
pod_name = _pod_name(task_id, attempt_id)
586593
entry = RunningTaskEntry(task_id=task_id, attempt_id=attempt_id)
587594

588595
populate_pod(k8s, pod_name, "Running")
589-
# Persistent: the background collector retries on its own cadence, so a
590-
# one-shot failure would be consumed and later polls would succeed.
591-
k8s.inject_failure("top_pods", RuntimeError("metrics-server unavailable"), persistent=True)
596+
k8s.inject_persistent_failure("top_pods", RuntimeError("metrics-server unavailable"))
592597

593-
batch = make_batch(running_tasks=[entry])
594-
provider.reconcile(batch)
595-
time.sleep(2)
598+
provider.reconcile(make_batch(running_tasks=[entry]))
599+
_collect_resources_once(provider)
596600

597601
assert task_stats_table.writes == []
598602

@@ -606,9 +610,8 @@ def test_resource_stats_skipped_for_non_running_pods(provider, k8s, task_stats_t
606610

607611
populate_pod(k8s, pod_name, "Succeeded")
608612

609-
batch = make_batch(running_tasks=[entry])
610-
provider.reconcile(batch)
611-
time.sleep(2)
613+
provider.reconcile(make_batch(running_tasks=[entry]))
614+
_collect_resources_once(provider)
612615

613616
assert task_stats_table.writes == []
614617

0 commit comments

Comments
 (0)