Skip to content

Commit d950bb6

Browse files
committed
Drop per-entity staleness check and REST fallback
The 60 s per-entity staleness timer and its REST-based last_reported fallback were chasing a problem the integration layer already owns. HA integrations set their sensors to unavailable/unknown when their upstream source goes silent (ESPHome device offline, ha-tibber-pulse-local websocket dead, etc.); _update_entity_value already converts those states to None, and _get_entity_value already raises on None. Constant numeric values are legitimate steady-state behaviour and must not be flagged as stale. This also covers the push-based integration case (issue #363 follow-up): when a smart meter pushes only on value changes, HA's last_reported stays old too, so the merged REST fallback bailed out and the same false stale error was re-raised. Trusting unavailable/unknown is the only correct freshness signal here. aiohttp's websocket heartbeat (30 s) still catches a dead TCP connection and triggers a reconnect; the reconnect block now invalidates cached values so callers can't serve them across an extended outage. Net: ~600 lines removed (staleness machinery + REST fallback + tests).
1 parent 847d9cd commit d950bb6

2 files changed

Lines changed: 59 additions & 666 deletions

File tree

src/astrameter/powermeter/homeassistant.py

Lines changed: 35 additions & 191 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22
import contextlib
33
import json
44
import logging
5-
import time
6-
from collections.abc import Callable
7-
from datetime import datetime, timezone
85
from typing import Any
96

107
import aiohttp
@@ -23,22 +20,6 @@
2320
# WebSocket heartbeat (seconds) — same rationale as HomeWizard.
2421
WS_HEARTBEAT_SECONDS = 30.0
2522

26-
# An entity older than this is considered stale by the local push timer.
27-
# Crossing this threshold triggers the REST fallback (see below), not an
28-
# immediate error: HA's ``subscribe_entities`` only forwards
29-
# ``state_changed`` events, so a sensor with a constant value (e.g. solar
30-
# production on an unloaded phase) produces no pushes even when HA itself
31-
# is up to date.
32-
DEFAULT_MAX_STATE_AGE_SECONDS = 60.0
33-
34-
# Total wall-clock budget for the REST staleness fallback. When local push
35-
# silence exceeds ``max_state_age_seconds`` for any tracked entity, we
36-
# fan out parallel ``GET /api/states/{entity}`` requests bounded by this
37-
# deadline; HA returns ``last_reported`` (mutated on every state write,
38-
# including same-value reports), which we use as the authoritative
39-
# freshness signal. Bounded so a battery's UDP request never stalls.
40-
REST_REFRESH_TIMEOUT_SECONDS = 1.0
41-
4223

4324
class HomeAssistant(Powermeter):
4425
def __init__(
@@ -52,9 +33,6 @@ def __init__(
5233
power_input_alias: str | list[str],
5334
power_output_alias: str | list[str],
5435
path_prefix: str | None,
55-
*,
56-
max_state_age_seconds: float = DEFAULT_MAX_STATE_AGE_SECONDS,
57-
clock: Callable[[], float] | None = None,
5836
):
5937
self.ip = ip
6038
self.port = port
@@ -77,13 +55,14 @@ def __init__(
7755
else power_output_alias
7856
)
7957
self.path_prefix = path_prefix
80-
self._max_state_age_seconds = max(0.0, max_state_age_seconds)
81-
self._clock = clock or time.monotonic
8258

59+
# ``None`` = no usable value (never received, or the integration
60+
# reported ``unavailable`` / ``unknown``). Freshness is owned by
61+
# the integration: it sets sensors to ``unavailable`` when its
62+
# upstream source dies, and aiohttp's websocket heartbeat catches
63+
# a dead TCP connection on our side. A constant numeric value is
64+
# therefore legitimate and must not be treated as stale.
8365
self._entity_values: dict[str, float | None] = {}
84-
# Per-entity timestamp of the most recent state update, used
85-
# for staleness detection (None = never received).
86-
self._entity_update_time: dict[str, float | None] = {}
8766
self._tracked_entities = self._collect_entities()
8867
self._msg_id = 0
8968
self._subscribe_entities_id: int | None = None
@@ -104,11 +83,6 @@ def _build_ws_url(self) -> str:
10483
prefix = self.path_prefix or ""
10584
return f"{scheme}://{self.ip}:{self.port}{prefix}/api/websocket"
10685

107-
def _build_rest_state_url(self, entity_id: str) -> str:
108-
scheme = "https" if self.use_https else "http"
109-
prefix = self.path_prefix or ""
110-
return f"{scheme}://{self.ip}:{self.port}{prefix}/api/states/{entity_id}"
111-
11286
def _next_id(self) -> int:
11387
self._msg_id += 1
11488
return self._msg_id
@@ -153,19 +127,14 @@ async def _ws_loop(self) -> None:
153127
raise
154128
except Exception as e:
155129
logger.error("Home Assistant WebSocket error: %s", e, exc_info=True)
156-
# Reset protocol state for reconnection; keep _entity_values
157-
# as a courtesy, but mark them all stale so the staleness
158-
# check in _get_entity_value falls back to REST (or raises)
159-
# until fresh state pushes arrive from the reconnect.
160-
# ``_entities_ready`` must also clear, otherwise
161-
# ``wait_for_message()`` would return immediately for any
162-
# caller relying on it as a readiness signal even though
163-
# every entity is effectively stale until the next
164-
# ``subscribe_entities`` snapshot.
130+
# Reset protocol state and invalidate cached values so
131+
# ``get_powermeter_watts`` raises (and ``wait_for_message``
132+
# blocks) until the reconnected ``subscribe_entities``
133+
# snapshot repopulates them.
165134
self._msg_id = 0
166135
self._subscribe_entities_id = None
167-
for eid in list(self._entity_update_time):
168-
self._entity_update_time[eid] = None
136+
for eid in list(self._entity_values):
137+
self._entity_values[eid] = None
169138
self._entities_ready.clear()
170139
await asyncio.sleep(5)
171140

@@ -190,13 +159,13 @@ def _handle_compressed_entity_event(self, ev: dict[str, Any]) -> None:
190159
continue
191160
if _HA_S in plus:
192161
self._update_entity_value(eid, plus.get(_HA_S))
193-
elif _HA_LU in plus or _HA_LC in plus:
194-
# state_reported (value unchanged): HA omits ``s`` and
195-
# sends only ``lu``. Treat as a keepalive so the
196-
# staleness check does not fire on sensors whose value
197-
# is legitimately constant (e.g. solar production at
198-
# night, an unloaded phase).
199-
self._mark_entity_alive(eid)
162+
elif (_HA_LU in plus or _HA_LC in plus) and self._entity_values.get(
163+
eid
164+
) is not None:
165+
# state_reported (value unchanged) — wake
166+
# ``wait_for_next_message`` so callers don't time
167+
# out waiting for a push on a constant sensor.
168+
self._message_event.set()
200169
removals = ev.get("r")
201170
if isinstance(removals, list):
202171
for eid in removals:
@@ -248,177 +217,52 @@ def _update_entity_value(self, entity_id: str, state_val: object) -> None:
248217
logger.debug(f"Home Assistant: update_entity_value: {entity_id}, {state_val}")
249218
if state_val is None:
250219
self._entity_values[entity_id] = None
251-
self._entity_update_time[entity_id] = None
252220
self._check_entities_ready()
253221
return
254222
try:
255-
value = float(state_val) # type: ignore[arg-type]
256-
self._entity_values[entity_id] = value
257-
self._entity_update_time[entity_id] = self._clock()
223+
self._entity_values[entity_id] = float(state_val) # type: ignore[arg-type]
258224
except (ValueError, TypeError):
225+
# ``unavailable`` / ``unknown`` (or any non-numeric state) —
226+
# the integration is telling us the value isn't usable.
259227
logger.warning(
260228
f"Home Assistant sensor {entity_id} state '{state_val}' is not numeric"
261229
)
262230
self._entity_values[entity_id] = None
263-
self._entity_update_time[entity_id] = None
264231
self._check_entities_ready()
265232
self._message_event.set()
266233

267-
def _mark_entity_alive(self, entity_id: str) -> None:
268-
if self._entity_values.get(entity_id) is None:
269-
return
270-
self._entity_update_time[entity_id] = self._clock()
271-
self._message_event.set()
272-
273234
def _check_entities_ready(self) -> None:
274235
ready = all(
275-
self._entity_values.get(e) is not None
276-
and self._entity_update_time.get(e) is not None
277-
for e in self._tracked_entities
236+
self._entity_values.get(e) is not None for e in self._tracked_entities
278237
)
279238
if ready:
280239
self._entities_ready.set()
281240
else:
282241
self._entities_ready.clear()
283242

284-
def _locally_stale_entities(self) -> list[str]:
285-
if self._max_state_age_seconds <= 0:
286-
return []
287-
now = self._clock()
288-
stale: list[str] = []
289-
for eid in self._tracked_entities:
290-
if self._entity_values.get(eid) is None:
291-
stale.append(eid)
292-
continue
293-
last = self._entity_update_time.get(eid)
294-
if last is None or (now - last) > self._max_state_age_seconds:
295-
stale.append(eid)
296-
return stale
297-
298-
async def _refresh_stale_via_rest(
299-
self, timeout: float = REST_REFRESH_TIMEOUT_SECONDS
300-
) -> None:
301-
"""REST-poll any entity whose local push timer has crossed the
302-
staleness threshold, bounded by ``timeout`` total wall-clock.
303-
304-
``subscribe_entities`` only forwards ``state_changed``; sensors with
305-
a constant value (e.g. solar production on an unloaded phase) never
306-
push, so the per-entity timer is not a reliable freshness signal.
307-
``GET /api/states/{eid}`` returns HA's ``last_reported``, which is
308-
mutated on every state write — including same-value reports — and
309-
is the authoritative source of truth.
310-
"""
311-
if self._session is None:
312-
return
313-
stale = self._locally_stale_entities()
314-
if not stale:
315-
return
316-
# Whatever finishes in-budget is already applied; anything still
317-
# stale after the timeout will be caught by ``_get_entity_value``.
318-
with contextlib.suppress(asyncio.TimeoutError):
319-
await asyncio.wait_for(
320-
asyncio.gather(
321-
*(self._fetch_rest_state(eid) for eid in stale),
322-
return_exceptions=True,
323-
),
324-
timeout=timeout,
325-
)
326-
327-
async def _fetch_rest_state(self, entity_id: str) -> None:
328-
assert self._session is not None
329-
# Snapshot the local update time so we can detect a concurrent
330-
# websocket push (or another in-flight REST refresh) and avoid
331-
# clobbering newer data with a potentially-older REST response.
332-
pre_update = self._entity_update_time.get(entity_id)
333-
url = self._build_rest_state_url(entity_id)
334-
headers = {"Authorization": f"Bearer {self.access_token}"}
335-
try:
336-
async with self._session.get(url, headers=headers) as resp:
337-
if resp.status != 200:
338-
logger.debug(
339-
"Home Assistant REST refresh for %s: HTTP %s",
340-
entity_id,
341-
resp.status,
342-
)
343-
return
344-
data = await resp.json()
345-
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
346-
logger.debug(
347-
"Home Assistant REST refresh for %s failed: %s", entity_id, exc
348-
)
349-
return
350-
if self._entity_update_time.get(entity_id) != pre_update:
351-
return
352-
if isinstance(data, dict):
353-
self._apply_rest_state(entity_id, data)
354-
355-
def _apply_rest_state(self, entity_id: str, data: dict[str, Any]) -> None:
356-
state_val = data.get("state")
357-
if state_val in (None, "unknown", "unavailable"):
358-
return
359-
try:
360-
value = float(state_val) # type: ignore[arg-type]
361-
except (ValueError, TypeError):
362-
return
363-
# Trust HA's ``last_reported`` (mutated on every state write).
364-
# If HA itself hasn't seen an update within the staleness window,
365-
# don't refresh local cache — let the staleness check raise.
366-
if self._max_state_age_seconds > 0:
367-
reported_iso = data.get("last_reported") or data.get("last_updated")
368-
if not isinstance(reported_iso, str):
369-
return
370-
try:
371-
reported_dt = datetime.fromisoformat(reported_iso)
372-
except ValueError:
373-
return
374-
if reported_dt.tzinfo is None:
375-
reported_dt = reported_dt.replace(tzinfo=timezone.utc)
376-
ha_age = (datetime.now(timezone.utc) - reported_dt).total_seconds()
377-
if ha_age > self._max_state_age_seconds:
378-
return
379-
self._entity_values[entity_id] = value
380-
self._entity_update_time[entity_id] = self._clock()
381-
self._check_entities_ready()
382-
self._message_event.set()
383-
384243
def _get_entity_value(self, entity_id: str) -> float:
385244
val = self._entity_values.get(entity_id)
386245
if val is None:
387246
raise ValueError(f"Home Assistant sensor {entity_id} has no state")
388-
if self._max_state_age_seconds > 0:
389-
last = self._entity_update_time.get(entity_id)
390-
if last is None:
391-
raise ValueError(
392-
f"Home Assistant sensor {entity_id} has no update timestamp"
393-
)
394-
age = self._clock() - last
395-
if age > self._max_state_age_seconds:
396-
raise ValueError(
397-
f"Home Assistant sensor {entity_id} is stale "
398-
f"({age:.1f}s old, max {self._max_state_age_seconds:.1f}s)"
399-
)
400247
return val
401248

402249
async def get_powermeter_watts(self) -> list[float]:
403-
await self._refresh_stale_via_rest()
404250
if not self.power_calculate:
405251
return [
406252
self._get_entity_value(entity) for entity in self.current_power_entity
407253
]
408-
else:
409-
if len(self.power_input_alias) != len(self.power_output_alias):
410-
raise ValueError(
411-
"Home Assistant power_input_alias and"
412-
" power_output_alias lengths differ"
413-
)
414-
results = []
415-
for in_entity, out_entity in zip(
416-
self.power_input_alias, self.power_output_alias, strict=False
417-
):
418-
power_in = self._get_entity_value(in_entity)
419-
power_out = self._get_entity_value(out_entity)
420-
results.append(power_in - power_out)
421-
return results
254+
if len(self.power_input_alias) != len(self.power_output_alias):
255+
raise ValueError(
256+
"Home Assistant power_input_alias and power_output_alias lengths differ"
257+
)
258+
results = []
259+
for in_entity, out_entity in zip(
260+
self.power_input_alias, self.power_output_alias, strict=False
261+
):
262+
power_in = self._get_entity_value(in_entity)
263+
power_out = self._get_entity_value(out_entity)
264+
results.append(power_in - power_out)
265+
return results
422266

423267
async def wait_for_message(self, timeout: float = 5) -> None:
424268
try:

0 commit comments

Comments
 (0)