Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Next

- **Fixed** Home Assistant sensors falsely reported as stale when their value doesn't change for a while (e.g. solar production on an unloaded phase, or at night) ([#363](https://github.com/tomquist/astrameter/issues/363)).

## 2.0.0

Expand Down
151 changes: 140 additions & 11 deletions src/astrameter/powermeter/homeassistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import time
from collections.abc import Callable
from datetime import datetime, timezone
from typing import Any

import aiohttp
Expand All @@ -15,17 +16,29 @@

# Home Assistant websocket subscribe_entities compressed state (homeassistant.const)
_HA_S = "s"
_HA_LU = "lu"
_HA_LC = "lc"
_HA_DIFF_ADD = "+"

# WebSocket heartbeat (seconds) — same rationale as HomeWizard.
WS_HEARTBEAT_SECONDS = 30.0

# An entity older than this is considered stale. HA typically pushes
# state changes on every update plus periodic keepalives; anything
# past 60 s without a push from a power sensor is strongly suspicious
# and consistent with a stalled websocket stream.
# An entity older than this is considered stale by the local push timer.
# Crossing this threshold triggers the REST fallback (see below), not an
# immediate error: HA's ``subscribe_entities`` only forwards
# ``state_changed`` events, so a sensor with a constant value (e.g. solar
# production on an unloaded phase) produces no pushes even when HA itself
# is up to date.
DEFAULT_MAX_STATE_AGE_SECONDS = 60.0

# Total wall-clock budget for the REST staleness fallback. When local push
# silence exceeds ``max_state_age_seconds`` for any tracked entity, we
# fan out parallel ``GET /api/states/{entity}`` requests bounded by this
# deadline; HA returns ``last_reported`` (mutated on every state write,
# including same-value reports), which we use as the authoritative
# freshness signal. Bounded so a battery's UDP request never stalls.
REST_REFRESH_TIMEOUT_SECONDS = 1.0


class HomeAssistant(Powermeter):
def __init__(
Expand Down Expand Up @@ -91,6 +104,11 @@ def _build_ws_url(self) -> str:
prefix = self.path_prefix or ""
return f"{scheme}://{self.ip}:{self.port}{prefix}/api/websocket"

def _build_rest_state_url(self, entity_id: str) -> str:
scheme = "https" if self.use_https else "http"
prefix = self.path_prefix or ""
return f"{scheme}://{self.ip}:{self.port}{prefix}/api/states/{entity_id}"

def _next_id(self) -> int:
self._msg_id += 1
return self._msg_id
Expand Down Expand Up @@ -137,12 +155,13 @@ async def _ws_loop(self) -> None:
logger.error("Home Assistant WebSocket error: %s", e, exc_info=True)
# Reset protocol state for reconnection; keep _entity_values
# as a courtesy, but mark them all stale so the staleness
# check in _get_entity_value will raise until fresh state
# pushes arrive from the reconnect. ``_entities_ready``
# must also clear, otherwise ``wait_for_message()`` would
# return immediately for any caller relying on it as a
# readiness signal even though every entity is effectively
# stale until the next ``subscribe_entities`` snapshot.
# check in _get_entity_value falls back to REST (or raises)
# until fresh state pushes arrive from the reconnect.
# ``_entities_ready`` must also clear, otherwise
# ``wait_for_message()`` would return immediately for any
# caller relying on it as a readiness signal even though
# every entity is effectively stale until the next
# ``subscribe_entities`` snapshot.
self._msg_id = 0
self._subscribe_entities_id = None
for eid in list(self._entity_update_time):
Expand All @@ -167,8 +186,17 @@ def _handle_compressed_entity_event(self, ev: dict[str, Any]) -> None:
if eid not in self._tracked_entities or not isinstance(diff, dict):
continue
plus = diff.get(_HA_DIFF_ADD)
if isinstance(plus, dict) and _HA_S in plus:
if not isinstance(plus, dict):
continue
if _HA_S in plus:
self._update_entity_value(eid, plus.get(_HA_S))
elif _HA_LU in plus or _HA_LC in plus:
# state_reported (value unchanged): HA omits ``s`` and
# sends only ``lu``. Treat as a keepalive so the
# staleness check does not fire on sensors whose value
# is legitimately constant (e.g. solar production at
# night, an unloaded phase).
self._mark_entity_alive(eid)
removals = ev.get("r")
if isinstance(removals, list):
for eid in removals:
Expand Down Expand Up @@ -236,6 +264,12 @@ def _update_entity_value(self, entity_id: str, state_val: object) -> None:
self._check_entities_ready()
self._message_event.set()

def _mark_entity_alive(self, entity_id: str) -> None:
if self._entity_values.get(entity_id) is None:
return
self._entity_update_time[entity_id] = self._clock()
self._message_event.set()

def _check_entities_ready(self) -> None:
ready = all(
self._entity_values.get(e) is not None
Expand All @@ -247,6 +281,100 @@ def _check_entities_ready(self) -> None:
else:
self._entities_ready.clear()

def _locally_stale_entities(self) -> list[str]:
if self._max_state_age_seconds <= 0:
return []
now = self._clock()
stale: list[str] = []
for eid in self._tracked_entities:
if self._entity_values.get(eid) is None:
stale.append(eid)
continue
last = self._entity_update_time.get(eid)
if last is None or (now - last) > self._max_state_age_seconds:
stale.append(eid)
return stale

async def _refresh_stale_via_rest(
self, timeout: float = REST_REFRESH_TIMEOUT_SECONDS
) -> None:
"""REST-poll any entity whose local push timer has crossed the
staleness threshold, bounded by ``timeout`` total wall-clock.

``subscribe_entities`` only forwards ``state_changed``; sensors with
a constant value (e.g. solar production on an unloaded phase) never
push, so the per-entity timer is not a reliable freshness signal.
``GET /api/states/{eid}`` returns HA's ``last_reported``, which is
mutated on every state write — including same-value reports — and
is the authoritative source of truth.
"""
if self._session is None:
return
stale = self._locally_stale_entities()
if not stale:
return
# Whatever finishes in-budget is already applied; anything still
# stale after the timeout will be caught by ``_get_entity_value``.
with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(
asyncio.gather(
*(self._fetch_rest_state(eid) for eid in stale),
return_exceptions=True,
),
timeout=timeout,
)

async def _fetch_rest_state(self, entity_id: str) -> None:
assert self._session is not None
url = self._build_rest_state_url(entity_id)
headers = {"Authorization": f"Bearer {self.access_token}"}
try:
async with self._session.get(url, headers=headers) as resp:
if resp.status != 200:
logger.debug(
"Home Assistant REST refresh for %s: HTTP %s",
entity_id,
resp.status,
)
return
data = await resp.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
logger.debug(
"Home Assistant REST refresh for %s failed: %s", entity_id, exc
)
return
if isinstance(data, dict):
self._apply_rest_state(entity_id, data)

def _apply_rest_state(self, entity_id: str, data: dict[str, Any]) -> None:
state_val = data.get("state")
if state_val in (None, "unknown", "unavailable"):
return
try:
value = float(state_val) # type: ignore[arg-type]
except (ValueError, TypeError):
return
# Trust HA's ``last_reported`` (mutated on every state write).
# If HA itself hasn't seen an update within the staleness window,
# don't refresh local cache — let the staleness check raise.
if self._max_state_age_seconds > 0:
reported_iso = data.get("last_reported") or data.get("last_updated")
if not isinstance(reported_iso, str):
return
try:
reported_dt = datetime.fromisoformat(reported_iso)
except ValueError:
return
if reported_dt.tzinfo is None:
reported_dt = reported_dt.replace(tzinfo=timezone.utc)
ha_age = (datetime.now(timezone.utc) - reported_dt).total_seconds()
if ha_age > self._max_state_age_seconds:
return
self._entity_values[entity_id] = value
self._entity_update_time[entity_id] = self._clock()
self._check_entities_ready()
self._message_event.set()
Comment on lines +327 to +382
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Apply REST refreshes using HA’s actual age, and only if the cache is still older than the request.

Lines 373-374 currently reset _entity_update_time to self._clock() after any fresh REST response. That discards HA’s reported age, so a value that was already almost stale can get another full freshness window after one REST hit. The same path also unconditionally overwrites the cache after async I/O, so a websocket update that lands while the GET is in flight can be replaced by an older REST snapshot.

Suggested direction
-    async def _fetch_rest_state(self, entity_id: str) -> None:
+    async def _fetch_rest_state(self, entity_id: str) -> None:
         assert self._session is not None
+        request_started = self._clock()
         url = self._build_rest_state_url(entity_id)
         headers = {"Authorization": f"Bearer {self.access_token}"}
         try:
             async with self._session.get(url, headers=headers) as resp:
                 if resp.status != 200:
@@
-        if isinstance(data, dict):
-            self._apply_rest_state(entity_id, data)
+        if isinstance(data, dict):
+            self._apply_rest_state(entity_id, data, request_started)

-    def _apply_rest_state(self, entity_id: str, data: dict[str, Any]) -> None:
+    def _apply_rest_state(
+        self, entity_id: str, data: dict[str, Any], request_started: float
+    ) -> None:
         state_val = data.get("state")
         if state_val in (None, "unknown", "unavailable"):
             return
         try:
             value = float(state_val)  # type: ignore[arg-type]
         except (ValueError, TypeError):
             return
+        current_update = self._entity_update_time.get(entity_id)
+        if current_update is not None and current_update > request_started:
+            return
+
+        ha_age = 0.0
         if self._max_state_age_seconds > 0:
             reported_iso = data.get("last_reported") or data.get("last_updated")
             if not isinstance(reported_iso, str):
                 return
             try:
                 reported_dt = datetime.fromisoformat(reported_iso)
             except ValueError:
                 return
             if reported_dt.tzinfo is None:
                 reported_dt = reported_dt.replace(tzinfo=timezone.utc)
-            ha_age = (datetime.now(timezone.utc) - reported_dt).total_seconds()
+            ha_age = max(
+                0.0, (datetime.now(timezone.utc) - reported_dt).total_seconds()
+            )
             if ha_age > self._max_state_age_seconds:
                 return
         self._entity_values[entity_id] = value
-        self._entity_update_time[entity_id] = self._clock()
+        self._entity_update_time[entity_id] = self._clock() - ha_age
         self._check_entities_ready()
         self._message_event.set()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def _fetch_rest_state(self, entity_id: str) -> None:
assert self._session is not None
url = self._build_rest_state_url(entity_id)
headers = {"Authorization": f"Bearer {self.access_token}"}
try:
async with self._session.get(url, headers=headers) as resp:
if resp.status != 200:
logger.debug(
"Home Assistant REST refresh for %s: HTTP %s",
entity_id,
resp.status,
)
return
data = await resp.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
logger.debug(
"Home Assistant REST refresh for %s failed: %s", entity_id, exc
)
return
if isinstance(data, dict):
self._apply_rest_state(entity_id, data)
def _apply_rest_state(self, entity_id: str, data: dict[str, Any]) -> None:
state_val = data.get("state")
if state_val in (None, "unknown", "unavailable"):
return
try:
value = float(state_val) # type: ignore[arg-type]
except (ValueError, TypeError):
return
# Trust HA's ``last_reported`` (mutated on every state write).
# If HA itself hasn't seen an update within the staleness window,
# don't refresh local cache — let the staleness check raise.
if self._max_state_age_seconds > 0:
reported_iso = data.get("last_reported") or data.get("last_updated")
if not isinstance(reported_iso, str):
return
try:
reported_dt = datetime.fromisoformat(reported_iso)
except ValueError:
return
if reported_dt.tzinfo is None:
reported_dt = reported_dt.replace(tzinfo=timezone.utc)
ha_age = (datetime.now(timezone.utc) - reported_dt).total_seconds()
if ha_age > self._max_state_age_seconds:
return
self._entity_values[entity_id] = value
self._entity_update_time[entity_id] = self._clock()
self._check_entities_ready()
self._message_event.set()
async def _fetch_rest_state(self, entity_id: str) -> None:
assert self._session is not None
request_started = self._clock()
url = self._build_rest_state_url(entity_id)
headers = {"Authorization": f"Bearer {self.access_token}"}
try:
async with self._session.get(url, headers=headers) as resp:
if resp.status != 200:
logger.debug(
"Home Assistant REST refresh for %s: HTTP %s",
entity_id,
resp.status,
)
return
data = await resp.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
logger.debug(
"Home Assistant REST refresh for %s failed: %s", entity_id, exc
)
return
if isinstance(data, dict):
self._apply_rest_state(entity_id, data, request_started)
def _apply_rest_state(
self, entity_id: str, data: dict[str, Any], request_started: float
) -> None:
state_val = data.get("state")
if state_val in (None, "unknown", "unavailable"):
return
try:
value = float(state_val) # type: ignore[arg-type]
except (ValueError, TypeError):
return
current_update = self._entity_update_time.get(entity_id)
if current_update is not None and current_update > request_started:
return
ha_age = 0.0
# Trust HA's ``last_reported`` (mutated on every state write).
# If HA itself hasn't seen an update within the staleness window,
# don't refresh local cache — let the staleness check raise.
if self._max_state_age_seconds > 0:
reported_iso = data.get("last_reported") or data.get("last_updated")
if not isinstance(reported_iso, str):
return
try:
reported_dt = datetime.fromisoformat(reported_iso)
except ValueError:
return
if reported_dt.tzinfo is None:
reported_dt = reported_dt.replace(tzinfo=timezone.utc)
ha_age = max(
0.0, (datetime.now(timezone.utc) - reported_dt).total_seconds()
)
if ha_age > self._max_state_age_seconds:
return
self._entity_values[entity_id] = value
self._entity_update_time[entity_id] = self._clock() - ha_age
self._check_entities_ready()
self._message_event.set()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/astrameter/powermeter/homeassistant.py` around lines 327 - 376, The REST
handler currently writes the fetched value and sets _entity_update_time to
self._clock(), which discards Home Assistant's reported timestamp and can
overwrite newer websocket updates; in _apply_rest_state use the parsed
reported_dt (from "last_reported" or "last_updated") as the source-of-truth
timestamp and only apply the REST snapshot if reported_dt is within
_max_state_age_seconds and is strictly newer than any existing cached timestamp
for that entity (compare reported_dt to the existing _entity_update_time
converted to a compatible timezone-aware datetime or store timestamps
consistently), then set _entity_values[entity_id] and
_entity_update_time[entity_id] from reported_dt (not self._clock()), and finally
call _check_entities_ready() and _message_event.set() as before; reference
symbols: _apply_rest_state, _entity_values, _entity_update_time,
_max_state_age_seconds, _clock, _check_entities_ready, _message_event.


def _get_entity_value(self, entity_id: str) -> float:
val = self._entity_values.get(entity_id)
if val is None:
Expand All @@ -266,6 +394,7 @@ def _get_entity_value(self, entity_id: str) -> float:
return val

async def get_powermeter_watts(self) -> list[float]:
await self._refresh_stale_via_rest()
if not self.power_calculate:
return [
self._get_entity_value(entity) for entity in self.current_power_entity
Expand Down
Loading
Loading