4141from iris .cluster .types import JobName , TaskAttempt
4242from iris .cluster .worker .stats import IrisTaskStat
4343from iris .rpc import job_pb2
44+ from iris .test_util import wait_for_condition
45+ from rigging .timing import Duration
4446
4547from .conftest import make_batch , make_kueue_provider , make_run_req , populate_node , populate_pod
4648
@@ -532,86 +534,53 @@ def test_sync_survives_node_list_failure(provider, k8s):
532534# ---------------------------------------------------------------------------
533535
534536
535- def _collect_resources_once (provider ) -> None :
536- """Drive one synchronous resource-collection pass.
537+ def test_resource_stats_only_for_running_tasks (provider , k8s , task_stats_table ):
538+ """reconcile registers running pods (not terminal ones) so the background
539+ collector emits IrisTaskStat rows only for running tasks."""
540+ running = RunningTaskEntry (task_id = JobName .from_wire ("/job/run" ), attempt_id = 0 )
541+ terminal = RunningTaskEntry (task_id = JobName .from_wire ("/job/done" ), attempt_id = 0 )
542+ running_pod = _pod_name (running .task_id , running .attempt_id )
543+ terminal_pod = _pod_name (terminal .task_id , terminal .attempt_id )
537544
538- reconcile() registers the running-pod set with the background collector;
539- this runs a single collection against that set without waiting on (or
540- racing) the collector's poll thread.
541- """
542- assert provider ._resource_collector is not None , "reconcile should have started the collector"
543- provider ._resource_collector ._collect_once ()
544-
545-
546- def test_resource_stats_from_kubectl_top (provider , k8s , task_stats_table ):
547- """Running pods emit IrisTaskStat rows via the ResourceCollector."""
548-
549- task_id = JobName .from_wire ("/job/0" )
550- attempt_id = 0
551- pod_name = _pod_name (task_id , attempt_id )
552- entry = RunningTaskEntry (task_id = task_id , attempt_id = attempt_id )
553-
554- populate_pod (k8s , pod_name , "Running" )
555- k8s .set_top_pod (pod_name , PodResourceUsage (cpu_millicores = 500 , memory_bytes = 1024 * 1024 * 1024 ))
545+ populate_pod (k8s , running_pod , "Running" )
546+ populate_pod (k8s , terminal_pod , "Succeeded" )
547+ k8s .set_top_pod (running_pod , PodResourceUsage (cpu_millicores = 500 , memory_bytes = 1024 * 1024 * 1024 ))
548+ k8s .set_top_pod (terminal_pod , PodResourceUsage (cpu_millicores = 999 , memory_bytes = 1024 ))
556549
557- # reconcile registers the pod; then collect once.
558- provider .reconcile (make_batch (running_tasks = [entry ]))
559- _collect_resources_once (provider )
550+ provider .reconcile (make_batch (running_tasks = [running , terminal ]))
551+ # The collector samples all tracked pods in one pass, so once the running
552+ # pod's row lands a full cycle has run — the terminal pod's absence is real.
553+ wait_for_condition (lambda : bool (task_stats_table .writes ), timeout = Duration .from_seconds (5.0 ))
560554
561- rows = [row for batch_rows in task_stats_table .writes for row in batch_rows ]
562- assert rows , "ResourceCollector did not write any IrisTaskStat rows"
555+ rows = [row for batch_rows in list (task_stats_table .writes ) for row in batch_rows ]
563556 assert all (isinstance (r , IrisTaskStat ) for r in rows )
564- latest = rows [- 1 ]
565- assert latest .task_id == task_id .to_wire ()
566- assert latest .attempt_id == attempt_id
567- assert latest .worker_id == pod_name
568- assert latest .cpu_millicores == 500
569- assert latest .memory_mb == 1024
557+ assert {r .worker_id for r in rows } == {running_pod }, "only the running pod should be sampled"
558+ row = next (r for r in rows if r .worker_id == running_pod )
559+ assert row .task_id == running .task_id .to_wire ()
560+ assert row .cpu_millicores == 500
561+ assert row .memory_mb == 1024
570562
571563
572- def test_resource_stats_skipped_when_metrics_unavailable (provider , k8s , task_stats_table ):
573- """No IrisTaskStat row is written when a pod has no metrics sample."""
574- task_id = JobName .from_wire ("/job/0" )
575- attempt_id = 0
576- pod_name = _pod_name (task_id , attempt_id )
577- entry = RunningTaskEntry (task_id = task_id , attempt_id = attempt_id )
564+ def test_resource_collector_skips_pod_without_metrics_sample (k8s , task_stats_table ):
565+ """A tracked pod with no metrics sample produces no row."""
566+ k8s .set_top_pod ("pod-a" , None )
578567
579- populate_pod (k8s , pod_name , "Running" )
580- k8s .set_top_pod (pod_name , None )
581-
582- provider .reconcile (make_batch (running_tasks = [entry ]))
583- _collect_resources_once (provider )
568+ collector = ResourceCollector (k8s , task_stats_table , poll_interval = 60.0 )
569+ collector .close () # stop the background loop; drive one collection synchronously
570+ collector .set_pods ({("/job/0" , 0 ): "pod-a" })
571+ collector .collect_once ()
584572
585573 assert task_stats_table .writes == []
586574
587575
588- def test_resource_stats_skipped_when_top_pods_raises (provider , k8s , task_stats_table ):
589- """A raising bulk metrics query is swallowed; no IrisTaskStat row is written."""
590- task_id = JobName .from_wire ("/job/0" )
591- attempt_id = 0
592- pod_name = _pod_name (task_id , attempt_id )
593- entry = RunningTaskEntry (task_id = task_id , attempt_id = attempt_id )
594-
595- populate_pod (k8s , pod_name , "Running" )
576+ def test_resource_collector_swallows_metrics_query_failure (k8s , task_stats_table ):
577+ """A raising bulk metrics query is swallowed; no row is written."""
596578 k8s .inject_persistent_failure ("top_pods" , RuntimeError ("metrics-server unavailable" ))
597579
598- provider .reconcile (make_batch (running_tasks = [entry ]))
599- _collect_resources_once (provider )
600-
601- assert task_stats_table .writes == []
602-
603-
604- def test_resource_stats_skipped_for_non_running_pods (provider , k8s , task_stats_table ):
605- """Terminal pods are not registered with the resource collector, so no rows land."""
606- task_id = JobName .from_wire ("/job/0" )
607- attempt_id = 0
608- pod_name = _pod_name (task_id , attempt_id )
609- entry = RunningTaskEntry (task_id = task_id , attempt_id = attempt_id )
610-
611- populate_pod (k8s , pod_name , "Succeeded" )
612-
613- provider .reconcile (make_batch (running_tasks = [entry ]))
614- _collect_resources_once (provider )
580+ collector = ResourceCollector (k8s , task_stats_table , poll_interval = 60.0 )
581+ collector .close ()
582+ collector .set_pods ({("/job/0" , 0 ): "pod-a" })
583+ collector .collect_once ()
615584
616585 assert task_stats_table .writes == []
617586
@@ -1140,21 +1109,21 @@ def test_log_collector_set_pods_preserves_cursor_state(k8s, log_client):
11401109
11411110
11421111def test_resource_collector_set_pods_replaces_active_set (k8s , task_stats_table ):
1143- """set_pods() replaces the tracked pod set wholesale."""
1112+ """set_pods() replaces the tracked pod set wholesale: a pod dropped from the
1113+ set stops being sampled on the next collection."""
1114+ k8s .set_top_pod ("pod-a" , PodResourceUsage (cpu_millicores = 100 , memory_bytes = 128 * 1024 * 1024 ))
1115+ k8s .set_top_pod ("pod-b" , PodResourceUsage (cpu_millicores = 100 , memory_bytes = 128 * 1024 * 1024 ))
11441116
11451117 collector = ResourceCollector (k8s , task_stats_table , poll_interval = 60.0 )
1146- key_a = ("/job/0" , 0 )
1147- key_b = ("/job/1" , 0 )
1118+ collector .close () # stop the background loop; drive collections synchronously
11481119
1149- collector .set_pods ({key_a : "pod-a" , key_b : "pod-b" })
1150- with collector ._lock :
1151- assert collector . _pods == {key_a : "pod-a" , key_b : "pod-b" }
1120+ collector .set_pods ({( "/job/0" , 0 ) : "pod-a" , ( "/job/1" , 0 ) : "pod-b" })
1121+ collector .collect_once ()
1122+ assert { r . worker_id for r in task_stats_table . writes [ - 1 ]} == {"pod-a" , "pod-b" }
11521123
1153- collector .set_pods ({key_b : "pod-b" })
1154- with collector ._lock :
1155- assert collector ._pods == {key_b : "pod-b" }
1156-
1157- collector .close ()
1124+ collector .set_pods ({("/job/1" , 0 ): "pod-b" })
1125+ collector .collect_once ()
1126+ assert {r .worker_id for r in task_stats_table .writes [- 1 ]} == {"pod-b" }
11581127
11591128
11601129def test_resource_collector_writes_iris_task_rows (k8s , task_stats_table ):
@@ -1166,7 +1135,7 @@ def test_resource_collector_writes_iris_task_rows(k8s, task_stats_table):
11661135 # Stop the background loop so we drive a single collection deterministically.
11671136 collector .close ()
11681137 collector .set_pods ({("/job/0" , 3 ): "pod-a" })
1169- collector ._collect_once ()
1138+ collector .collect_once ()
11701139
11711140 rows = [row for batch_rows in task_stats_table .writes for row in batch_rows ]
11721141 assert rows , "no rows emitted"
@@ -1194,7 +1163,7 @@ def test_resource_collector_uses_one_bulk_query_for_many_pods(k8s, task_stats_ta
11941163 collector .close ()
11951164 collector .set_pods (pods )
11961165 calls_before = k8s .top_pods_call_count
1197- collector ._collect_once ()
1166+ collector .collect_once ()
11981167
11991168 assert k8s .top_pods_call_count - calls_before == 1 , "expected exactly one bulk metrics query"
12001169 assert len (task_stats_table .writes ) == 1 , "expected a single batched write"
0 commit comments