Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Next

- **Fixed** Home Assistant sensors falsely reported as stale when their value doesn't change for a while (e.g. solar production on an unloaded phase, or at night) ([#363](https://github.com/tomquist/astrameter/issues/363)).
- **Fixed** false "Home Assistant sensor is stale" errors for sensors that update infrequently or push only on value changes — including constant readings (e.g. solar production at night) and push-based integrations. The Home Assistant powermeter now treats a sensor as stale only when Home Assistant itself marks it `unavailable`/`unknown` or the websocket connection is lost ([#363](https://github.com/tomquist/astrameter/issues/363)).

## 2.0.0

Expand Down
239 changes: 45 additions & 194 deletions src/astrameter/powermeter/homeassistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
import contextlib
import json
import logging
import time
from collections.abc import Callable
from datetime import datetime, timezone
from typing import Any

import aiohttp
Expand All @@ -23,22 +20,6 @@
# WebSocket heartbeat (seconds) — same rationale as HomeWizard.
WS_HEARTBEAT_SECONDS = 30.0

# An entity older than this is considered stale by the local push timer.
# Crossing this threshold triggers the REST fallback (see below), not an
# immediate error: HA's ``subscribe_entities`` only forwards
# ``state_changed`` events, so a sensor with a constant value (e.g. solar
# production on an unloaded phase) produces no pushes even when HA itself
# is up to date.
DEFAULT_MAX_STATE_AGE_SECONDS = 60.0

# Total wall-clock budget for the REST staleness fallback. When local push
# silence exceeds ``max_state_age_seconds`` for any tracked entity, we
# fan out parallel ``GET /api/states/{entity}`` requests bounded by this
# deadline; HA returns ``last_reported`` (mutated on every state write,
# including same-value reports), which we use as the authoritative
# freshness signal. Bounded so a battery's UDP request never stalls.
REST_REFRESH_TIMEOUT_SECONDS = 1.0


class HomeAssistant(Powermeter):
def __init__(
Expand All @@ -52,9 +33,6 @@ def __init__(
power_input_alias: str | list[str],
power_output_alias: str | list[str],
path_prefix: str | None,
*,
max_state_age_seconds: float = DEFAULT_MAX_STATE_AGE_SECONDS,
clock: Callable[[], float] | None = None,
):
self.ip = ip
self.port = port
Expand All @@ -77,13 +55,21 @@ def __init__(
else power_output_alias
)
self.path_prefix = path_prefix
self._max_state_age_seconds = max(0.0, max_state_age_seconds)
self._clock = clock or time.monotonic

if self.power_calculate and len(self.power_input_alias) != len(
self.power_output_alias
):
raise ValueError(
"Home Assistant power_input_alias and power_output_alias lengths differ"
)

# ``None`` = no usable value (never received, or the integration
# reported ``unavailable`` / ``unknown``). Freshness is owned by
# the integration: it sets sensors to ``unavailable`` when its
# upstream source dies, and aiohttp's websocket heartbeat catches
# a dead TCP connection on our side. A constant numeric value is
# therefore legitimate and must not be treated as stale.
self._entity_values: dict[str, float | None] = {}
# Per-entity timestamp of the most recent state update, used
# for staleness detection (None = never received).
self._entity_update_time: dict[str, float | None] = {}
self._tracked_entities = self._collect_entities()
self._msg_id = 0
self._subscribe_entities_id: int | None = None
Expand All @@ -104,11 +90,6 @@ def _build_ws_url(self) -> str:
prefix = self.path_prefix or ""
return f"{scheme}://{self.ip}:{self.port}{prefix}/api/websocket"

def _build_rest_state_url(self, entity_id: str) -> str:
scheme = "https" if self.use_https else "http"
prefix = self.path_prefix or ""
return f"{scheme}://{self.ip}:{self.port}{prefix}/api/states/{entity_id}"

def _next_id(self) -> int:
self._msg_id += 1
return self._msg_id
Expand Down Expand Up @@ -153,22 +134,21 @@ async def _ws_loop(self) -> None:
raise
except Exception as e:
logger.error("Home Assistant WebSocket error: %s", e, exc_info=True)
# Reset protocol state for reconnection; keep _entity_values
# as a courtesy, but mark them all stale so the staleness
# check in _get_entity_value falls back to REST (or raises)
# until fresh state pushes arrive from the reconnect.
# ``_entities_ready`` must also clear, otherwise
# ``wait_for_message()`` would return immediately for any
# caller relying on it as a readiness signal even though
# every entity is effectively stale until the next
# ``subscribe_entities`` snapshot.
self._msg_id = 0
self._subscribe_entities_id = None
for eid in list(self._entity_update_time):
self._entity_update_time[eid] = None
self._entities_ready.clear()
self._reset_for_reconnect()
await asyncio.sleep(5)

def _reset_for_reconnect(self) -> None:
"""Reset protocol state and invalidate cached values so
``get_powermeter_watts`` raises (and ``wait_for_message`` blocks)
until the reconnected ``subscribe_entities`` snapshot repopulates
them.
"""
self._msg_id = 0
self._subscribe_entities_id = None
for eid in list(self._entity_values):
self._entity_values[eid] = None
self._entities_ready.clear()

def _handle_compressed_entity_event(self, ev: dict[str, Any]) -> None:
"""Apply subscribe_entities payloads (initial + diffs)."""
additions = ev.get("a")
Expand All @@ -190,13 +170,13 @@ def _handle_compressed_entity_event(self, ev: dict[str, Any]) -> None:
continue
if _HA_S in plus:
self._update_entity_value(eid, plus.get(_HA_S))
elif _HA_LU in plus or _HA_LC in plus:
# state_reported (value unchanged): HA omits ``s`` and
# sends only ``lu``. Treat as a keepalive so the
# staleness check does not fire on sensors whose value
# is legitimately constant (e.g. solar production at
# night, an unloaded phase).
self._mark_entity_alive(eid)
elif (_HA_LU in plus or _HA_LC in plus) and self._entity_values.get(
eid
) is not None:
# state_reported (value unchanged) — wake
# ``wait_for_next_message`` so callers don't time
# out waiting for a push on a constant sensor.
self._message_event.set()
removals = ev.get("r")
if isinstance(removals, list):
for eid in removals:
Expand Down Expand Up @@ -248,177 +228,48 @@ def _update_entity_value(self, entity_id: str, state_val: object) -> None:
logger.debug(f"Home Assistant: update_entity_value: {entity_id}, {state_val}")
if state_val is None:
self._entity_values[entity_id] = None
self._entity_update_time[entity_id] = None
self._check_entities_ready()
return
try:
value = float(state_val) # type: ignore[arg-type]
self._entity_values[entity_id] = value
self._entity_update_time[entity_id] = self._clock()
self._entity_values[entity_id] = float(state_val) # type: ignore[arg-type]
except (ValueError, TypeError):
# ``unavailable`` / ``unknown`` (or any non-numeric state) —
# the integration is telling us the value isn't usable.
logger.warning(
f"Home Assistant sensor {entity_id} state '{state_val}' is not numeric"
)
self._entity_values[entity_id] = None
self._entity_update_time[entity_id] = None
self._check_entities_ready()
self._message_event.set()

def _mark_entity_alive(self, entity_id: str) -> None:
if self._entity_values.get(entity_id) is None:
return
self._entity_update_time[entity_id] = self._clock()
self._message_event.set()

def _check_entities_ready(self) -> None:
ready = all(
self._entity_values.get(e) is not None
and self._entity_update_time.get(e) is not None
for e in self._tracked_entities
self._entity_values.get(e) is not None for e in self._tracked_entities
)
if ready:
self._entities_ready.set()
else:
self._entities_ready.clear()

def _locally_stale_entities(self) -> list[str]:
if self._max_state_age_seconds <= 0:
return []
now = self._clock()
stale: list[str] = []
for eid in self._tracked_entities:
if self._entity_values.get(eid) is None:
stale.append(eid)
continue
last = self._entity_update_time.get(eid)
if last is None or (now - last) > self._max_state_age_seconds:
stale.append(eid)
return stale

async def _refresh_stale_via_rest(
    self, timeout: float = REST_REFRESH_TIMEOUT_SECONDS
) -> None:
    """Poll the REST API for every locally stale entity, within ``timeout``.

    Does nothing when there is no HTTP session or when no tracked entity
    is locally stale. Otherwise one ``_fetch_rest_state`` call per stale
    entity is run concurrently, and the whole batch is bounded by
    ``timeout`` seconds of wall-clock time.

    The websocket subscription is treated as an unreliable freshness
    signal for sensors whose value stays constant, which is why the REST
    endpoint is consulted before such an entity is declared stale.
    """
    if self._session is None:
        return
    stale_ids = self._locally_stale_entities()
    if not stale_ids:
        return
    refreshes = asyncio.gather(
        *(self._fetch_rest_state(eid) for eid in stale_ids),
        return_exceptions=True,
    )
    try:
        await asyncio.wait_for(refreshes, timeout=timeout)
    except asyncio.TimeoutError:
        # Results that arrived in time are already applied; entities
        # that are still stale are reported by ``_get_entity_value``.
        pass

async def _fetch_rest_state(self, entity_id: str) -> None:
    """Fetch one entity's state over REST and apply it to the local cache.

    Non-200 responses and client/timeout errors are logged at debug level
    and otherwise ignored. The response is also discarded when the
    entity's local update time changed while the request was in flight,
    so newer data is never overwritten by a possibly older REST answer.
    Only a JSON object payload is passed on to ``_apply_rest_state``.
    """
    assert self._session is not None
    # Remember the update time seen before the request; a different value
    # afterwards means another update landed concurrently.
    seen_before = self._entity_update_time.get(entity_id)
    auth_headers = {"Authorization": f"Bearer {self.access_token}"}
    state_url = self._build_rest_state_url(entity_id)
    try:
        async with self._session.get(state_url, headers=auth_headers) as response:
            if response.status != 200:
                logger.debug(
                    "Home Assistant REST refresh for %s: HTTP %s",
                    entity_id,
                    response.status,
                )
                return
            payload = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError) as err:
        logger.debug(
            "Home Assistant REST refresh for %s failed: %s", entity_id, err
        )
        return
    untouched = self._entity_update_time.get(entity_id) == seen_before
    if untouched and isinstance(payload, dict):
        self._apply_rest_state(entity_id, payload)

def _apply_rest_state(self, entity_id: str, data: dict[str, Any]) -> None:
state_val = data.get("state")
if state_val in (None, "unknown", "unavailable"):
return
try:
value = float(state_val) # type: ignore[arg-type]
except (ValueError, TypeError):
return
# Trust HA's ``last_reported`` (mutated on every state write).
# If HA itself hasn't seen an update within the staleness window,
# don't refresh local cache — let the staleness check raise.
if self._max_state_age_seconds > 0:
reported_iso = data.get("last_reported") or data.get("last_updated")
if not isinstance(reported_iso, str):
return
try:
reported_dt = datetime.fromisoformat(reported_iso)
except ValueError:
return
if reported_dt.tzinfo is None:
reported_dt = reported_dt.replace(tzinfo=timezone.utc)
ha_age = (datetime.now(timezone.utc) - reported_dt).total_seconds()
if ha_age > self._max_state_age_seconds:
return
self._entity_values[entity_id] = value
self._entity_update_time[entity_id] = self._clock()
self._check_entities_ready()
self._message_event.set()

def _get_entity_value(self, entity_id: str) -> float:
val = self._entity_values.get(entity_id)
if val is None:
raise ValueError(f"Home Assistant sensor {entity_id} has no state")
if self._max_state_age_seconds > 0:
last = self._entity_update_time.get(entity_id)
if last is None:
raise ValueError(
f"Home Assistant sensor {entity_id} has no update timestamp"
)
age = self._clock() - last
if age > self._max_state_age_seconds:
raise ValueError(
f"Home Assistant sensor {entity_id} is stale "
f"({age:.1f}s old, max {self._max_state_age_seconds:.1f}s)"
)
return val

async def get_powermeter_watts(self) -> list[float]:
await self._refresh_stale_via_rest()
if not self.power_calculate:
return [
self._get_entity_value(entity) for entity in self.current_power_entity
]
else:
if len(self.power_input_alias) != len(self.power_output_alias):
raise ValueError(
"Home Assistant power_input_alias and"
" power_output_alias lengths differ"
)
results = []
for in_entity, out_entity in zip(
self.power_input_alias, self.power_output_alias, strict=False
):
power_in = self._get_entity_value(in_entity)
power_out = self._get_entity_value(out_entity)
results.append(power_in - power_out)
return results
results = []
for in_entity, out_entity in zip(
self.power_input_alias, self.power_output_alias, strict=False
):
power_in = self._get_entity_value(in_entity)
power_out = self._get_entity_value(out_entity)
results.append(power_in - power_out)
return results

async def wait_for_message(self, timeout: float = 5) -> None:
try:
Expand Down
Loading
Loading