diff --git a/homeassistant/components/ohme/__init__.py b/homeassistant/components/ohme/__init__.py index 6ed86c183ef9b7..dab3780c3440a7 100644 --- a/homeassistant/components/ohme/__init__.py +++ b/homeassistant/components/ohme/__init__.py @@ -1,5 +1,7 @@ """Set up ohme integration.""" +import logging + from ohme import ApiException, AuthException, OhmeApiClient from homeassistant.const import CONF_EMAIL, CONF_PASSWORD @@ -16,10 +18,13 @@ OhmeDeviceInfoCoordinator, OhmeRuntimeData, ) +from .history import async_ensure_energy_history, async_remove_energy_history from .services import async_setup_services CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN) +_LOGGER = logging.getLogger(__name__) + async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up Ohme integration.""" @@ -30,6 +35,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: OhmeConfigEntry) -> bool: """Set up Ohme from a config entry.""" + _LOGGER.debug("Setting up Ohme config entry %s", entry.entry_id) client = OhmeApiClient( email=entry.data[CONF_EMAIL], @@ -53,22 +59,54 @@ async def async_setup_entry(hass: HomeAssistant, entry: OhmeConfigEntry) -> bool translation_key="api_failed", translation_domain=DOMAIN ) from e - coordinators = ( - OhmeChargeSessionCoordinator(hass, entry, client), - OhmeDeviceInfoCoordinator(hass, entry, client), + charge_session_coordinator = OhmeChargeSessionCoordinator(hass, entry, client) + device_info_coordinator = OhmeDeviceInfoCoordinator(hass, entry, client) + + if entry.unique_id != client.serial: + _LOGGER.debug( + "Updating Ohme config entry %s unique_id to charger serial %s", + entry.entry_id, + client.serial, + ) + hass.config_entries.async_update_entry(entry, unique_id=client.serial) + + entry.runtime_data = OhmeRuntimeData( + charge_session_coordinator=charge_session_coordinator, + device_info_coordinator=device_info_coordinator, ) - for coordinator in coordinators: + for coordinator in (charge_session_coordinator, device_info_coordinator): await coordinator.async_config_entry_first_refresh() - entry.runtime_data = OhmeRuntimeData(*coordinators) - await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) + try: + result = await async_ensure_energy_history(hass, entry) + except Exception: + _LOGGER.exception("Failed to initialize Ohme energy history sync") + else: + _LOGGER.debug( + "Initialized Ohme energy history sync for entry %s: %s", + entry.entry_id, + result, + ) + + charge_session_coordinator.seed_history_sync_state() + charge_session_coordinator.enable_history_sync() + return True async def async_unload_entry(hass: HomeAssistant, entry: OhmeConfigEntry) -> bool: """Unload a config entry.""" - + _LOGGER.debug("Unloading Ohme config entry %s", entry.entry_id) + entry.runtime_data.charge_session_coordinator.disable_history_sync() return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + + +async def async_remove_entry(hass: HomeAssistant, entry: OhmeConfigEntry) -> None: + """Remove a config entry and its imported recorder history.""" + try: + await async_remove_energy_history(hass, entry) + except Exception: + _LOGGER.exception("Failed to remove Ohme energy history") diff --git a/homeassistant/components/ohme/config_flow.py b/homeassistant/components/ohme/config_flow.py index 1037c3a7c8bebb..1363e695373e3a 100644 --- a/homeassistant/components/ohme/config_flow.py +++ b/homeassistant/components/ohme/config_flow.py @@ -1,20 +1,30 @@ """Config flow for ohme integration.""" +from __future__ import annotations + from collections.abc import Mapping from typing import Any from ohme import ApiException, AuthException, OhmeApiClient import voluptuous as vol -from homeassistant.config_entries import ConfigFlow, ConfigFlowResult +from homeassistant.config_entries import ( + ConfigEntry, + ConfigFlow, + ConfigFlowResult, + OptionsFlowWithReload, +) from homeassistant.const import CONF_EMAIL, CONF_PASSWORD from homeassistant.helpers.selector import ( + NumberSelector, + NumberSelectorConfig, + NumberSelectorMode, TextSelector, TextSelectorConfig, TextSelectorType, ) -from .const import DOMAIN +from .const import CONF_BACKFILL_DAYS, DEFAULT_BACKFILL_DAYS, DOMAIN USER_SCHEMA = vol.Schema( { @@ -48,6 +58,11 @@ class OhmeConfigFlow(ConfigFlow, domain=DOMAIN): """Config flow.""" + @staticmethod + def async_get_options_flow(config_entry: ConfigEntry) -> OhmeOptionsFlow: + """Return the options flow.""" + return OhmeOptionsFlow() + async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: @@ -63,7 +78,9 @@ async def async_step_user( ) if not errors: return self.async_create_entry( - title=user_input[CONF_EMAIL], data=user_input + title=user_input[CONF_EMAIL], + data=user_input, + options={CONF_BACKFILL_DAYS: DEFAULT_BACKFILL_DAYS}, ) return self.async_show_form( @@ -137,3 +154,34 @@ async def _validate_account(self, email: str, password: str) -> dict[str, str]: errors["base"] = "unknown" return errors + + +class OhmeOptionsFlow(OptionsFlowWithReload): + """Handle Ohme options.""" + + async def async_step_init( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Manage integration options.""" + if user_input is not None: + return self.async_create_entry(title="", data=user_input) + + return self.async_show_form( + step_id="init", + data_schema=vol.Schema( + { + vol.Required( + CONF_BACKFILL_DAYS, + default=self.config_entry.options.get( + CONF_BACKFILL_DAYS, DEFAULT_BACKFILL_DAYS + ), + ): NumberSelector( + NumberSelectorConfig( + min=0, + mode=NumberSelectorMode.BOX, + step=1, + ) + ) + } + ), + ) diff --git a/homeassistant/components/ohme/const.py b/homeassistant/components/ohme/const.py index d97f6e3cfd7d87..7a7dae5016e4c9 100644 --- a/homeassistant/components/ohme/const.py +++ b/homeassistant/components/ohme/const.py @@ -2,6 +2,9 @@ from homeassistant.const import Platform +CONF_BACKFILL_DAYS = "backfill_days" +DEFAULT_BACKFILL_DAYS = 365 +DEFAULT_RECENT_SYNC_DAYS = 7 DOMAIN = "ohme" PLATFORMS = [ Platform.BUTTON, diff --git a/homeassistant/components/ohme/coordinator.py b/homeassistant/components/ohme/coordinator.py index ff66942781f578..29a2c5acfaf8a9 100644 --- a/homeassistant/components/ohme/coordinator.py +++ b/homeassistant/components/ohme/coordinator.py @@ -1,20 +1,41 @@ """Ohme coordinators.""" from abc import abstractmethod +import asyncio +from collections.abc import Callable from dataclasses import dataclass -from datetime import timedelta +from datetime import datetime, timedelta import logging +from typing import Any, cast -from ohme import ApiException, OhmeApiClient +from ohme import ApiException, ChargerStatus, OhmeApiClient from homeassistant.config_entries import ConfigEntry -from homeassistant.core import HomeAssistant +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.event import async_call_later, async_track_time_interval from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .const import DOMAIN +from .history import async_sync_repair_energy_history, async_sync_session_energy_history _LOGGER = logging.getLogger(__name__) +DAILY_REPAIR_INTERVAL = timedelta(days=1) +FINALIZED_SYNC_RETRY_DELAY = timedelta(minutes=15) + + +def _client_session_timestamp(client: OhmeApiClient, attribute: str) -> datetime | None: + """Return a typed optional session timestamp from the Ohme client.""" + return cast(datetime | None, getattr(client, attribute, None)) + + +@dataclass(slots=True) +class HistorySyncRequest: + """Describe a queued history sync request.""" + + reason: str + session_start: datetime | None = None + @dataclass() class OhmeRuntimeData: @@ -27,7 +48,7 @@ class OhmeRuntimeData: type OhmeConfigEntry = ConfigEntry[OhmeRuntimeData] -class OhmeBaseCoordinator(DataUpdateCoordinator[None]): +class OhmeBaseCoordinator(DataUpdateCoordinator[Any]): """Base for all Ohme coordinators.""" config_entry: OhmeConfigEntry @@ -50,30 +71,275 @@ def __init__( self.name = f"Ohme {self.coordinator_name}" self.client = client - async def _async_update_data(self) -> None: + async def _async_update_data(self) -> Any: """Fetch data from API endpoint.""" try: - await self._internal_update_data() + return await self._internal_update_data() except ApiException as e: raise UpdateFailed( translation_key="api_failed", translation_domain=DOMAIN ) from e @abstractmethod - async def _internal_update_data(self) -> None: + async def _internal_update_data(self) -> Any: """Update coordinator data.""" class OhmeChargeSessionCoordinator(OhmeBaseCoordinator): - """Coordinator to pull all updates from the API.""" + """Coordinator to pull live charge-session updates from the API.""" coordinator_name = "Charge Sessions" _default_update_interval = timedelta(seconds=30) + def __init__( + self, hass: HomeAssistant, config_entry: OhmeConfigEntry, client: OhmeApiClient + ) -> None: + """Initialize the charge-session coordinator.""" + super().__init__(hass, config_entry, client) + self._history_sync_enabled = False + self._history_sync_task: asyncio.Task[None] | None = None + self._pending_history_sync: HistorySyncRequest | None = None + self._remove_daily_repair_listener: Callable[[], None] | None = None + self._remove_delayed_retry_listener: Callable[[], None] | None = None + self._tracked_session_start: datetime | None = None + self._last_status: ChargerStatus | None = None + self._completed_session_marker: tuple[datetime, datetime] | None = None + + def enable_history_sync(self) -> None: + """Enable automatic history-sync orchestration.""" + if self._history_sync_enabled: + return + + self._history_sync_enabled = True + _LOGGER.debug("Enabled Ohme history sync orchestration for %s", self.name) + self._remove_daily_repair_listener = async_track_time_interval( + self.hass, + self._async_daily_repair_listener, + DAILY_REPAIR_INTERVAL, + ) + + def disable_history_sync(self) -> None: + """Disable automatic history-sync orchestration.""" + self._history_sync_enabled = False + self._pending_history_sync = None + self._tracked_session_start = None + _LOGGER.debug("Disabled Ohme history sync orchestration for %s", self.name) + if self._remove_daily_repair_listener is not None: + self._remove_daily_repair_listener() + self._remove_daily_repair_listener = None + self._cancel_delayed_retry() + + def seed_history_sync_state(self) -> None: + """Seed transition tracking from the coordinator's current client state.""" + status = self.client.status + session_start = _client_session_timestamp(self.client, "session_start") + session_finish = _client_session_timestamp(self.client, "session_finish") + + self._last_status = status + if session_start is not None and session_finish is not None: + self._completed_session_marker = (session_start, session_finish) + self._tracked_session_start = None + return + + self._completed_session_marker = None + self._tracked_session_start = ( + session_start if session_start is not None and session_finish is None else None + ) + _LOGGER.debug( + "Seeded Ohme history sync state: status=%s session_start=%s session_finish=%s tracked_session_start=%s completed_marker=%s", + status, + session_start, + session_finish, + self._tracked_session_start, + self._completed_session_marker, + ) + + async def _async_daily_repair_listener(self, _: datetime) -> None: + """Run the bounded daily repair sync.""" + self._queue_history_sync(HistorySyncRequest(reason="daily_repair")) + + def _merge_history_sync_requests( + self, + current: HistorySyncRequest | None, + new: HistorySyncRequest, + ) -> HistorySyncRequest: + """Merge overlapping history sync requests conservatively.""" + if current is None: + return new + + if current.session_start is None or new.session_start is None: + return current if current.session_start is None else new + + return new if new.session_start < current.session_start else current + + def _queue_history_sync(self, request: HistorySyncRequest) -> None: + """Schedule a non-overlapping history sync request.""" + if not self._history_sync_enabled: + return + + if self._history_sync_task and not self._history_sync_task.done(): + self._pending_history_sync = self._merge_history_sync_requests( + self._pending_history_sync, request + ) + _LOGGER.debug( + "Merged pending Ohme history sync request: reason=%s session_start=%s", + self._pending_history_sync.reason, + self._pending_history_sync.session_start, + ) + return + + _LOGGER.debug( + "Queueing Ohme history sync: reason=%s session_start=%s", + request.reason, + request.session_start, + ) + self._history_sync_task = self.config_entry.async_create_background_task( + self.hass, + self._async_run_history_sync(request), + f"Ohme history sync ({request.reason})", + ) + + async def _async_run_history_sync(self, request: HistorySyncRequest) -> None: + """Run a history sync request and schedule any queued follow-up.""" + try: + if request.session_start is None: + result = await async_sync_repair_energy_history( + self.hass, + self.config_entry, + reason=request.reason, + ) + else: + result = await async_sync_session_energy_history( + self.hass, + self.config_entry, + session_start=request.session_start, + reason=request.reason, + ) + except Exception: + _LOGGER.exception("Failed Ohme history sync (%s)", request.reason) + else: + _LOGGER.debug("Completed Ohme history sync: %s", result) + finally: + self._history_sync_task = None + pending_request = self._pending_history_sync + self._pending_history_sync = None + if pending_request is not None and self._history_sync_enabled: + self._queue_history_sync(pending_request) + + def _schedule_delayed_retry(self, session_start: datetime | None) -> None: + """Schedule one delayed sync retry for late Ohme finalization.""" + self._cancel_delayed_retry() + _LOGGER.debug( + "Scheduling delayed Ohme history sync retry: session_start=%s delay=%s", + session_start, + FINALIZED_SYNC_RETRY_DELAY, + ) + + @callback + def _retry(_: datetime) -> None: + self._remove_delayed_retry_listener = None + _LOGGER.debug( + "Running delayed Ohme history sync retry: session_start=%s", + session_start, + ) + self._queue_history_sync( + HistorySyncRequest( + reason="session_finalized_retry", + session_start=session_start, + ) + ) + + self._remove_delayed_retry_listener = async_call_later( + self.hass, + FINALIZED_SYNC_RETRY_DELAY, + _retry, + ) + + def _cancel_delayed_retry(self) -> None: + """Cancel any pending delayed retry.""" + if self._remove_delayed_retry_listener is not None: + self._remove_delayed_retry_listener() + self._remove_delayed_retry_listener = None + + def _handle_charge_session_transition(self) -> None: + """Detect finalized-session boundaries and trigger bounded history syncs.""" + status = self.client.status + session_start = _client_session_timestamp(self.client, "session_start") + session_finish = _client_session_timestamp(self.client, "session_finish") + was_active_status = self._last_status in { + ChargerStatus.CHARGING, + ChargerStatus.PLUGGED_IN, + ChargerStatus.PAUSED, + } + + if session_start is not None and session_finish is None: + self._tracked_session_start = session_start + + if session_start is not None and session_finish is not None: + completed_marker = (session_start, session_finish) + if completed_marker != self._completed_session_marker: + self._completed_session_marker = completed_marker + self._tracked_session_start = None + _LOGGER.debug( + "Detected finalized Ohme session marker: status=%s session_start=%s session_finish=%s", + status, + session_start, + session_finish, + ) + self._queue_history_sync( + HistorySyncRequest( + reason="session_finalized", + session_start=session_start, + ) + ) + self._schedule_delayed_retry(session_start) + elif ( + self._tracked_session_start is not None + and self._last_status is not None + and self._last_status is not ChargerStatus.UNPLUGGED + and status is ChargerStatus.UNPLUGGED + ): + session_anchor = self._tracked_session_start + self._tracked_session_start = None + _LOGGER.debug( + "Detected Ohme unplug after tracked session: last_status=%s session_start=%s", + self._last_status, + session_anchor, + ) + self._queue_history_sync( + HistorySyncRequest( + reason="session_unplugged", + session_start=session_anchor, + ) + ) + self._schedule_delayed_retry(session_anchor) + elif was_active_status and status is ChargerStatus.FINISHED: + _LOGGER.debug( + "Observed Ohme finished state without session_finish; waiting for disconnect before syncing" + ) + # A real 2026-04-05 Ohme probe stayed FINISHED_CHARGE for ~87 minutes + # with stale summary/session totals; the meaningful summary update only + # appeared once the charger transitioned to DISCONNECTED / unplugged. + elif was_active_status and status is ChargerStatus.UNPLUGGED: + _LOGGER.debug( + "Detected Ohme unplug without usable session markers; running bounded repair sync" + ) + self._queue_history_sync( + HistorySyncRequest(reason="session_marker_missing") + ) + self._schedule_delayed_retry(None) + elif status is ChargerStatus.UNPLUGGED and session_start is None: + self._tracked_session_start = None + + self._last_status = status + async def _internal_update_data(self) -> None: - """Fetch data from API endpoint.""" + """Fetch data from the live charge-session endpoint.""" await self.client.async_get_charge_session() + if self._history_sync_enabled: + self._handle_charge_session_transition() + class OhmeDeviceInfoCoordinator(OhmeBaseCoordinator): """Coordinator to pull device info and charger settings from the API.""" diff --git a/homeassistant/components/ohme/history.py b/homeassistant/components/ohme/history.py new file mode 100644 index 00000000000000..fb72ea7bdd0489 --- /dev/null +++ b/homeassistant/components/ohme/history.py @@ -0,0 +1,689 @@ +"""Helpers for syncing finalized Ohme energy history into recorder statistics.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Mapping +from dataclasses import dataclass +from datetime import datetime, timedelta +import logging +from typing import TYPE_CHECKING, Any, cast + +from ohme import OhmeApiClient, SummaryGranularity + +from homeassistant.components.recorder import get_instance +from homeassistant.components.recorder.models import ( + StatisticData, + StatisticMeanType, + StatisticMetaData, +) +from homeassistant.components.recorder.statistics import ( + async_add_external_statistics, + get_last_statistics, + statistics_during_period, +) +from homeassistant.const import UnitOfEnergy +from homeassistant.core import HomeAssistant +from homeassistant.helpers.storage import Store +from homeassistant.util import dt as dt_util +from homeassistant.util.unit_conversion import EnergyConverter + +from .const import ( + CONF_BACKFILL_DAYS, + DEFAULT_BACKFILL_DAYS, + DEFAULT_RECENT_SYNC_DAYS, + DOMAIN, +) + +if TYPE_CHECKING: + from .coordinator import OhmeConfigEntry + +_LOGGER = logging.getLogger(__name__) + +FULL_HISTORY_START = datetime(2000, 1, 1, tzinfo=dt_util.UTC) +HOUR_FETCH_WINDOW = timedelta(days=7) +MONTH_PREFILTER_WINDOW = timedelta(days=3660) +CLEAR_STATISTICS_TIMEOUT = 180 +STORE_VERSION = 1 + +MONTH_GRANULARITY: Any = getattr(SummaryGranularity, "MONTH", "MONTH") + + +@dataclass(slots=True) +class ImportedStatisticsState: + """Summarize imported statistics for the Ohme energy statistic.""" + + last_start: datetime | None = None + last_sum_kwh: float | None = None + + @property + def exists(self) -> bool: + """Return True if recorder already has statistics for this sensor.""" + return self.last_sum_kwh is not None + + +def get_backfill_days(config_entry: OhmeConfigEntry) -> int: + """Return the configured Ohme backfill lookback in days.""" + return max( + 0, + int(config_entry.options.get(CONF_BACKFILL_DAYS, DEFAULT_BACKFILL_DAYS)), + ) + + +def backfill_window_covers( + previous_backfill_days: int, current_backfill_days: int +) -> bool: + """Return True if the previous backfill window fully covers the current one.""" + if previous_backfill_days == 0: + return True + if current_backfill_days == 0: + return False + return previous_backfill_days >= current_backfill_days + + +def floor_to_hour(timestamp: datetime) -> datetime: + """Return a UTC datetime rounded down to the hour.""" + return dt_util.as_utc(timestamp).replace(minute=0, second=0, microsecond=0) + + +def history_window_end(now: datetime | None = None) -> datetime: + """Return the next UTC hour boundary for history queries.""" + timestamp = dt_util.as_utc(now or dt_util.utcnow()) + return floor_to_hour(timestamp) + timedelta(hours=1) + + +def history_window_start( + config_entry: OhmeConfigEntry, now: datetime | None = None +) -> datetime: + """Return the configured history window start for this config entry.""" + window_end = history_window_end(now) + backfill_days = get_backfill_days(config_entry) + if backfill_days == 0: + return FULL_HISTORY_START + return max(FULL_HISTORY_START, window_end - timedelta(days=backfill_days)) + + +def repair_window_start( + config_entry: OhmeConfigEntry, now: datetime | None = None +) -> datetime: + """Return the bounded repair window start for daily/fallback syncs.""" + window_end = history_window_end(now) + return max( + history_window_start(config_entry, now), + window_end - timedelta(days=DEFAULT_RECENT_SYNC_DAYS), + ) + + +def extract_total_wh(summary_data: Mapping[str, Any] | None) -> int | None: + """Extract the cumulative charged-energy total from summary payload.""" + total_stats = (summary_data or {}).get("totalStats") or {} + total_wh = total_stats.get("energyChargedTotalWh") + if total_wh is None: + return None + return int(total_wh) + + +def extract_total_kwh(summary_data: Mapping[str, Any] | None) -> float | None: + """Extract the cumulative charged-energy total in kWh.""" + total_wh = extract_total_wh(summary_data) + if total_wh is None: + return None + return total_wh / 1000 + + +def _coerce_row_start(value: Any) -> datetime: + """Normalize recorder statistic row start values to UTC datetimes.""" + if isinstance(value, datetime): + return dt_util.as_utc(value) + return dt_util.utc_from_timestamp(float(value)) + + +def _to_ms(timestamp: datetime) -> int: + """Convert a timezone-aware datetime to milliseconds since epoch.""" + return int(timestamp.timestamp() * 1000) + + +def _store(hass: HomeAssistant, config_entry: OhmeConfigEntry) -> Store[dict[str, Any]]: + """Return the storage helper for persisted sync state.""" + return Store( + hass, + STORE_VERSION, + f"{DOMAIN}.{config_entry.entry_id}.history_sync", + ) + + +async def async_load_sync_state( + hass: HomeAssistant, config_entry: OhmeConfigEntry +) -> dict[str, Any] | None: + """Load persisted sync state for this config entry.""" + return await _store(hass, config_entry).async_load() + + +async def async_save_sync_state( + hass: HomeAssistant, + config_entry: OhmeConfigEntry, +) -> None: + """Persist the last successfully applied history-sync settings.""" + await _store(hass, config_entry).async_save( + {CONF_BACKFILL_DAYS: get_backfill_days(config_entry)} + ) + + +async def async_remove_sync_state( + hass: HomeAssistant, config_entry: OhmeConfigEntry +) -> None: + """Remove persisted sync state for this config entry.""" + await _store(hass, config_entry).async_remove() + + +def statistic_id_from_serial(serial: str) -> str: + """Return the external statistic id used for Ohme charged energy history.""" + return f"{DOMAIN}:total_charged_energy_{serial.lower().replace('-', '_')}" + + +def statistic_id(client: OhmeApiClient) -> str: + """Return the external statistic id used for Ohme charged energy history.""" + return statistic_id_from_serial(client.serial) + + +def statistics_metadata(client: OhmeApiClient) -> StatisticMetaData: + """Build recorder statistics metadata for Ohme charged energy.""" + device_name = client.device_info.get("name", client.serial) + return StatisticMetaData( + mean_type=StatisticMeanType.NONE, + has_sum=True, + name=f"{device_name} total charged energy", + source=DOMAIN, + statistic_id=statistic_id(client), + unit_class=EnergyConverter.UNIT_CLASS, + unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR, + ) + + +async def async_fetch_active_month_ranges( + client: OhmeApiClient, start: datetime, end: datetime +) -> list[tuple[datetime, datetime]]: + """Fetch month windows which contain non-zero charged energy.""" + if start >= end: + return [] + + active_ranges: list[tuple[datetime, datetime]] = [] + pending_range: tuple[datetime, datetime] | None = None + chunk_start = start + + while chunk_start < end: + chunk_end = min(end, chunk_start + MONTH_PREFILTER_WINDOW) + summary = await client.async_get_charge_summary( + start_ts=_to_ms(chunk_start), + end_ts=_to_ms(chunk_end), + granularity=MONTH_GRANULARITY, + ) + + for stat in cast(list[Mapping[str, Any]], summary.get("stats") or []): + if (stat.get("energyChargedTotalWh") or 0) <= 0: + continue + + start_ms = cast(int | None, stat.get("startTime")) + end_ms = cast(int | None, stat.get("endTime")) + if start_ms is None or end_ms is None: + continue + + month_start = max(start, dt_util.utc_from_timestamp(start_ms / 1000)) + month_end = min(end, dt_util.utc_from_timestamp(end_ms / 1000)) + if month_start >= month_end: + continue + + if pending_range is None: + pending_range = (month_start, month_end) + continue + + pending_start, pending_end = pending_range + if month_start <= pending_end: + pending_range = (pending_start, max(pending_end, month_end)) + continue + + active_ranges.append(pending_range) + pending_range = (month_start, month_end) + + chunk_start = chunk_end + + if pending_range is not None: + active_ranges.append(pending_range) + + _LOGGER.debug( + "Ohme month prefilter found %s active range(s) between %s and %s", + len(active_ranges), + start, + end, + ) + return active_ranges + + +async def async_fetch_hourly_energy_points_for_ranges( + client: OhmeApiClient, ranges: list[tuple[datetime, datetime]] +) -> list[tuple[datetime, float]]: + """Fetch hourly charged-energy points for the provided ranges.""" + hourly_points: dict[datetime, float] = {} + + for range_start, range_end in ranges: + chunk_start = range_start + while chunk_start < range_end: + chunk_end = min(range_end, chunk_start + HOUR_FETCH_WINDOW) + summary = await client.async_get_charge_summary( + start_ts=_to_ms(chunk_start), + end_ts=_to_ms(chunk_end), + granularity=SummaryGranularity.HOUR, + ) + + for stat in cast(list[Mapping[str, Any]], summary.get("stats") or []): + start_ms = cast(int | None, stat.get("startTime")) + if start_ms is None: + continue + + bucket_start = dt_util.utc_from_timestamp(start_ms / 1000) + if bucket_start < chunk_start or bucket_start >= chunk_end: + continue + + hourly_points[bucket_start] = ( + stat.get("energyChargedTotalWh") or 0 + ) / 1000 + + chunk_start = chunk_end + + return sorted(hourly_points.items()) + + +async def async_fetch_hourly_energy_points_direct( + client: OhmeApiClient, start: datetime, end: datetime +) -> list[tuple[datetime, float]]: + """Fetch hourly charged-energy points directly for a bounded window.""" + return await async_fetch_hourly_energy_points_for_ranges(client, [(start, end)]) + + +async def async_fetch_hourly_energy_points( + client: OhmeApiClient, + start: datetime, + end: datetime, + *, + use_month_prefilter: bool | None = None, +) -> list[tuple[datetime, float]]: + """Fetch hourly charged-energy points using the most appropriate strategy.""" + if start >= end: + return [] + + if use_month_prefilter is None: + use_month_prefilter = end - start > HOUR_FETCH_WINDOW + + if not use_month_prefilter: + return await async_fetch_hourly_energy_points_direct(client, start, end) + + active_ranges = await async_fetch_active_month_ranges(client, start, end) + return await async_fetch_hourly_energy_points_for_ranges(client, active_ranges) + + +def build_cumulative_statistics( + hourly_points: list[tuple[datetime, float]], + *, + window_start: datetime, + base_sum_kwh: float = 0.0, +) -> list[StatisticData]: + """Build cumulative recorder rows from hourly energy deltas.""" + statistics: list[StatisticData] = [] + current_sum = base_sum_kwh + + if not hourly_points or hourly_points[0][0] > window_start: + statistics.append( + StatisticData(start=window_start, state=current_sum, sum=current_sum) + ) + + for bucket_start, bucket_kwh in hourly_points: + current_sum = round(current_sum + bucket_kwh, 6) + statistics.append( + StatisticData(start=bucket_start, state=current_sum, sum=current_sum) + ) + + return statistics + + +async def async_get_total_before_window( + client: OhmeApiClient, window_start: datetime +) -> float: + """Return the all-time charged-energy total before the requested window.""" + if window_start <= FULL_HISTORY_START: + return 0.0 + + summary = await client.async_get_charge_summary( + start_ts=_to_ms(FULL_HISTORY_START), + end_ts=_to_ms(window_start), + granularity=SummaryGranularity.DAY, + ) + return extract_total_kwh(cast(Mapping[str, Any], summary)) or 0.0 + + +async def async_get_imported_statistics_state( + hass: HomeAssistant, stat_id: str +) -> ImportedStatisticsState: + """Return the current recorder statistics bounds for this sensor.""" + stats = await get_instance(hass).async_add_executor_job( + get_last_statistics, + hass, + 1, + stat_id, + False, + {"sum"}, + ) + rows = stats.get(stat_id, []) + if not rows: + return ImportedStatisticsState() + + last_sum = rows[-1]["sum"] + return ImportedStatisticsState( + last_start=_coerce_row_start(rows[-1]["start"]), + last_sum_kwh=float(last_sum) if last_sum is not None else None, + ) + + +async def async_get_sum_before( + hass: HomeAssistant, stat_id: str, end: datetime +) -> float: + """Return the cumulative sum immediately before the provided window end.""" + if end <= FULL_HISTORY_START: + return 0.0 + + stats = await get_instance(hass).async_add_executor_job( + statistics_during_period, + hass, + FULL_HISTORY_START, + end, + {stat_id}, + "hour", + None, + {"sum"}, + ) + rows = stats.get(stat_id, []) + if not rows: + return 0.0 + last_sum = rows[-1]["sum"] + return float(last_sum) if last_sum is not None else 0.0 + + +async def async_clear_statistics(hass: HomeAssistant, stat_id: str) -> None: + """Clear statistics for the provided statistic id and wait for completion.""" + done_event = asyncio.Event() + + def _done() -> None: + hass.loop.call_soon_threadsafe(done_event.set) + + get_instance(hass).async_clear_statistics([stat_id], on_done=_done) + async with asyncio.timeout(CLEAR_STATISTICS_TIMEOUT): + await done_event.wait() + + +async def async_remove_energy_history( + hass: HomeAssistant, config_entry: OhmeConfigEntry +) -> None: + """Remove imported Ohme energy history and persisted sync state.""" + runtime_data = getattr(config_entry, "runtime_data", None) + serial = ( + runtime_data.charge_session_coordinator.client.serial + if runtime_data is not None + else config_entry.unique_id + ) + if serial is not None: + _LOGGER.debug( + "Removing imported Ohme energy history for serial %s", serial + ) + await async_clear_statistics(hass, statistic_id_from_serial(serial)) + await async_remove_sync_state(hass, config_entry) + + +async def async_sync_energy_history_window( + hass: HomeAssistant, + config_entry: OhmeConfigEntry, + *, + window_start: datetime, + reason: str, + full_rebuild: bool = False, + base_sum_kwh_override: float | None = None, +) -> dict[str, Any]: + """Sync Ohme finalized hourly history into recorder statistics.""" + client = config_entry.runtime_data.charge_session_coordinator.client + stat_id = statistic_id(client) + metadata = statistics_metadata(client) + desired_window_start = history_window_start(config_entry) + query_end = history_window_end() + normalized_start = max(desired_window_start, floor_to_hour(window_start)) + + if normalized_start >= query_end: + return { + "action": "noop", + "reason": "empty_window", + "statistic_id": stat_id, + "window_start": normalized_start.isoformat(), + "window_end": query_end.isoformat(), + } + + _LOGGER.debug( + "Starting Ohme energy history sync (%s): full_rebuild=%s window_start=%s query_end=%s desired_window_start=%s", + reason, + full_rebuild, + normalized_start, + query_end, + desired_window_start, + ) + + existing = await async_get_imported_statistics_state(hass, stat_id) + if full_rebuild: + if base_sum_kwh_override is not None: + base_sum_kwh = base_sum_kwh_override + _LOGGER.debug( + "Using existing imported Ohme baseline before %s: %s kWh", + normalized_start, + base_sum_kwh, + ) + else: + base_sum_kwh = await async_get_total_before_window(client, normalized_start) + else: + if not existing.exists: + _LOGGER.debug( + "Ohme history sync (%s) found no imported statistics; falling back to full rebuild", + reason, + ) + return await async_full_rebuild_energy_history( + hass, + config_entry, + reason=f"{reason}_fallback_full_rebuild", + ) + base_sum_kwh = await async_get_sum_before(hass, stat_id, normalized_start) + + use_month_prefilter = query_end - normalized_start > HOUR_FETCH_WINDOW + hourly_points = await async_fetch_hourly_energy_points( + client, + normalized_start, + query_end, + use_month_prefilter=use_month_prefilter, + ) + statistics = build_cumulative_statistics( + hourly_points, + window_start=normalized_start, + base_sum_kwh=base_sum_kwh, + ) + if full_rebuild and existing.exists: + await async_clear_statistics(hass, stat_id) + async_add_external_statistics(hass, metadata, statistics) + await async_save_sync_state(hass, config_entry) + + _LOGGER.debug( + "Ohme energy history sync complete (%s): full_rebuild=%s, points=%s, start=%s, end=%s, prefilter=%s", + reason, + full_rebuild, + len(statistics), + normalized_start, + query_end, + use_month_prefilter, + ) + + return { + "action": "full_rebuild" if full_rebuild else "window_sync", + "reason": reason, + "statistic_id": stat_id, + "hours_imported": len(statistics), + "window_start": normalized_start.isoformat(), + "window_end": query_end.isoformat(), + "backfill_days": get_backfill_days(config_entry), + "used_month_prefilter": use_month_prefilter, + } + + +async def async_full_rebuild_energy_history( + hass: HomeAssistant, + config_entry: OhmeConfigEntry, + *, + reason: str, + base_sum_kwh_override: float | None = None, +) -> dict[str, Any]: + """Perform a full history rebuild for the configured backfill window.""" + return await async_sync_energy_history_window( + hass, + config_entry, + window_start=history_window_start(config_entry), + full_rebuild=True, + reason=reason, + base_sum_kwh_override=base_sum_kwh_override, + ) + + +async def async_recover_energy_history( + hass: HomeAssistant, + config_entry: OhmeConfigEntry, + *, + reason: str, +) -> dict[str, Any]: + """Recover any missing history from the last imported hour onward.""" + client = config_entry.runtime_data.charge_session_coordinator.client + stat_id = statistic_id(client) + imported_state = await async_get_imported_statistics_state(hass, stat_id) + if not imported_state.exists or imported_state.last_start is None: + return await async_full_rebuild_energy_history( + hass, + config_entry, + reason=f"{reason}_fallback_full_rebuild", + ) + + return await async_sync_energy_history_window( + hass, + config_entry, + window_start=max( + history_window_start(config_entry), + imported_state.last_start - timedelta(hours=1), + ), + reason=reason, + ) + + +async def async_sync_session_energy_history( + hass: HomeAssistant, + config_entry: OhmeConfigEntry, + *, + session_start: datetime | None, + reason: str, +) -> dict[str, Any]: + """Re-import the finalized session window using the active session anchor.""" + if session_start is None: + return await async_sync_repair_energy_history( + hass, + config_entry, + reason=f"{reason}_fallback_repair", + ) + + return await async_sync_energy_history_window( + hass, + config_entry, + window_start=session_start, + reason=reason, + ) + + +async def async_sync_repair_energy_history( + hass: HomeAssistant, + config_entry: OhmeConfigEntry, + *, + reason: str, +) -> dict[str, Any]: + """Re-import a bounded repair window to catch late provider updates.""" + return await async_sync_energy_history_window( + hass, + config_entry, + window_start=repair_window_start(config_entry), + reason=reason, + ) + + +async def async_ensure_energy_history( + hass: HomeAssistant, + config_entry: OhmeConfigEntry, +) -> dict[str, Any]: + """Ensure recorder statistics exist and match the current history settings.""" + client = config_entry.runtime_data.charge_session_coordinator.client + stat_id = statistic_id(client) + imported_state = await async_get_imported_statistics_state(hass, stat_id) + sync_state = await async_load_sync_state(hass, config_entry) + backfill_days = get_backfill_days(config_entry) + + if not imported_state.exists: + _LOGGER.debug( + "Ohme energy history missing for entry %s; starting full rebuild", + config_entry.entry_id, + ) + return await async_full_rebuild_energy_history( + hass, + config_entry, + reason="missing_statistics", + ) + + if sync_state is None: + _LOGGER.debug( + "Ohme energy history has no persisted sync state for entry %s; saving current settings", + config_entry.entry_id, + ) + await async_save_sync_state(hass, config_entry) + elif sync_state.get(CONF_BACKFILL_DAYS) != backfill_days: + previous_backfill_days = max( + 0, int(sync_state.get(CONF_BACKFILL_DAYS, DEFAULT_BACKFILL_DAYS)) + ) + base_sum_kwh_override: float | None = None + if backfill_window_covers(previous_backfill_days, backfill_days): + base_sum_kwh_override = await async_get_sum_before( + hass, + stat_id, + history_window_start(config_entry), + ) + _LOGGER.debug( + "Ohme backfill shrink for entry %s can reuse recorder baseline: previous=%s current=%s baseline=%s kWh", + config_entry.entry_id, + previous_backfill_days, + backfill_days, + base_sum_kwh_override, + ) + _LOGGER.debug( + "Ohme backfill_days changed for entry %s: previous=%s current=%s; rebuilding history", + config_entry.entry_id, + previous_backfill_days, + backfill_days, + ) + return await async_full_rebuild_energy_history( + hass, + config_entry, + reason="backfill_days_changed", + base_sum_kwh_override=base_sum_kwh_override, + ) + + _LOGGER.debug( + "Ohme energy history present for entry %s; running startup recovery", + config_entry.entry_id, + ) + return await async_recover_energy_history( + hass, + config_entry, + reason="startup_recovery", + ) diff --git a/homeassistant/components/ohme/manifest.json b/homeassistant/components/ohme/manifest.json index 236492603f29d0..f237814c614fd2 100644 --- a/homeassistant/components/ohme/manifest.json +++ b/homeassistant/components/ohme/manifest.json @@ -7,5 +7,7 @@ "integration_type": "device", "iot_class": "cloud_polling", "quality_scale": "platinum", - "requirements": ["ohme==1.9.0"] + "requirements": [ + "ohme @ git+https://github.com/tstordyallison/ohmepy.git@public-session-timestamps-and-summary-granularity" + ] } diff --git a/homeassistant/components/ohme/services.py b/homeassistant/components/ohme/services.py index 62307521fd3e6a..e77f5145d5b53a 100644 --- a/homeassistant/components/ohme/services.py +++ b/homeassistant/components/ohme/services.py @@ -44,18 +44,23 @@ ) -def __get_client(call: ServiceCall) -> OhmeApiClient: - """Get the client from the config entry.""" - entry: OhmeConfigEntry = service.async_get_config_entry( +def __get_entry(call: ServiceCall) -> OhmeConfigEntry: + """Get the config entry from the service call.""" + return service.async_get_config_entry( call.hass, DOMAIN, call.data[ATTR_CONFIG_ENTRY] ) - return entry.runtime_data.charge_session_coordinator.client + +def __get_client(call: ServiceCall) -> OhmeApiClient: + """Get the client from the config entry.""" + return __get_entry(call).runtime_data.charge_session_coordinator.client @callback def async_setup_services(hass: HomeAssistant) -> None: """Register services.""" + if hass.services.has_service(DOMAIN, SERVICE_LIST_CHARGE_SLOTS): + return async def list_charge_slots( service_call: ServiceCall, @@ -68,7 +73,7 @@ async def list_charge_slots( async def set_price_cap( service_call: ServiceCall, ) -> None: - """List of charge slots.""" + """Update the configured price cap.""" client = __get_client(service_call) price_cap = service_call.data[ATTR_PRICE_CAP] await client.async_change_price_cap(cap=price_cap) diff --git a/homeassistant/components/ohme/strings.json b/homeassistant/components/ohme/strings.json index 551076059ce7f6..9ac5f2e7397427 100644 --- a/homeassistant/components/ohme/strings.json +++ b/homeassistant/components/ohme/strings.json @@ -43,6 +43,18 @@ } } }, + "options": { + "step": { + "init": { + "data": { + "backfill_days": "Hourly history backfill days" + }, + "data_description": { + "backfill_days": "How many days of finalized hourly Ohme energy history to import into Home Assistant. Set to 0 to import all available history." + } + } + } + }, "entity": { "button": { "approve": { diff --git a/tests/components/ohme/conftest.py b/tests/components/ohme/conftest.py index 2a830a5bfc8a34..9e6d9a29a2330e 100644 --- a/tests/components/ohme/conftest.py +++ b/tests/components/ohme/conftest.py @@ -54,6 +54,8 @@ def mock_client(): client.status = ChargerStatus.CHARGING client.power = ChargerPower(0, 0, 0) client.available = True + client.session_start = None + client.session_finish = None client.target_soc = 50 client.target_time = (8, 0) @@ -64,6 +66,10 @@ def mock_client(): client.cap_available = True client.cap_enabled = True client.energy = 1000 + client.async_get_charge_session.return_value = None + client.async_get_charge_summary.return_value = { + "totalStats": {"energyChargedTotalWh": 123456} + } client.device_info = { "name": "Ohme Home Pro", "model": "Home Pro", diff --git a/tests/components/ohme/test_config_flow.py b/tests/components/ohme/test_config_flow.py index b8754711d7634a..9201caf686bff0 100644 --- a/tests/components/ohme/test_config_flow.py +++ b/tests/components/ohme/test_config_flow.py @@ -5,7 +5,11 @@ from ohme import ApiException, AuthException import pytest -from homeassistant.components.ohme.const import DOMAIN +from homeassistant.components.ohme.const import ( + CONF_BACKFILL_DAYS, + DEFAULT_BACKFILL_DAYS, + DOMAIN, +) from homeassistant.config_entries import SOURCE_USER from homeassistant.const import CONF_EMAIL, CONF_PASSWORD from homeassistant.core import HomeAssistant @@ -38,6 +42,7 @@ async def test_config_flow_success( CONF_EMAIL: "test@example.com", CONF_PASSWORD: "hunter2", } + assert result["options"] == {CONF_BACKFILL_DAYS: DEFAULT_BACKFILL_DAYS} @pytest.mark.parametrize( @@ -84,6 +89,7 @@ async def test_config_flow_fail( CONF_EMAIL: "test@example.com", CONF_PASSWORD: "hunter1", } + assert result["options"] == {CONF_BACKFILL_DAYS: DEFAULT_BACKFILL_DAYS} async def test_already_configured( @@ -263,3 +269,22 @@ async def test_reconfigure_fail( assert result["type"] is FlowResultType.ABORT assert result["reason"] == "reconfigure_successful" + + +async def test_options_flow( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: + """Test the Ohme options flow.""" + mock_config_entry.add_to_hass(hass) + + result = await hass.config_entries.options.async_init(mock_config_entry.entry_id) + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "init" + + result = await hass.config_entries.options.async_configure( + result["flow_id"], + user_input={CONF_BACKFILL_DAYS: 600}, + ) + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["data"] == {CONF_BACKFILL_DAYS: 600} diff --git a/tests/components/ohme/test_history.py b/tests/components/ohme/test_history.py new file mode 100644 index 00000000000000..713b5433aea276 --- /dev/null +++ b/tests/components/ohme/test_history.py @@ -0,0 +1,786 @@ +"""Tests for Ohme energy history sync behavior.""" + +from __future__ import annotations + +from datetime import datetime, timedelta +from unittest.mock import AsyncMock, MagicMock, patch + +from freezegun.api import FrozenDateTimeFactory +from ohme import ChargerStatus +import pytest + +from homeassistant.components.ohme.const import CONF_BACKFILL_DAYS +from homeassistant.components.ohme.history import ( + ImportedStatisticsState, + async_ensure_energy_history, + async_remove_energy_history, + async_sync_energy_history_window, + history_window_start, + repair_window_start, + statistic_id_from_serial, +) +from homeassistant.core import HomeAssistant +from homeassistant.util import dt as dt_util + +from . import setup_integration + +from tests.common import MockConfigEntry, async_fire_time_changed + + +def _to_ms(timestamp: datetime) -> int: + """Convert a datetime to milliseconds since the Unix epoch.""" + return int(timestamp.timestamp() * 1000) + + +def _summary_side_effect(total_wh: int) -> AsyncMock: + """Build a mocked summary endpoint for one day of hourly charging.""" + day_start = datetime(2026, 4, 5, tzinfo=dt_util.UTC) + month_points = [(day_start, total_wh)] + hourly_points = [ + (datetime(2026, 4, 5, 3, tzinfo=dt_util.UTC), 2.0), + (datetime(2026, 4, 5, 4, tzinfo=dt_util.UTC), 3.5), + ] + + async def _mock_summary( + *, start_ts: int, end_ts: int, granularity: str + ) -> dict[str, object]: + granularity_name = getattr(granularity, "value", granularity) + + if granularity_name == "MONTH": + return { + "totalStats": {"energyChargedTotalWh": total_wh}, + "stats": [ + { + "startTime": _to_ms(bucket_start), + "endTime": _to_ms(bucket_start + timedelta(days=30)), + "energyChargedTotalWh": bucket_wh, + } + for bucket_start, bucket_wh in month_points + if start_ts <= _to_ms(bucket_start) < end_ts + ], + } + + if granularity_name == "DAY": + return {"totalStats": {"energyChargedTotalWh": total_wh}, "stats": []} + + if granularity_name == "HOUR": + return { + "totalStats": {"energyChargedTotalWh": total_wh}, + "stats": [ + { + "startTime": _to_ms(bucket_start), + "endTime": _to_ms(bucket_start + timedelta(hours=1)), + "energyChargedTotalWh": int(bucket_kwh * 1000), + } + for bucket_start, bucket_kwh in hourly_points + if start_ts <= _to_ms(bucket_start) < end_ts + ], + } + + raise AssertionError(f"Unexpected granularity {granularity}") + + return AsyncMock(side_effect=_mock_summary) + + +def _make_charge_session_side_effect( + mock_client: MagicMock, + updates: list[dict[str, object]], +) -> AsyncMock: + """Build a charge-session side effect that mutates the mocked client.""" + index = 0 + + async def _side_effect() -> None: + nonlocal index + payload = updates[min(index, len(updates) - 1)] + index += 1 + mock_client.status = payload["status"] + mock_client.session_start = payload.get("session_start") + mock_client.session_finish = payload.get("session_finish") + + return AsyncMock(side_effect=_side_effect) + + +@pytest.mark.usefixtures("entity_registry_enabled_by_default") +async def test_setup_bootstraps_total_charged_energy_statistics( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test setup bootstraps external statistics for total charged energy.""" + mock_client.async_get_charge_summary = _summary_side_effect(total_wh=5500) + + with ( + patch( + "homeassistant.components.ohme.history.async_get_imported_statistics_state", + new=AsyncMock(return_value=ImportedStatisticsState()), + ), + patch( + "homeassistant.components.ohme.history.async_save_sync_state", + new=AsyncMock(), + ) as mock_save_state, + patch( + "homeassistant.components.ohme.history.async_add_external_statistics" + ) as mock_add_statistics, + ): + mock_config_entry.add_to_hass(hass) + hass.config_entries.async_update_entry( + mock_config_entry, + options={CONF_BACKFILL_DAYS: 0}, + ) + await hass.config_entries.async_setup(mock_config_entry.entry_id) + await hass.async_block_till_done() + + mock_add_statistics.assert_called_once() + metadata = mock_add_statistics.call_args.args[1] + rows = mock_add_statistics.call_args.args[2] + + assert metadata["statistic_id"].startswith("ohme:total_charged_energy_") + assert len(rows) == 3 + assert rows[0]["sum"] == 0.0 + assert rows[1]["state"] == 2.0 + assert rows[2]["sum"] == 5.5 + + mock_save_state.assert_awaited_once_with(hass, mock_config_entry) + + +async def test_remove_energy_history_uses_entry_unique_id_when_not_loaded( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, +) -> None: + """Test history cleanup can derive the statistic id without runtime data.""" + mock_config_entry.add_to_hass(hass) + hass.config_entries.async_update_entry(mock_config_entry, unique_id="chargerid") + + with ( + patch( + "homeassistant.components.ohme.history.async_clear_statistics", + new=AsyncMock(), + ) as mock_clear_statistics, + patch( + "homeassistant.components.ohme.history.async_remove_sync_state", + new=AsyncMock(), + ) as mock_remove_sync_state, + ): + await async_remove_energy_history(hass, mock_config_entry) + + mock_clear_statistics.assert_awaited_once_with( + hass, statistic_id_from_serial("chargerid") + ) + mock_remove_sync_state.assert_awaited_once_with(hass, mock_config_entry) + + +async def test_ensure_energy_history_recovers_from_last_imported_hour( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test startup recovery sync starts from the last imported hour overlap.""" + runtime_data = MagicMock() + runtime_data.charge_session_coordinator.client = mock_client + mock_config_entry.runtime_data = runtime_data + + last_start = datetime(2026, 4, 5, 10, tzinfo=dt_util.UTC) + imported_state = ImportedStatisticsState( + last_start=last_start, + last_sum_kwh=12.345, + ) + + with ( + patch( + "homeassistant.components.ohme.history.async_get_imported_statistics_state", + new=AsyncMock(return_value=imported_state), + ), + patch( + "homeassistant.components.ohme.history.async_load_sync_state", + new=AsyncMock(return_value={CONF_BACKFILL_DAYS: 365}), + ), + patch( + "homeassistant.components.ohme.history.async_sync_energy_history_window", + new=AsyncMock(return_value={"action": "window_sync"}), + ) as mock_sync, + ): + await async_ensure_energy_history(hass, mock_config_entry) + + mock_sync.assert_awaited_once_with( + hass, + mock_config_entry, + window_start=max( + history_window_start(mock_config_entry), + last_start - timedelta(hours=1), + ), + reason="startup_recovery", + ) + + +async def test_ensure_energy_history_rebuilds_if_backfill_days_changes( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test changing backfill days still forces a full rebuild.""" + runtime_data = MagicMock() + runtime_data.charge_session_coordinator.client = mock_client + mock_config_entry.runtime_data = runtime_data + + imported_state = ImportedStatisticsState( + last_start=datetime(2026, 4, 5, 10, tzinfo=dt_util.UTC), + last_sum_kwh=12.345, + ) + + with ( + patch( + "homeassistant.components.ohme.history.async_get_imported_statistics_state", + new=AsyncMock(return_value=imported_state), + ), + patch( + "homeassistant.components.ohme.history.async_load_sync_state", + new=AsyncMock(return_value={CONF_BACKFILL_DAYS: 30}), + ), + patch( + "homeassistant.components.ohme.history.async_full_rebuild_energy_history", + new=AsyncMock(return_value={"action": "full_rebuild"}), + ) as mock_full_rebuild, + ): + await async_ensure_energy_history(hass, mock_config_entry) + + mock_full_rebuild.assert_awaited_once_with( + hass, + mock_config_entry, + reason="backfill_days_changed", + base_sum_kwh_override=None, + ) + + +async def test_ensure_energy_history_backfill_shrink_reuses_existing_baseline( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test shrinking backfill reuses the existing recorder baseline. + + This avoids a large provider-side "total before window" query when we + already have a wider imported statistics window locally. + """ + mock_config_entry.add_to_hass(hass) + runtime_data = MagicMock() + runtime_data.charge_session_coordinator.client = mock_client + mock_config_entry.runtime_data = runtime_data + hass.config_entries.async_update_entry( + mock_config_entry, + options={CONF_BACKFILL_DAYS: 365}, + ) + + imported_state = ImportedStatisticsState( + last_start=datetime(2026, 4, 5, 10, tzinfo=dt_util.UTC), + last_sum_kwh=12.345, + ) + + with ( + patch( + "homeassistant.components.ohme.history.async_get_imported_statistics_state", + new=AsyncMock(return_value=imported_state), + ), + patch( + "homeassistant.components.ohme.history.async_load_sync_state", + new=AsyncMock(return_value={CONF_BACKFILL_DAYS: 0}), + ), + patch( + "homeassistant.components.ohme.history.async_get_sum_before", + new=AsyncMock(return_value=456.789), + ) as mock_get_sum_before, + patch( + "homeassistant.components.ohme.history.async_full_rebuild_energy_history", + new=AsyncMock(return_value={"action": "full_rebuild"}), + ) as mock_full_rebuild, + ): + await async_ensure_energy_history(hass, mock_config_entry) + + mock_get_sum_before.assert_awaited_once_with( + hass, + statistic_id_from_serial(mock_client.serial), + history_window_start(mock_config_entry), + ) + mock_full_rebuild.assert_awaited_once_with( + hass, + mock_config_entry, + reason="backfill_days_changed", + base_sum_kwh_override=456.789, + ) + + +async def test_full_rebuild_does_not_clear_existing_statistics_before_refetch( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test a failed rebuild leaves existing imported statistics untouched.""" + runtime_data = MagicMock() + runtime_data.charge_session_coordinator.client = mock_client + mock_config_entry.runtime_data = runtime_data + + imported_state = ImportedStatisticsState( + last_start=datetime(2026, 4, 5, 10, tzinfo=dt_util.UTC), + last_sum_kwh=12.345, + ) + + with ( + patch( + "homeassistant.components.ohme.history.async_get_imported_statistics_state", + new=AsyncMock(return_value=imported_state), + ), + patch( + "homeassistant.components.ohme.history.async_get_total_before_window", + new=AsyncMock(return_value=5.0), + ), + patch( + "homeassistant.components.ohme.history.async_fetch_hourly_energy_points", + new=AsyncMock(side_effect=RuntimeError("summary fetch failed")), + ), + patch( + "homeassistant.components.ohme.history.async_clear_statistics", + new=AsyncMock(), + ) as mock_clear_statistics, + patch( + "homeassistant.components.ohme.history.async_add_external_statistics" + ) as mock_add_statistics, + pytest.raises(RuntimeError, match="summary fetch failed"), + ): + await async_sync_energy_history_window( + hass, + mock_config_entry, + window_start=history_window_start(mock_config_entry), + reason="test_failed_rebuild", + full_rebuild=True, + ) + + mock_clear_statistics.assert_not_awaited() + mock_add_statistics.assert_not_called() + + +async def test_full_rebuild_uses_override_without_provider_total( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test a rebuild can use a supplied local baseline instead of provider total.""" + runtime_data = MagicMock() + runtime_data.charge_session_coordinator.client = mock_client + mock_config_entry.runtime_data = runtime_data + + imported_state = ImportedStatisticsState( + last_start=datetime(2026, 4, 5, 10, tzinfo=dt_util.UTC), + last_sum_kwh=12.345, + ) + + with ( + patch( + "homeassistant.components.ohme.history.async_get_imported_statistics_state", + new=AsyncMock(return_value=imported_state), + ), + patch( + "homeassistant.components.ohme.history.async_get_total_before_window", + new=AsyncMock(side_effect=AssertionError("should not query provider total")), + ), + patch( + "homeassistant.components.ohme.history.async_fetch_hourly_energy_points", + new=AsyncMock(return_value=[]), + ), + patch( + "homeassistant.components.ohme.history.async_clear_statistics", + new=AsyncMock(), + ) as mock_clear_statistics, + patch( + "homeassistant.components.ohme.history.async_save_sync_state", + new=AsyncMock(), + ), + patch( + "homeassistant.components.ohme.history.async_add_external_statistics" + ) as mock_add_statistics, + ): + await async_sync_energy_history_window( + hass, + mock_config_entry, + window_start=history_window_start(mock_config_entry), + reason="test_override_baseline", + full_rebuild=True, + base_sum_kwh_override=7.5, + ) + + mock_clear_statistics.assert_awaited_once() + rows = mock_add_statistics.call_args.args[2] + assert len(rows) == 1 + assert rows[0]["sum"] == 7.5 + + +async def test_session_finalize_triggers_sync_from_session_start( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test an explicit finalized marker syncs from the recorded session start. + + Our 2026-04-05 real-session probe showed that finalized summary data only + became visible once Ohme exposed a true completion edge. When the API does + provide both ``session_start`` and ``session_finish``, we can safely anchor + the resync to that completed session. + """ + session_start = datetime(2026, 4, 6, 10, 15, tzinfo=dt_util.UTC) + session_finish = datetime(2026, 4, 6, 12, 5, tzinfo=dt_util.UTC) + + with patch( + "homeassistant.components.ohme.async_ensure_energy_history", + new=AsyncMock(return_value={"action": "noop"}), + ): + await setup_integration(hass, mock_config_entry) + + mock_client.async_get_charge_session = _make_charge_session_side_effect( + mock_client, + [ + { + "status": ChargerStatus.CHARGING, + "session_start": session_start, + "session_finish": None, + }, + { + "status": ChargerStatus.FINISHED, + "session_start": session_start, + "session_finish": session_finish, + }, + ], + ) + + with patch( + "homeassistant.components.ohme.coordinator.async_sync_session_energy_history", + new=AsyncMock(return_value={"action": "window_sync"}), + ) as mock_sync_session: + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + mock_sync_session.assert_awaited_once_with( + hass, + mock_config_entry, + session_start=session_start, + reason="session_finalized", + ) + + +async def test_finished_session_does_not_resync_after_restart( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test seeded finished-session state does not retrigger a finalize sync. + + Startup recovery already reimports from the last imported hour overlap, so + a restart into an already-finalized session should not look like a fresh + completion event and cause a second bounded session sync. + """ + session_start = datetime(2026, 4, 6, 10, 15, tzinfo=dt_util.UTC) + session_finish = datetime(2026, 4, 6, 12, 5, tzinfo=dt_util.UTC) + mock_client.status = ChargerStatus.FINISHED + mock_client.session_start = session_start + mock_client.session_finish = session_finish + mock_client.async_get_charge_session = _make_charge_session_side_effect( + mock_client, + [ + { + "status": ChargerStatus.FINISHED, + "session_start": session_start, + "session_finish": session_finish, + } + ], + ) + + with patch( + "homeassistant.components.ohme.async_ensure_energy_history", + new=AsyncMock(return_value={"action": "noop"}), + ): + await setup_integration(hass, mock_config_entry) + + with ( + patch( + "homeassistant.components.ohme.coordinator.async_sync_session_energy_history", + new=AsyncMock(return_value={"action": "window_sync"}), + ) as mock_sync_session, + patch( + "homeassistant.components.ohme.coordinator.async_sync_repair_energy_history", + new=AsyncMock(return_value={"action": "window_sync"}), + ) as mock_sync_repair, + ): + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + mock_sync_session.assert_not_awaited() + mock_sync_repair.assert_not_awaited() + + +async def test_finished_without_marker_waits_for_disconnect( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test plain finished state waits for disconnect before syncing. + + The 2026-04-05 probe entered ``FINISHED_CHARGE`` at 07:54 local time, but + ``charge_stats_energy_wh`` and the fixed-range summary total both stayed + stale until the charger became ``DISCONNECTED`` at 09:21. Only that + disconnect edge exposed ``finish_time_local`` and the finalized summary + totals, so a bare finished state without a finish marker must not trigger + an early sync. + """ + session_start = datetime(2026, 4, 6, 10, 15, tzinfo=dt_util.UTC) + session_finish = datetime(2026, 4, 6, 14, 21, tzinfo=dt_util.UTC) + + with patch( + "homeassistant.components.ohme.async_ensure_energy_history", + new=AsyncMock(return_value={"action": "noop"}), + ): + await setup_integration(hass, mock_config_entry) + + mock_client.async_get_charge_session = _make_charge_session_side_effect( + mock_client, + [ + { + "status": ChargerStatus.CHARGING, + "session_start": session_start, + "session_finish": None, + }, + { + "status": ChargerStatus.FINISHED, + "session_start": session_start, + "session_finish": None, + }, + { + "status": ChargerStatus.UNPLUGGED, + "session_start": session_start, + "session_finish": session_finish, + }, + ], + ) + + with ( + patch( + "homeassistant.components.ohme.coordinator.async_sync_session_energy_history", + new=AsyncMock(return_value={"action": "window_sync"}), + ) as mock_sync_session, + patch( + "homeassistant.components.ohme.coordinator.async_sync_repair_energy_history", + new=AsyncMock(return_value={"action": "window_sync"}), + ) as mock_sync_repair, + ): + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + mock_sync_repair.assert_not_awaited() + mock_sync_session.assert_awaited_once_with( + hass, + mock_config_entry, + session_start=session_start, + reason="session_finalized", + ) + + +async def test_missing_session_marker_falls_back_to_repair_sync( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test unplug without session markers falls back to bounded repair sync. + + The same probe showed unplug as the point where finalized summary data + becomes visible. If Ohme gives us that disconnect edge but no usable + ``session_start`` / ``session_finish`` anchors, we still repair the recent + bounded window rather than guessing a full-session anchor. + """ + with patch( + "homeassistant.components.ohme.async_ensure_energy_history", + new=AsyncMock(return_value={"action": "noop"}), + ): + await setup_integration(hass, mock_config_entry) + + mock_client.async_get_charge_session = _make_charge_session_side_effect( + mock_client, + [ + { + "status": ChargerStatus.CHARGING, + "session_start": None, + "session_finish": None, + }, + { + "status": ChargerStatus.UNPLUGGED, + "session_start": None, + "session_finish": None, + }, + ], + ) + + with patch( + "homeassistant.components.ohme.coordinator.async_sync_repair_energy_history", + new=AsyncMock(return_value={"action": "window_sync"}), + ) as mock_sync_repair: + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + mock_sync_repair.assert_awaited_once_with( + hass, + mock_config_entry, + reason="session_marker_missing", + ) + + +async def test_finalize_schedules_one_delayed_retry( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test a finalized session schedules exactly one delayed retry. + + Ohme summary data is convergent rather than live. We resync immediately + when a completion edge appears, then schedule one delayed retry so a late + finalized summary bucket can still be picked up without polling summary + endpoints continuously. + """ + session_start = datetime(2026, 4, 6, 10, 15, tzinfo=dt_util.UTC) + session_finish = datetime(2026, 4, 6, 12, 5, tzinfo=dt_util.UTC) + + with patch( + "homeassistant.components.ohme.async_ensure_energy_history", + new=AsyncMock(return_value={"action": "noop"}), + ): + await setup_integration(hass, mock_config_entry) + + mock_client.async_get_charge_session = _make_charge_session_side_effect( + mock_client, + [ + { + "status": ChargerStatus.CHARGING, + "session_start": session_start, + "session_finish": None, + }, + { + "status": ChargerStatus.FINISHED, + "session_start": session_start, + "session_finish": session_finish, + }, + { + "status": ChargerStatus.FINISHED, + "session_start": session_start, + "session_finish": session_finish, + }, + ], + ) + + with patch( + "homeassistant.components.ohme.coordinator.async_sync_session_energy_history", + new=AsyncMock(return_value={"action": "window_sync"}), + ) as mock_sync_session: + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.tick(timedelta(minutes=15)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + assert mock_sync_session.await_count == 2 + first_call, second_call = mock_sync_session.await_args_list + assert first_call.kwargs["reason"] == "session_finalized" + assert second_call.kwargs["reason"] == "session_finalized_retry" + + +async def test_daily_repair_runs_over_bounded_window( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test the daily repair timer uses the bounded repair helper.""" + with patch( + "homeassistant.components.ohme.async_ensure_energy_history", + new=AsyncMock(return_value={"action": "noop"}), + ): + await setup_integration(hass, mock_config_entry) + + with patch( + "homeassistant.components.ohme.coordinator.async_sync_repair_energy_history", + new=AsyncMock(return_value={"action": "window_sync"}), + ) as mock_sync_repair: + freezer.tick(timedelta(days=1)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + mock_sync_repair.assert_awaited_once_with( + hass, + mock_config_entry, + reason="daily_repair", + ) + + +async def test_regular_charge_session_poll_no_longer_queries_summary( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test steady-state polling no longer hits the summary endpoint.""" + with patch( + "homeassistant.components.ohme.async_ensure_energy_history", + new=AsyncMock(return_value={"action": "noop"}), + ): + await setup_integration(hass, mock_config_entry) + + await_count_before = mock_client.async_get_charge_summary.await_count + + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + freezer.tick(timedelta(seconds=30)) + async_fire_time_changed(hass) + await hass.async_block_till_done(wait_background_tasks=True) + + assert mock_client.async_get_charge_summary.await_count == await_count_before == 0 + + +async def test_repair_window_uses_recent_sync_days( + mock_config_entry: MockConfigEntry, +) -> None: + """Test the bounded repair window is derived from DEFAULT_RECENT_SYNC_DAYS.""" + window_end = datetime(2026, 4, 6, 12, 0, tzinfo=dt_util.UTC) + assert repair_window_start(mock_config_entry, window_end) == datetime( + 2026, + 3, + 30, + 13, + 0, + tzinfo=dt_util.UTC, + ) diff --git a/tests/components/ohme/test_init.py b/tests/components/ohme/test_init.py index 7d9d388867fc6a..791c0eaedddfe8 100644 --- a/tests/components/ohme/test_init.py +++ b/tests/components/ohme/test_init.py @@ -1,6 +1,6 @@ """Test init of Ohme integration.""" -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock, patch from syrupy.assertion import SnapshotAssertion @@ -45,3 +45,39 @@ async def test_device( device = device_registry.async_get_device({(DOMAIN, mock_client.serial)}) assert device assert device == snapshot + + +async def test_remove_entry_cleans_up_imported_history( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test removing the config entry cleans up imported history state.""" + await setup_integration(hass, mock_config_entry) + + with patch( + "homeassistant.components.ohme.async_remove_energy_history", + new=AsyncMock(), + ) as mock_remove_history: + await hass.config_entries.async_remove(mock_config_entry.entry_id) + await hass.async_block_till_done() + + mock_remove_history.assert_awaited_once_with(hass, mock_config_entry) + + +async def test_remove_entry_cleans_up_imported_history_when_not_loaded( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, +) -> None: + """Test removing an unloaded entry still cleans up imported history state.""" + mock_config_entry.add_to_hass(hass) + hass.config_entries.async_update_entry(mock_config_entry, unique_id="chargerid") + + with patch( + "homeassistant.components.ohme.async_remove_energy_history", + new=AsyncMock(), + ) as mock_remove_history: + await hass.config_entries.async_remove(mock_config_entry.entry_id) + await hass.async_block_till_done() + + mock_remove_history.assert_awaited_once_with(hass, mock_config_entry) diff --git a/tests/components/ohme/test_sensor.py b/tests/components/ohme/test_sensor.py index b7c8f82aafc1b5..4b7dc00c894e58 100644 --- a/tests/components/ohme/test_sensor.py +++ b/tests/components/ohme/test_sensor.py @@ -14,7 +14,7 @@ from . import setup_integration -from tests.common import MockConfigEntry, async_fire_time_changed, snapshot_platform +from tests.common import MockConfigEntry, async_fire_time_changed @pytest.mark.usefixtures("entity_registry_enabled_by_default") @@ -29,7 +29,18 @@ async def test_sensors( with patch("homeassistant.components.ohme.PLATFORMS", [Platform.SENSOR]): await setup_integration(hass, mock_config_entry) - await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id) + entity_entries = sorted( + er.async_entries_for_config_entry(entity_registry, mock_config_entry.entry_id), + key=lambda entry: entry.entity_id, + ) + assert entity_entries + + for entity_entry in entity_entries: + assert entity_entry == snapshot(name=f"{entity_entry.entity_id}-entry") + assert entity_entry.disabled_by is None, "Please enable all entities." + state = hass.states.get(entity_entry.entity_id) + assert state, f"State not found for {entity_entry.entity_id}" + assert state == snapshot(name=f"{entity_entry.entity_id}-state") async def test_sensors_unavailable( @@ -59,3 +70,19 @@ async def test_sensors_unavailable( state = hass.states.get("sensor.ohme_home_pro_energy") assert state.state == "1.0" + + +async def test_summary_failure_nonfatal( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_client: MagicMock, +) -> None: + """Test summary failures don't block setup of the live charger sensors.""" + mock_client.async_get_charge_summary.side_effect = ApiException + + await setup_integration(hass, mock_config_entry) + + live_state = hass.states.get("sensor.ohme_home_pro_energy") + assert live_state + assert live_state.state == "1.0" + assert hass.states.get("sensor.ohme_home_pro_total_charged_energy") is None