3333
3434
3535@dataclasses .dataclass
36- class CachedEntityState :
37- isFatal : bool
38- isHealthy : bool
36+ class EntityCacheEntry :
37+ active_errors : set [str ] = dataclasses .field (default_factory = set )
38+
39+ @property
40+ def is_healthy (self ) -> bool :
41+ return not self .active_errors
3942
4043
4144class PlatformConnectorEventProcessor (dcgmtypes .CallbackInterface ):
@@ -46,7 +49,6 @@ def __init__(
4649 exit : Event ,
4750 dcgm_errors_info_dict : dict [str , str ],
4851 state_file_path : str ,
49- dcgm_health_conditions_categorization_mapping_config : dict [str , str ],
5052 metadata_path : str ,
5153 processing_strategy : platformconnector_pb2 .ProcessingStrategy ,
5254 ) -> None :
@@ -60,8 +62,7 @@ def __init__(
6062 self .state_file_path = state_file_path
6163 self .node_bootid_path = "/proc/sys/kernel/random/boot_id"
6264 self .old_bootid = self .read_old_system_bootid_from_state_file ()
63- self .entity_cache : dict [str , CachedEntityState ] = {}
64- self .dcgm_health_conditions_categorization_mapping_config = dcgm_health_conditions_categorization_mapping_config
65+ self .entity_cache : dict [str , EntityCacheEntry ] = {}
6566 self ._metadata_reader = MetadataReader (metadata_path )
6667 self ._processing_strategy = processing_strategy
6768
@@ -94,7 +95,8 @@ def clear_dcgm_connectivity_failure(self, timestamp: Timestamp) -> None:
9495 check_name = "GpuDcgmConnectivityFailure"
9596
9697 key = self ._build_cache_key (check_name , "DCGM" , "ALL" )
97- if key not in self .entity_cache or not self .entity_cache [key ].isHealthy :
98+ entry = self .entity_cache .get (key )
99+ if entry is None or not entry .is_healthy :
98100 event_metadata = {}
99101 chassis_serial = self ._metadata_reader .get_chassis_serial ()
100102 if chassis_serial :
@@ -121,10 +123,9 @@ def clear_dcgm_connectivity_failure(self, timestamp: Timestamp) -> None:
121123 if len (health_events ):
122124 try :
123125 if self .send_health_event_with_retries (health_events ):
124- # Only update cache after successful send
125- self .entity_cache [key ] = CachedEntityState (isFatal = False , isHealthy = True )
126- log .info (f"Updated cache for key { key } after successful send" )
127- metrics .dcgm_health_active_events .labels (event_type = check_name , gpu_id = "" , severity = "fatal" ).set (0 )
126+ self .entity_cache [key ] = EntityCacheEntry ()
127+ log .info (f"Updated cache for key { key } with value { self .entity_cache [key ]} after successful send" )
128+ metrics .dcgm_health_active_events .labels (event_type = check_name , gpu_id = "" ).set (0 )
128129 except Exception as e :
129130 log .error (f"Exception while sending DCGM connectivity restored events: { e } " )
130131 raise
@@ -142,8 +143,8 @@ def health_event_occurred(self, health_details: dict[str, dcgmtypes.HealthDetail
142143
143144 health_events = []
144145 # Collect pending cache and metric updates to apply only after successful send
145- pending_cache_updates : dict [str , CachedEntityState ] = {}
146- pending_metric_updates : list [tuple [str , int , str , int ]] = [] # (event_type, gpu_id, severity , value)
146+ pending_cache_updates : dict [str , EntityCacheEntry ] = {}
147+ pending_metric_updates : list [tuple [str , int , int ]] = [] # (event_type, gpu_id, value)
147148
148149 for watch_name , details in health_details .items ():
149150 check_name = self ._convert_dcgm_watch_name_to_check_name (watch_name )
@@ -178,27 +179,16 @@ def health_event_occurred(self, health_details: dict[str, dcgmtypes.HealthDetail
178179
179180 entities_impacted_supports_component_reset = pci_address and gpu_uuid
180181
182+ recommended_action = self .get_recommended_action_from_dcgm_error_map (failure_details .code )
183+ isHealthy = False
184+ isFatal = recommended_action != platformconnector_pb2 .NONE
181185 key = self ._build_cache_key (check_name , entity .entityType , entity .entityValue )
182- isFatal = False
183- isHealthy = True
184- if details .status == dcgmtypes .HealthStatus .PASS :
185- isFatal = False
186- isHealthy = True
187- else :
188- isFatal = (
189- False
190- if self .dcgm_health_conditions_categorization_mapping_config [watch_name ] == "NonFatal"
191- else True
192- )
193- isHealthy = False
194- if (
195- key not in self .entity_cache
196- or self .entity_cache [key ].isFatal != isFatal
197- or self .entity_cache [key ].isHealthy != isHealthy
198- ):
199- # Defer cache update until after successful send
200- pending_cache_updates [key ] = CachedEntityState (isFatal = isFatal , isHealthy = isHealthy )
201- recommended_action = self .get_recommended_action_from_dcgm_error_map (failure_details .code )
186+
187+ entry = self .entity_cache .get (key )
188+ if entry is None or failure_details .code not in entry .active_errors :
189+ existing_errors = set (entry .active_errors ) if entry else set ()
190+ existing_errors .add (failure_details .code )
191+ pending_cache_updates [key ] = EntityCacheEntry (active_errors = existing_errors )
202192
203193 # The COMPONENT_RESET recommended action requires that the GPU_UUID is present on the
204194 # unhealthy HealthEvent. Sending an event with COMPONENT_RESET that is missing the GPU_UUID
@@ -244,13 +234,7 @@ def health_event_occurred(self, health_details: dict[str, dcgmtypes.HealthDetail
244234 processingStrategy = self ._processing_strategy ,
245235 )
246236 )
247- severity = (
248- "non_fatal"
249- if self .dcgm_health_conditions_categorization_mapping_config [watch_name ] == "NonFatal"
250- else "fatal"
251- )
252- # Defer metric update until after successful send
253- pending_metric_updates .append ((check_name , gpu_id , severity , 1 ))
237+ pending_metric_updates .append ((check_name , gpu_id , 1 ))
254238 else :
255239
256240 entity = platformconnector_pb2 .Entity (entityType = self ._component_class , entityValue = str (gpu_id ))
@@ -270,63 +254,49 @@ def health_event_occurred(self, health_details: dict[str, dcgmtypes.HealthDetail
270254 )
271255
272256 key = self ._build_cache_key (check_name , entity .entityType , entity .entityValue )
273- if (
274- key not in self .entity_cache
275- or self .entity_cache [key ].isFatal
276- or not self .entity_cache [key ].isHealthy
277- ):
278- # Don't send health events for non-fatal health conditions when they are healthy
279- # they will get published as node conditions which we don't want to do to have
280- # consistency in the health events publishing logic
281- if self .dcgm_health_conditions_categorization_mapping_config [watch_name ] == "NonFatal" :
282- log .debug (f"Skipping non-fatal health event for watch { watch_name } " )
283- # For non-fatal events that are not sent, update cache immediately
284- # (no race condition since nothing is being sent)
285- self .entity_cache [key ] = CachedEntityState (isFatal = False , isHealthy = True )
286- log .info (f"Updated cache for key { key } with value { self .entity_cache [key ]} " )
287- metrics .dcgm_health_active_events .labels (
288- event_type = check_name , gpu_id = gpu_id , severity = "non_fatal"
289- ).set (0 )
290- else :
291- # Defer cache update until after successful send
292- pending_cache_updates [key ] = CachedEntityState (isFatal = False , isHealthy = True )
293- event_metadata = {}
294- chassis_serial = self ._metadata_reader .get_chassis_serial ()
295- if chassis_serial :
296- event_metadata ["chassis_serial" ] = chassis_serial
297-
298- health_events .append (
299- platformconnector_pb2 .HealthEvent (
300- version = self ._version ,
301- agent = self ._agent ,
302- componentClass = self ._component_class ,
303- checkName = check_name ,
304- generatedTimestamp = timestamp ,
305- isFatal = False ,
306- isHealthy = True ,
307- errorCode = [],
308- entitiesImpacted = entities_impacted ,
309- message = f"GPU { self ._get_dcgm_watch (watch_name )} watch reported no errors" ,
310- recommendedAction = platformconnector_pb2 .NONE ,
311- nodeName = self ._node_name ,
312- metadata = event_metadata ,
313- processingStrategy = self ._processing_strategy ,
314- )
257+ entry = self .entity_cache .get (key )
258+
259+ if entry is None or not entry .is_healthy :
260+ had_errors = entry is not None and not entry .is_healthy
261+ pending_cache_updates [key ] = EntityCacheEntry ()
262+
263+ event_metadata = {}
264+ chassis_serial = self ._metadata_reader .get_chassis_serial ()
265+ if chassis_serial :
266+ event_metadata ["chassis_serial" ] = chassis_serial
267+
268+ health_events .append (
269+ platformconnector_pb2 .HealthEvent (
270+ version = self ._version ,
271+ agent = self ._agent ,
272+ componentClass = self ._component_class ,
273+ checkName = check_name ,
274+ generatedTimestamp = timestamp ,
275+ isFatal = False ,
276+ isHealthy = True ,
277+ errorCode = [],
278+ entitiesImpacted = entities_impacted ,
279+ message = f"GPU { self ._get_dcgm_watch (watch_name )} watch reported no errors" ,
280+ recommendedAction = platformconnector_pb2 .NONE ,
281+ nodeName = self ._node_name ,
282+ metadata = event_metadata ,
283+ processingStrategy = self ._processing_strategy ,
315284 )
316- # Defer metric update until after successful send
317- pending_metric_updates .append ((check_name , gpu_id , "fatal" , 0 ))
285+ )
286+ if had_errors :
287+ pending_metric_updates .append ((check_name , gpu_id , 0 ))
318288 log .debug (f"dcgm health event is { health_events } " )
319289 if len (health_events ):
320290 try :
321291 if self .send_health_event_with_retries (health_events ):
322292 # Only update cache and metrics after successful send
323293 for key , state in pending_cache_updates .items ():
324294 self .entity_cache [key ] = state
325- log .info (f"Updated cache for key { key } with value { state } after successful send" )
326- for event_type , gpu_id , severity , value in pending_metric_updates :
327- metrics . dcgm_health_active_events . labels (
328- event_type = event_type , gpu_id = gpu_id , severity = severity
329- ).set (value )
295+ log .info (
296+ f"Updated cache for key { key } with value { self . entity_cache [ key ] } after successful send"
297+ )
298+ for event_type , gpu_id , value in pending_metric_updates :
299+ metrics . dcgm_health_active_events . labels ( event_type = event_type , gpu_id = gpu_id ).set (value )
330300 except Exception as e :
331301 log .error (f"Exception while sending health events: { e } " )
332302
@@ -376,7 +346,8 @@ def dcgm_connectivity_failed(self) -> None:
376346 health_events = []
377347 check_name = "GpuDcgmConnectivityFailure"
378348 key = self ._build_cache_key (check_name , "DCGM" , "ALL" )
379- if key not in self .entity_cache or self .entity_cache [key ].isHealthy :
349+ entry = self .entity_cache .get (key )
350+ if entry is None or entry .is_healthy :
380351 event_metadata = {}
381352 chassis_serial = self ._metadata_reader .get_chassis_serial ()
382353 if chassis_serial :
@@ -403,12 +374,11 @@ def dcgm_connectivity_failed(self) -> None:
403374 if len (health_events ):
404375 try :
405376 if self .send_health_event_with_retries (health_events ):
406- # Only update cache after successful send
407- self .entity_cache [key ] = CachedEntityState (isFatal = True , isHealthy = False )
408- log .info (f"Updated cache for key { key } after successful send" )
409- metrics .dcgm_health_active_events .labels (
410- event_type = check_name , gpu_id = "" , severity = "fatal"
411- ).set (1 )
377+ self .entity_cache [key ] = EntityCacheEntry (active_errors = {"DCGM_CONNECTIVITY_ERROR" })
378+ log .info (
379+ f"Updated cache for key { key } with value { self .entity_cache [key ]} after successful send"
380+ )
381+ metrics .dcgm_health_active_events .labels (event_type = check_name , gpu_id = "" ).set (1 )
412382 except Exception as e :
413383 log .error (f"Exception while sending DCGM connectivity failure events: { e } " )
414384 raise
0 commit comments