Treat Home Assistant state_reported keepalives as liveness signals#364
Conversation
HA's subscribe_entities feed omits the state field from compressed diffs when a sensor is reported with an unchanged value, sending only an updated timestamp. The previous code skipped those events entirely, so the per-entity update_time never refreshed and the 60 s staleness check fired falsely for sensors whose value is legitimately constant (e.g. solar production on an unloaded phase, an empty production sensor at night). Recognize bare ``lu``/``lc`` diffs as keepalives so the staleness check only trips when the websocket has truly gone silent. Fixes #363.
…ensors
HA's subscribe_entities only forwards EVENT_STATE_CHANGED; sensors whose
value doesn't change (e.g. solar production on an unloaded phase) fire
EVENT_STATE_REPORTED instead and never reach our websocket, so the
per-entity push timer drifts past the 60 s staleness threshold even
though HA itself is current.
When local push silence crosses the threshold, fan out parallel
GET /api/states/{entity} requests bounded by a 1 s total wall-clock
budget (so a battery's UDP request never stalls). HA's response includes
last_reported, which is mutated on every state write including same-value
reports — use that as the authoritative freshness signal. If HA's own
last_reported is also stale, leave the local cache untouched and let the
existing staleness check raise.
Also recognize bare lu/lc diffs from subscribe_entities (state unchanged
but attributes changed) as liveness keepalives, sparing the REST round
trip in that narrower case.
Fixes #363.
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
WalkthroughThis PR fixes false stale-state errors in Home Assistant sensor integration by detecting websocket keepalives and adding a REST fallback. Sensors with constant values no longer incorrectly report as stale. The fix includes compressed-diff keepalive recognition, a bounded parallel REST refresh mechanism, and comprehensive test coverage. ChangesHome Assistant stale state detection fix via websocket keepalives and REST fallback
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/astrameter/powermeter/homeassistant.py`:
- Around line 327-376: The REST handler currently writes the fetched value and
sets _entity_update_time to self._clock(), which discards Home Assistant's
reported timestamp and can overwrite newer websocket updates; in
_apply_rest_state use the parsed reported_dt (from "last_reported" or
"last_updated") as the source-of-truth timestamp and only apply the REST
snapshot if reported_dt is within _max_state_age_seconds and is strictly newer
than any existing cached timestamp for that entity (compare reported_dt to the
existing _entity_update_time converted to a compatible timezone-aware datetime
or store timestamps consistently), then set _entity_values[entity_id] and
_entity_update_time[entity_id] from reported_dt (not self._clock()), and finally
call _check_entities_ready() and _message_event.set() as before; reference
symbols: _apply_rest_state, _entity_values, _entity_update_time,
_max_state_age_seconds, _clock, _check_entities_ready, _message_event.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0d9eb536-14e8-40d0-9ba1-11b3c5461945
📒 Files selected for processing (3)
CHANGELOG.mdsrc/astrameter/powermeter/homeassistant.pysrc/astrameter/powermeter/homeassistant_test.py
| async def _fetch_rest_state(self, entity_id: str) -> None: | ||
| assert self._session is not None | ||
| url = self._build_rest_state_url(entity_id) | ||
| headers = {"Authorization": f"Bearer {self.access_token}"} | ||
| try: | ||
| async with self._session.get(url, headers=headers) as resp: | ||
| if resp.status != 200: | ||
| logger.debug( | ||
| "Home Assistant REST refresh for %s: HTTP %s", | ||
| entity_id, | ||
| resp.status, | ||
| ) | ||
| return | ||
| data = await resp.json() | ||
| except (aiohttp.ClientError, asyncio.TimeoutError) as exc: | ||
| logger.debug( | ||
| "Home Assistant REST refresh for %s failed: %s", entity_id, exc | ||
| ) | ||
| return | ||
| if isinstance(data, dict): | ||
| self._apply_rest_state(entity_id, data) | ||
|
|
||
| def _apply_rest_state(self, entity_id: str, data: dict[str, Any]) -> None: | ||
| state_val = data.get("state") | ||
| if state_val in (None, "unknown", "unavailable"): | ||
| return | ||
| try: | ||
| value = float(state_val) # type: ignore[arg-type] | ||
| except (ValueError, TypeError): | ||
| return | ||
| # Trust HA's ``last_reported`` (mutated on every state write). | ||
| # If HA itself hasn't seen an update within the staleness window, | ||
| # don't refresh local cache — let the staleness check raise. | ||
| if self._max_state_age_seconds > 0: | ||
| reported_iso = data.get("last_reported") or data.get("last_updated") | ||
| if not isinstance(reported_iso, str): | ||
| return | ||
| try: | ||
| reported_dt = datetime.fromisoformat(reported_iso) | ||
| except ValueError: | ||
| return | ||
| if reported_dt.tzinfo is None: | ||
| reported_dt = reported_dt.replace(tzinfo=timezone.utc) | ||
| ha_age = (datetime.now(timezone.utc) - reported_dt).total_seconds() | ||
| if ha_age > self._max_state_age_seconds: | ||
| return | ||
| self._entity_values[entity_id] = value | ||
| self._entity_update_time[entity_id] = self._clock() | ||
| self._check_entities_ready() | ||
| self._message_event.set() |
There was a problem hiding this comment.
Apply REST refreshes using HA’s actual age, and only if the cache is still older than the request.
Lines 373-374 currently reset _entity_update_time to self._clock() after any fresh REST response. That discards HA’s reported age, so a value that was already almost stale can get another full freshness window after one REST hit. The same path also unconditionally overwrites the cache after async I/O, so a websocket update that lands while the GET is in flight can be replaced by an older REST snapshot.
Suggested direction
- async def _fetch_rest_state(self, entity_id: str) -> None:
+ async def _fetch_rest_state(self, entity_id: str) -> None:
assert self._session is not None
+ request_started = self._clock()
url = self._build_rest_state_url(entity_id)
headers = {"Authorization": f"Bearer {self.access_token}"}
try:
async with self._session.get(url, headers=headers) as resp:
if resp.status != 200:
@@
- if isinstance(data, dict):
- self._apply_rest_state(entity_id, data)
+ if isinstance(data, dict):
+ self._apply_rest_state(entity_id, data, request_started)
- def _apply_rest_state(self, entity_id: str, data: dict[str, Any]) -> None:
+ def _apply_rest_state(
+ self, entity_id: str, data: dict[str, Any], request_started: float
+ ) -> None:
state_val = data.get("state")
if state_val in (None, "unknown", "unavailable"):
return
try:
value = float(state_val) # type: ignore[arg-type]
except (ValueError, TypeError):
return
+ current_update = self._entity_update_time.get(entity_id)
+ if current_update is not None and current_update > request_started:
+ return
+
+ ha_age = 0.0
if self._max_state_age_seconds > 0:
reported_iso = data.get("last_reported") or data.get("last_updated")
if not isinstance(reported_iso, str):
return
try:
reported_dt = datetime.fromisoformat(reported_iso)
except ValueError:
return
if reported_dt.tzinfo is None:
reported_dt = reported_dt.replace(tzinfo=timezone.utc)
- ha_age = (datetime.now(timezone.utc) - reported_dt).total_seconds()
+ ha_age = max(
+ 0.0, (datetime.now(timezone.utc) - reported_dt).total_seconds()
+ )
if ha_age > self._max_state_age_seconds:
return
self._entity_values[entity_id] = value
- self._entity_update_time[entity_id] = self._clock()
+ self._entity_update_time[entity_id] = self._clock() - ha_age
self._check_entities_ready()
self._message_event.set()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async def _fetch_rest_state(self, entity_id: str) -> None: | |
| assert self._session is not None | |
| url = self._build_rest_state_url(entity_id) | |
| headers = {"Authorization": f"Bearer {self.access_token}"} | |
| try: | |
| async with self._session.get(url, headers=headers) as resp: | |
| if resp.status != 200: | |
| logger.debug( | |
| "Home Assistant REST refresh for %s: HTTP %s", | |
| entity_id, | |
| resp.status, | |
| ) | |
| return | |
| data = await resp.json() | |
| except (aiohttp.ClientError, asyncio.TimeoutError) as exc: | |
| logger.debug( | |
| "Home Assistant REST refresh for %s failed: %s", entity_id, exc | |
| ) | |
| return | |
| if isinstance(data, dict): | |
| self._apply_rest_state(entity_id, data) | |
| def _apply_rest_state(self, entity_id: str, data: dict[str, Any]) -> None: | |
| state_val = data.get("state") | |
| if state_val in (None, "unknown", "unavailable"): | |
| return | |
| try: | |
| value = float(state_val) # type: ignore[arg-type] | |
| except (ValueError, TypeError): | |
| return | |
| # Trust HA's ``last_reported`` (mutated on every state write). | |
| # If HA itself hasn't seen an update within the staleness window, | |
| # don't refresh local cache — let the staleness check raise. | |
| if self._max_state_age_seconds > 0: | |
| reported_iso = data.get("last_reported") or data.get("last_updated") | |
| if not isinstance(reported_iso, str): | |
| return | |
| try: | |
| reported_dt = datetime.fromisoformat(reported_iso) | |
| except ValueError: | |
| return | |
| if reported_dt.tzinfo is None: | |
| reported_dt = reported_dt.replace(tzinfo=timezone.utc) | |
| ha_age = (datetime.now(timezone.utc) - reported_dt).total_seconds() | |
| if ha_age > self._max_state_age_seconds: | |
| return | |
| self._entity_values[entity_id] = value | |
| self._entity_update_time[entity_id] = self._clock() | |
| self._check_entities_ready() | |
| self._message_event.set() | |
| async def _fetch_rest_state(self, entity_id: str) -> None: | |
| assert self._session is not None | |
| request_started = self._clock() | |
| url = self._build_rest_state_url(entity_id) | |
| headers = {"Authorization": f"Bearer {self.access_token}"} | |
| try: | |
| async with self._session.get(url, headers=headers) as resp: | |
| if resp.status != 200: | |
| logger.debug( | |
| "Home Assistant REST refresh for %s: HTTP %s", | |
| entity_id, | |
| resp.status, | |
| ) | |
| return | |
| data = await resp.json() | |
| except (aiohttp.ClientError, asyncio.TimeoutError) as exc: | |
| logger.debug( | |
| "Home Assistant REST refresh for %s failed: %s", entity_id, exc | |
| ) | |
| return | |
| if isinstance(data, dict): | |
| self._apply_rest_state(entity_id, data, request_started) | |
| def _apply_rest_state( | |
| self, entity_id: str, data: dict[str, Any], request_started: float | |
| ) -> None: | |
| state_val = data.get("state") | |
| if state_val in (None, "unknown", "unavailable"): | |
| return | |
| try: | |
| value = float(state_val) # type: ignore[arg-type] | |
| except (ValueError, TypeError): | |
| return | |
| current_update = self._entity_update_time.get(entity_id) | |
| if current_update is not None and current_update > request_started: | |
| return | |
| ha_age = 0.0 | |
| # Trust HA's ``last_reported`` (mutated on every state write). | |
| # If HA itself hasn't seen an update within the staleness window, | |
| # don't refresh local cache — let the staleness check raise. | |
| if self._max_state_age_seconds > 0: | |
| reported_iso = data.get("last_reported") or data.get("last_updated") | |
| if not isinstance(reported_iso, str): | |
| return | |
| try: | |
| reported_dt = datetime.fromisoformat(reported_iso) | |
| except ValueError: | |
| return | |
| if reported_dt.tzinfo is None: | |
| reported_dt = reported_dt.replace(tzinfo=timezone.utc) | |
| ha_age = max( | |
| 0.0, (datetime.now(timezone.utc) - reported_dt).total_seconds() | |
| ) | |
| if ha_age > self._max_state_age_seconds: | |
| return | |
| self._entity_values[entity_id] = value | |
| self._entity_update_time[entity_id] = self._clock() - ha_age | |
| self._check_entities_ready() | |
| self._message_event.set() |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/astrameter/powermeter/homeassistant.py` around lines 327 - 376, The REST
handler currently writes the fetched value and sets _entity_update_time to
self._clock(), which discards Home Assistant's reported timestamp and can
overwrite newer websocket updates; in _apply_rest_state use the parsed
reported_dt (from "last_reported" or "last_updated") as the source-of-truth
timestamp and only apply the REST snapshot if reported_dt is within
_max_state_age_seconds and is strictly newer than any existing cached timestamp
for that entity (compare reported_dt to the existing _entity_update_time
converted to a compatible timezone-aware datetime or store timestamps
consistently), then set _entity_values[entity_id] and
_entity_update_time[entity_id] from reported_dt (not self._clock()), and finally
call _check_entities_ready() and _message_event.set() as before; reference
symbols: _apply_rest_state, _entity_values, _entity_update_time,
_max_state_age_seconds, _clock, _check_entities_ready, _message_event.
If a state_changed push arrives while a REST staleness fallback is in flight for the same entity, the WS value is at least as fresh as anything the REST round-trip can return. Snapshot the local update timestamp before the await and skip applying the REST response if it changed, so the push wins instead of being clobbered by a stale REST read.
https://claude.ai/code/session_01LwA9fBRZcLjRi7fEU4fHyw
Summary by CodeRabbit
Bug Fixes
Tests