|
| 1 | +"""Coordinator housekeeping: diagnostics, cleanup, and connection event handling.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import logging |
| 6 | +import time |
| 7 | +from datetime import UTC, datetime |
| 8 | +from typing import TYPE_CHECKING, Any |
| 9 | + |
| 10 | +from .const import ( |
| 11 | + DOMAIN, |
| 12 | + MAGIC_SOC_DESCRIPTOR, |
| 13 | + PREDICTED_SOC_DESCRIPTOR, |
| 14 | +) |
| 15 | +from .debug import debug_enabled |
| 16 | +from .soc_wiring import ( |
| 17 | + _descriptor_float, |
| 18 | + _get_aux_kw, |
| 19 | + anchor_driving_session, |
| 20 | + anchor_soc_session, |
| 21 | + end_driving_session, |
| 22 | +) |
| 23 | +from .utils import redact_vin |
| 24 | + |
| 25 | +if TYPE_CHECKING: |
| 26 | + from .coordinator import CardataCoordinator |
| 27 | + |
| 28 | +_LOGGER = logging.getLogger(__name__) |
| 29 | + |
| 30 | + |
| 31 | +async def async_handle_connection_event( |
| 32 | + coordinator: CardataCoordinator, status: str, reason: str | None = None |
| 33 | +) -> None: |
| 34 | + """Handle MQTT connection status change.""" |
| 35 | + coordinator.connection_status = status |
| 36 | + if reason: |
| 37 | + coordinator.last_disconnect_reason = reason |
| 38 | + elif status == "connected": |
| 39 | + coordinator.last_disconnect_reason = None |
| 40 | + |
| 41 | + async with coordinator._lock: |
| 42 | + for vin in coordinator._soc_predictor.get_tracked_vins(): |
| 43 | + vehicle_state = coordinator.data.get(vin) |
| 44 | + if not vehicle_state: |
| 45 | + continue |
| 46 | + |
| 47 | + status_state = vehicle_state.get("vehicle.drivetrain.electricEngine.charging.status") |
| 48 | + if status_state and status_state.value: |
| 49 | + status_val = str(status_state.value) |
| 50 | + coordinator._soc_predictor.update_charging_status(vin, status_val) |
| 51 | + |
| 52 | + if coordinator._soc_predictor.is_charging( |
| 53 | + vin |
| 54 | + ) and not coordinator._soc_predictor.has_active_session(vin): |
| 55 | + _LOGGER.info( |
| 56 | + "Reconnection: restoring charging session for %s (status: %s)", |
| 57 | + redact_vin(vin), |
| 58 | + status_val, |
| 59 | + ) |
| 60 | + manual_cap = coordinator.get_manual_battery_capacity(vin) |
| 61 | + anchor_soc_session( |
| 62 | + coordinator._soc_predictor, |
| 63 | + coordinator._magic_soc, |
| 64 | + vin, |
| 65 | + vehicle_state, |
| 66 | + manual_cap, |
| 67 | + ) |
| 68 | + |
| 69 | + voltage = _descriptor_float( |
| 70 | + vehicle_state.get("vehicle.drivetrain.electricEngine.charging.acVoltage") |
| 71 | + ) |
| 72 | + current = _descriptor_float( |
| 73 | + vehicle_state.get("vehicle.drivetrain.electricEngine.charging.acAmpere") |
| 74 | + ) |
| 75 | + phases = _descriptor_float( |
| 76 | + vehicle_state.get("vehicle.drivetrain.electricEngine.charging.phaseNumber") |
| 77 | + ) |
| 78 | + |
| 79 | + if voltage and current: |
| 80 | + aux_kw = _get_aux_kw(vehicle_state) |
| 81 | + coordinator._soc_predictor.update_ac_charging_data(vin, voltage, current, phases, aux_kw) |
| 82 | + _LOGGER.info( |
| 83 | + "Reconnection: restored AC charging data for %s (%.1fV × %.1fA)", |
| 84 | + redact_vin(vin), |
| 85 | + voltage, |
| 86 | + current, |
| 87 | + ) |
| 88 | + await async_log_diagnostics(coordinator) |
| 89 | + |
| 90 | + |
| 91 | +async def async_log_diagnostics(coordinator: CardataCoordinator) -> None: |
| 92 | + """Thread-safe async version of diagnostics logging.""" |
| 93 | + if debug_enabled(): |
| 94 | + _LOGGER.debug( |
| 95 | + "Stream heartbeat: status=%s last_reason=%s last_message=%s", |
| 96 | + coordinator.connection_status, |
| 97 | + coordinator.last_disconnect_reason, |
| 98 | + coordinator.last_message_at, |
| 99 | + ) |
| 100 | + coordinator._safe_dispatcher_send(coordinator.signal_diagnostics) |
| 101 | + |
| 102 | + # Check for derived isMoving state changes (GPS staleness timeout) |
| 103 | + tracked_vins = coordinator._motion_detector.get_tracked_vins() |
| 104 | + for vin in tracked_vins: |
| 105 | + if coordinator._motion_detector.has_signaled_entity(vin): |
| 106 | + current_derived = coordinator.get_derived_is_moving(vin) |
| 107 | + vehicle_data = coordinator.data.get(vin) |
| 108 | + bmw_provided = vehicle_data.get("vehicle.isMoving") if vehicle_data else None |
| 109 | + |
| 110 | + if bmw_provided is None and current_derived is not None: |
| 111 | + last_sent = coordinator._last_derived_is_moving.get(vin) |
| 112 | + if last_sent != current_derived: |
| 113 | + _LOGGER.debug( |
| 114 | + "isMoving state changed for %s: %s -> %s", |
| 115 | + redact_vin(vin), |
| 116 | + last_sent, |
| 117 | + current_derived, |
| 118 | + ) |
| 119 | + coordinator._last_derived_is_moving[vin] = current_derived |
| 120 | + coordinator._safe_dispatcher_send(coordinator.signal_update, vin, "vehicle.isMoving") |
| 121 | + |
| 122 | + if last_sent is True and current_derived is False: |
| 123 | + runtime = coordinator.hass.data.get(DOMAIN, {}).get(coordinator.entry_id) |
| 124 | + if runtime is not None: |
| 125 | + runtime.request_trip_poll(vin) |
| 126 | + _end_driving_session_from_state(coordinator, vin) |
| 127 | + if coordinator._magic_soc.has_signaled_magic_soc_entity(vin): |
| 128 | + coordinator._safe_dispatcher_send(coordinator.signal_update, vin, MAGIC_SOC_DESCRIPTOR) |
| 129 | + |
| 130 | + if last_sent is not True and current_derived is True: |
| 131 | + _anchor_driving_session_from_state(coordinator, vin) |
| 132 | + if coordinator._magic_soc.has_signaled_magic_soc_entity(vin): |
| 133 | + coordinator._safe_dispatcher_send(coordinator.signal_update, vin, MAGIC_SOC_DESCRIPTOR) |
| 134 | + |
| 135 | + # Periodic AC energy accumulation |
| 136 | + schedule_soc_debounce = False |
| 137 | + updated_vins = coordinator._soc_predictor.periodic_update_all() |
| 138 | + for vin in updated_vins: |
| 139 | + if coordinator._soc_predictor.has_signaled_entity(vin): |
| 140 | + if coordinator._pending_manager.add_update(vin, PREDICTED_SOC_DESCRIPTOR): |
| 141 | + schedule_soc_debounce = True |
| 142 | + if coordinator._magic_soc.has_signaled_magic_soc_entity(vin): |
| 143 | + if coordinator._pending_manager.add_update(vin, MAGIC_SOC_DESCRIPTOR): |
| 144 | + schedule_soc_debounce = True |
| 145 | + |
| 146 | + # Periodic predicted SOC recalculation during charging |
| 147 | + for vin in coordinator._soc_predictor.get_tracked_vins(): |
| 148 | + if coordinator._soc_predictor.is_charging(vin) and coordinator._soc_predictor.has_signaled_entity(vin): |
| 149 | + current_estimate = coordinator.get_predicted_soc(vin) |
| 150 | + if current_estimate is not None: |
| 151 | + last_soc_sent = coordinator._last_predicted_soc_sent.get(vin) |
| 152 | + if current_estimate != last_soc_sent: |
| 153 | + coordinator._last_predicted_soc_sent[vin] = current_estimate |
| 154 | + if coordinator._pending_manager.add_update(vin, PREDICTED_SOC_DESCRIPTOR): |
| 155 | + schedule_soc_debounce = True |
| 156 | + if coordinator._magic_soc.has_signaled_magic_soc_entity(vin): |
| 157 | + if coordinator._pending_manager.add_update(vin, MAGIC_SOC_DESCRIPTOR): |
| 158 | + schedule_soc_debounce = True |
| 159 | + if debug_enabled(): |
| 160 | + _LOGGER.debug( |
| 161 | + "Periodic SOC update for %s: %.1f%% (was: %s)", |
| 162 | + redact_vin(vin), |
| 163 | + current_estimate, |
| 164 | + f"{last_soc_sent:.1f}%" if last_soc_sent else "None", |
| 165 | + ) |
| 166 | + |
| 167 | + if schedule_soc_debounce: |
| 168 | + await coordinator._async_schedule_debounced_update() |
| 169 | + |
| 170 | + # Periodically cleanup stale VIN tracking data and old descriptors |
| 171 | + coordinator._cleanup_counter += 1 |
| 172 | + if coordinator._cleanup_counter >= coordinator._CLEANUP_INTERVAL: |
| 173 | + coordinator._cleanup_counter = 0 |
| 174 | + await async_cleanup_stale_vins(coordinator) |
| 175 | + await async_cleanup_old_descriptors(coordinator) |
| 176 | + |
| 177 | + # Check for stale pending updates (debounce timer failed to fire) |
| 178 | + now = datetime.now(UTC) |
| 179 | + await async_check_stale_pending_updates(coordinator, now) |
| 180 | + |
| 181 | + |
| 182 | +async def async_check_stale_pending_updates(coordinator: CardataCoordinator, now: datetime) -> None: |
| 183 | + """Clear pending updates if they've been accumulating too long.""" |
| 184 | + cleared = coordinator._pending_manager.check_and_clear_stale(now) |
| 185 | + if cleared > 0: |
| 186 | + async with coordinator._debounce_lock: |
| 187 | + if coordinator._update_debounce_handle is not None: |
| 188 | + coordinator._update_debounce_handle() |
| 189 | + coordinator._update_debounce_handle = None |
| 190 | + |
| 191 | + |
| 192 | +async def async_cleanup_stale_vins(coordinator: CardataCoordinator) -> None: |
| 193 | + """Remove tracking data for VINs no longer in coordinator.data.""" |
| 194 | + async with coordinator._lock: |
| 195 | + valid_vins = set(coordinator.data.keys()) |
| 196 | + if not valid_vins: |
| 197 | + return |
| 198 | + |
| 199 | + tracking_dicts: list[dict[str, Any]] = [ |
| 200 | + coordinator._last_derived_is_moving, |
| 201 | + coordinator._last_vin_message_at, |
| 202 | + coordinator._last_poll_at, |
| 203 | + coordinator._last_predicted_soc_sent, |
| 204 | + ] |
| 205 | + |
| 206 | + stale_vins: set[str] = set() |
| 207 | + for d in tracking_dicts: |
| 208 | + for k in d.keys(): |
| 209 | + base_vin = k.removesuffix("_bmw") |
| 210 | + if base_vin not in valid_vins: |
| 211 | + stale_vins.add(k) |
| 212 | + |
| 213 | + stale_vins.update(vin for vin in coordinator._motion_detector.get_tracked_vins() if vin not in valid_vins) |
| 214 | + stale_vins.update(vin for vin in coordinator._soc_predictor.get_tracked_vins() if vin not in valid_vins) |
| 215 | + stale_vins.update(vin for vin in coordinator._magic_soc.get_tracked_vins() if vin not in valid_vins) |
| 216 | + |
| 217 | + if stale_vins: |
| 218 | + for vin in stale_vins: |
| 219 | + for d in tracking_dicts: |
| 220 | + d.pop(vin, None) |
| 221 | + coordinator._motion_detector.cleanup_vin(vin) |
| 222 | + coordinator._soc_predictor.cleanup_vin(vin) |
| 223 | + coordinator._magic_soc.cleanup_vin(vin) |
| 224 | + coordinator._pending_manager.remove_vin(vin) |
| 225 | + _LOGGER.debug( |
| 226 | + "Cleaned up tracking data for %d stale VIN(s)", |
| 227 | + len(stale_vins), |
| 228 | + ) |
| 229 | + |
| 230 | + |
| 231 | +async def async_cleanup_old_descriptors(coordinator: CardataCoordinator) -> None: |
| 232 | + """Remove descriptors that haven't been updated in MAX_DESCRIPTOR_AGE_SECONDS.""" |
| 233 | + now = time.time() |
| 234 | + max_age = coordinator._MAX_DESCRIPTOR_AGE_SECONDS |
| 235 | + total_evicted = 0 |
| 236 | + |
| 237 | + async with coordinator._lock: |
| 238 | + for _vin, vehicle_state in list(coordinator.data.items()): |
| 239 | + old_descriptors = [ |
| 240 | + desc |
| 241 | + for desc, state in vehicle_state.items() |
| 242 | + if state.last_seen > 0 and (now - state.last_seen) > max_age |
| 243 | + ] |
| 244 | + for desc in old_descriptors: |
| 245 | + del vehicle_state[desc] |
| 246 | + total_evicted += 1 |
| 247 | + |
| 248 | + if total_evicted > 0: |
| 249 | + coordinator._descriptors_evicted_count += total_evicted |
| 250 | + _LOGGER.debug( |
| 251 | + "Evicted %d old descriptor(s) not updated in %d days", |
| 252 | + total_evicted, |
| 253 | + max_age // 86400, |
| 254 | + ) |
| 255 | + |
| 256 | + |
| 257 | +def _anchor_driving_session_from_state(coordinator: CardataCoordinator, vin: str) -> None: |
| 258 | + """Anchor driving session from stored vehicle state.""" |
| 259 | + vehicle_state = coordinator.data.get(vin) |
| 260 | + if vehicle_state: |
| 261 | + manual_cap = coordinator.get_manual_battery_capacity(vin) |
| 262 | + anchor_driving_session(coordinator._magic_soc, coordinator._soc_predictor, vin, vehicle_state, manual_cap) |
| 263 | + |
| 264 | + |
| 265 | +def _end_driving_session_from_state(coordinator: CardataCoordinator, vin: str) -> None: |
| 266 | + """End driving session from stored vehicle state.""" |
| 267 | + vehicle_state = coordinator.data.get(vin) |
| 268 | + if vehicle_state: |
| 269 | + end_driving_session(coordinator._magic_soc, vin, vehicle_state) |
0 commit comments