@@ -852,11 +852,6 @@ def to_status_response(self, namespace: str) -> controller_pb2.Controller.GetKub
852852 )
853853
854854
855- # After this many consecutive fetch failures, the LogCollector/ResourceCollector
856- # will auto-untrack a pod (it's likely been deleted).
857- _COLLECTOR_MAX_CONSECUTIVE_FAILURES = 5
858-
859-
860855@dataclass
861856class _LogPod :
862857 """A pod tracked by LogCollector for incremental log fetching."""
@@ -872,14 +867,23 @@ class LogCollector:
872867 """Background log fetcher that pushes entries to the LogService.
873868
874869 Runs on its own daemon thread with a bounded ThreadPoolExecutor.
875- The sync loop calls track()/untrack() to manage the set of pods;
876- the collector independently fetches logs and pushes them via the
877- LogPusher without blocking the scheduling path.
870+ The sync loop calls set_pods() once per cycle with the authoritative
871+ set of pods to track. The collector diffs against its internal state,
872+ does a final fetch for removed pods, and starts tracking new ones.
873+ This avoids drift between what the sync loop thinks is tracked and
874+ what the collector is actually polling.
878875 """
879876
880- def __init__ (self , kubectl : K8sService , log_pusher : LogPusherProtocol , concurrency : int = 8 ):
877+ def __init__ (
878+ self ,
879+ kubectl : K8sService ,
880+ log_pusher : LogPusherProtocol ,
881+ concurrency : int = 8 ,
882+ poll_interval : float = 15.0 ,
883+ ):
881884 self ._kubectl = kubectl
882885 self ._log_pusher = log_pusher
886+ self ._poll_interval = poll_interval
883887 self ._pods : dict [str , _LogPod ] = {}
884888 self ._lock = threading .Lock ()
885889 self ._pod_locks : dict [str , threading .Lock ] = {}
@@ -888,23 +892,29 @@ def __init__(self, kubectl: K8sService, log_pusher: LogPusherProtocol, concurren
888892 self ._thread = threading .Thread (target = self ._run , daemon = True , name = "log-collector" )
889893 self ._thread .start ()
890894
def set_pods(self, pods: "dict[str, _LogPod]") -> None:
    """Declare the authoritative set of pods to collect logs for.

    Keys not yet tracked are added with a fresh per-pod lock. Keys absent
    from `pods` are dropped after one last synchronous incremental log
    fetch. Keys already tracked keep their existing _LogPod entry (and
    therefore their cursor state).
    """
    final_fetches = []  # (pod, pod_lock) pairs for pods that just dropped out
    with self._lock:
        # `keys() - keys()` builds a fresh set, so popping while iterating is safe.
        for stale in self._pods.keys() - pods.keys():
            final_fetches.append((self._pods.pop(stale), self._pod_locks.pop(stale, None)))
        # Add newcomers only; never overwrite an existing entry.
        for key, pod in pods.items():
            if key in self._pods:
                continue
            self._pods[key] = pod
            self._pod_locks[key] = threading.Lock()

    # One last incremental fetch per removed pod, done outside self._lock
    # so slow log I/O cannot block other track/untrack decisions.
    for pod, pod_lock in final_fetches:
        if pod_lock is None:
            continue
        with pod_lock:
            self._fetch_and_store(pod)
908918
909919 def _run (self ) -> None :
910920 while not self ._stop .is_set ():
@@ -914,33 +924,19 @@ def _run(self) -> None:
914924 with self ._lock :
915925 pod_lock = self ._pod_locks .get (key )
916926 if pod_lock is not None :
917- self ._executor .submit (self ._guarded_fetch , pod , pod_lock )
918- self ._stop .wait (timeout = 2.0 )
927+ self ._executor .submit (self ._guarded_fetch , key , pod , pod_lock )
928+ self ._stop .wait (timeout = self . _poll_interval )
919929
def _guarded_fetch(self, key: str, pod: "_LogPod", pod_lock: threading.Lock) -> None:
    """Run one incremental fetch for `pod`, skipping if a fetch is in flight.

    The non-blocking acquire means overlapping poll cycles drop a pod that
    is still being fetched instead of queueing behind it. `key` is currently
    unused here; it is kept to match the executor submit() call site in _run.
    """
    acquired = pod_lock.acquire(blocking=False)
    if acquired:
        try:
            self._fetch_and_store(pod)
        finally:
            pod_lock.release()
937937
938938 def _fetch_and_store (self , pod : _LogPod ) -> bool :
939- """Fetch logs since last timestamp and advance. Must be called under pod lock.
940-
941- Returns True on success, False on failure. Consecutive failures are tracked
942- so the collector can stop polling deleted pods.
943- """
939+ """Fetch logs since last timestamp and advance. Must be called under pod lock."""
944940 try :
945941 result = self ._kubectl .stream_logs (pod .pod_name , container = "task" , since_time = pod .last_timestamp )
946942 if result .lines :
@@ -952,13 +948,8 @@ def _fetch_and_store(self, pod: _LogPod) -> bool:
952948 return True
953949 except Exception as e :
954950 pod .consecutive_failures += 1
955- logger .warning (
956- "LogCollector: fetch failed for pod %s (%d/%d): %s" ,
957- pod .pod_name ,
958- pod .consecutive_failures ,
959- _COLLECTOR_MAX_CONSECUTIVE_FAILURES ,
960- e ,
961- )
951+ if pod .consecutive_failures <= 1 :
952+ logger .warning ("LogCollector: fetch failed for pod %s: %s" , pod .pod_name , e )
962953 return False
963954
964955 def close (self ) -> None :
@@ -968,35 +959,35 @@ def close(self) -> None:
968959
969960
970961class ResourceCollector :
971- """Background resource usage collector that writes to TaskUpdate-compatible storage .
962+ """Background resource usage collector.
972963
973- Same pattern as LogCollector: runs on its own daemon thread with a bounded
974- ThreadPoolExecutor. Calls kubectl top pod for each tracked pod and stores
975- the latest reading so the sync loop can pick it up without blocking.
964+ Same set_pods() pattern as LogCollector: the sync loop declares the
965+ authoritative set of running pods once per cycle, and the collector
966+ diffs internally. Pods removed from the set have their cached results
967+ cleaned up immediately.
976968 """
977969
def __init__(self, kubectl: "K8sService", concurrency: int = 8):
    """Start the collector: empty tracking state, worker pool, poll thread.

    `concurrency` bounds how many per-pod fetches run in parallel.
    """
    self._kubectl = kubectl
    # cursor_key -> pod_name currently being polled.
    self._pods: "dict[str, str]" = {}
    # cursor_key -> latest ResourceUsage reading.
    self._results: "dict[str, job_pb2.ResourceUsage]" = {}
    self._lock = threading.Lock()
    self._stop = threading.Event()
    self._executor = ThreadPoolExecutor(max_workers=concurrency, thread_name_prefix="resource-collect")
    # Daemon thread so an unclosed collector never blocks interpreter exit.
    self._thread = threading.Thread(target=self._run, daemon=True, name="resource-collector")
    self._thread.start()
988979
def set_pods(self, pods: dict[str, str]) -> None:
    """Declare the authoritative set of pods to collect resources for.

    `pods` maps cursor_key -> pod_name. The tracked mapping is replaced
    wholesale; cached readings for keys no longer present are dropped.
    """
    with self._lock:
        # Evict stale readings before swapping in the new tracking set.
        for gone in self._pods.keys() - pods.keys():
            self._results.pop(gone, None)
        # Copy rather than alias the caller's mapping.
        self._pods = dict(pods)
1000991
1001992 def get (self , task_id : JobName , attempt_id : int ) -> job_pb2 .ResourceUsage | None :
1002993 """Return the latest resource reading for a pod (non-blocking)."""
@@ -1031,24 +1022,9 @@ def _fetch_one(self, key: str, pod_name: str) -> None:
10311022 memory_mb = mem_bytes // (1024 * 1024 ),
10321023 )
10331024 with self ._lock :
1034- self ._results [key ] = usage
1035- self ._consecutive_failures [key ] = 0
1036- return
1037-
1038- # top_pod returned None (metrics unavailable) or raised — treat as a miss.
1039- with self ._lock :
1040- count = self ._consecutive_failures .get (key , 0 ) + 1
1041- self ._consecutive_failures [key ] = count
1042- if count >= _COLLECTOR_MAX_CONSECUTIVE_FAILURES :
1043- logger .info (
1044- "ResourceCollector: auto-untracking pod %s after %d consecutive misses" ,
1045- pod_name ,
1046- count ,
1047- )
1048- with self ._lock :
1049- self ._pods .pop (key , None )
1050- self ._results .pop (key , None )
1051- self ._consecutive_failures .pop (key , None )
1025+ # Only store if the key is still tracked (may have been removed by set_pods).
1026+ if key in self ._pods :
1027+ self ._results [key ] = usage
10521028
10531029 def close (self ) -> None :
10541030 self ._stop .set ()
@@ -1086,6 +1062,7 @@ class K8sTaskProvider:
10861062 task_env : dict [str , str ] = field (default_factory = dict )
10871063 log_pusher : LogPusherProtocol | None = None
10881064 poll_concurrency : int = 32
1065+ log_poll_interval : float = 15.0
10891066 _pod_not_found_counts : dict [str , int ] = field (default_factory = dict , init = False , repr = False )
10901067 _log_collector : LogCollector | None = field (default = None , init = False , repr = False )
10911068 _resource_collector : ResourceCollector | None = field (default = None , init = False , repr = False )
@@ -1100,24 +1077,11 @@ def _ensure_resource_collector(self) -> ResourceCollector:
11001077
def _ensure_log_collector(self) -> "LogCollector | None":
    """Lazily create the shared LogCollector.

    Returns None when no log_pusher is configured (log collection disabled);
    otherwise creates the collector once and returns it on every call.
    """
    if self._log_collector is not None:
        return self._log_collector
    if self.log_pusher is None:
        return None
    self._log_collector = LogCollector(
        self.kubectl,
        self.log_pusher,
        concurrency=self.poll_concurrency,
        poll_interval=self.log_poll_interval,
    )
    return self._log_collector
11051084
1106- def _track_pod (self , pod_name : str , task_id : JobName , attempt_id : int , phase : str ) -> None :
1107- """Register a pod with background collectors, creating them lazily."""
1108- log_collector = self ._ensure_log_collector ()
1109- if log_collector is not None :
1110- log_collector .track (pod_name , task_id , attempt_id )
1111- if phase == "Running" :
1112- self ._ensure_resource_collector ().track (pod_name , task_id , attempt_id )
1113-
1114- def _untrack_pod (self , task_id : JobName , attempt_id : int ) -> None :
1115- """Remove a pod from all background collectors."""
1116- if self ._log_collector is not None :
1117- self ._log_collector .untrack (task_id , attempt_id )
1118- if self ._resource_collector is not None :
1119- self ._resource_collector .untrack (task_id , attempt_id )
1120-
11211085 def sync (self , batch : DirectProviderBatch ) -> DirectProviderSyncResult :
11221086 """Sync task state: apply new pods, delete killed pods, poll running pods."""
11231087 apply_failures : list [TaskUpdate ] = []
@@ -1511,15 +1475,27 @@ def _poll_pods(self, running: list[RunningTaskEntry], cached_pods: list[dict]) -
15111475 period-to-FAILED path for legitimately Succeeded pods.
15121476
15131477 Log fetching and resource usage collection are handled by background
1514- LogCollector and ResourceCollector threads. This method only reads pod
1515- phase and the latest cached resource snapshot.
1478+ LogCollector and ResourceCollector threads. After building updates,
1479+ this method calls set_pods() on each collector with the authoritative
1480+ set of non-terminal pods, so the collectors can never drift.
15161481 """
15171482 if not running :
1483+ # No running tasks — clear all collectors.
1484+ log_collector = self ._ensure_log_collector ()
1485+ if log_collector is not None :
1486+ log_collector .set_pods ({})
1487+ if self ._resource_collector is not None :
1488+ self ._resource_collector .set_pods ({})
15181489 return []
15191490
15201491 pods_by_name : dict [str , dict ] = {pod .get ("metadata" , {}).get ("name" , "" ): pod for pod in cached_pods }
15211492 updates : list [TaskUpdate ] = []
15221493
1494+ # Build up the authoritative pod sets for collectors.
1495+ log_pods : dict [str , _LogPod ] = {}
1496+ resource_pods : dict [str , str ] = {} # cursor_key -> pod_name
1497+ terminal_log_pods : dict [str , _LogPod ] = {} # pods that completed this cycle
1498+
15231499 for entry in running :
15241500 pod_name = _pod_name (entry .task_id , entry .attempt_id )
15251501 cursor_key = f"{ entry .task_id .to_wire ()} :{ entry .attempt_id } "
@@ -1543,7 +1519,6 @@ def _poll_pods(self, running: list[RunningTaskEntry], cached_pods: list[dict]) -
15431519 continue
15441520 # Grace exhausted — pod is truly gone.
15451521 self ._pod_not_found_counts .pop (cursor_key , None )
1546- self ._untrack_pod (entry .task_id , entry .attempt_id )
15471522 updates .append (
15481523 TaskUpdate (
15491524 task_id = entry .task_id ,
@@ -1558,16 +1533,20 @@ def _poll_pods(self, running: list[RunningTaskEntry], cached_pods: list[dict]) -
15581533 update = _task_update_from_pod (entry , pod )
15591534 phase = pod .get ("status" , {}).get ("phase" , "" )
15601535
1561- self ._track_pod (pod_name , entry .task_id , entry .attempt_id , phase )
1536+ if phase not in ("Succeeded" , "Failed" ):
1537+ log_pods [cursor_key ] = _LogPod (pod_name = pod_name , task_id = entry .task_id , attempt_id = entry .attempt_id )
1538+ if phase == "Running" :
1539+ resource_pods [cursor_key ] = pod_name
1540+ else :
1541+ terminal_log_pods [cursor_key ] = _LogPod (
1542+ pod_name = pod_name , task_id = entry .task_id , attempt_id = entry .attempt_id
1543+ )
15621544
15631545 # Read latest cached resource usage (non-blocking).
15641546 resource_usage = None
15651547 if self ._resource_collector is not None :
15661548 resource_usage = self ._resource_collector .get (entry .task_id , entry .attempt_id )
15671549
1568- if phase in ("Succeeded" , "Failed" ):
1569- self ._untrack_pod (entry .task_id , entry .attempt_id )
1570-
15711550 updates .append (
15721551 TaskUpdate (
15731552 task_id = update .task_id ,
@@ -1579,6 +1558,18 @@ def _poll_pods(self, running: list[RunningTaskEntry], cached_pods: list[dict]) -
15791558 )
15801559 )
15811560
1561+ # Sync collectors with the authoritative pod sets.
1562+ # set_pods() does a final log fetch for pods that drop out of the set.
1563+ # For pods that completed this cycle, we include them first so they're
1564+ # added (if not already tracked), then call set_pods again without them
1565+ # to trigger the final fetch on removal.
1566+ log_collector = self ._ensure_log_collector ()
1567+ if log_collector is not None :
1568+ if terminal_log_pods :
1569+ log_collector .set_pods ({** log_pods , ** terminal_log_pods })
1570+ log_collector .set_pods (log_pods )
1571+ self ._ensure_resource_collector ().set_pods (resource_pods )
1572+
15821573 return updates
15831574
15841575 def _fetch_scheduling_events (self , cached_pods : list [dict ]) -> list [SchedulingEvent ]:
0 commit comments