Skip to content

Commit 30cc695

Browse files
tomquistclaude
andauthored
Add wait_for_next_message so CT002 serves fresh powermeter data (#322)
* Add wait_for_next_message so CT002 serves fresh powermeter data The CT002's before_send callback now awaits wait_for_next_message() before reading the powermeter, ensuring push-based meters (MQTT, SMA, HomeWizard, Home Assistant) deliver a fresh measurement for every battery request instead of re-serving stale cached values. * Fix wait_for_next_message returning early in multi-topic mode In multi-topic configurations, wait_for_next_message could return after receiving a message on just one topic, leaving other phase values as None and causing get_powermeter_watts() to fail. Now loops with a deadline until all subscribed values are populated, matching wait_for_message semantics but always waiting for a new message first. https://claude.ai/code/session_013EAvo4q396i36rhT45LNkp * Revert changelog entry for wait_for_next_message fix https://claude.ai/code/session_013EAvo4q396i36rhT45LNkp * Assert wait_for_next_message actually blocks in test Replace `await task` with `assert task.done()` after wait_for_next_message returns, so the test fails if the method returns before the setter task completes. https://claude.ai/code/session_013EAvo4q396i36rhT45LNkp * Format mqtt_test.py with ruff https://claude.ai/code/session_013EAvo4q396i36rhT45LNkp --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent fdc8056 commit 30cc695

17 files changed

Lines changed: 226 additions & 9 deletions

src/astrameter/ct002/ct002.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,9 @@ def _build_response_fields(self, request_fields, values):
457457
]
458458

459459
phase_values = self._collect_reports_by_phase()
460+
phase_power = [phase_a, phase_b, phase_c]
460461
for phase, idx in (("A", 0), ("B", 1), ("C", 2)):
461-
if phase_values[phase]["active"]:
462+
if phase_values[phase]["active"] or phase_power[idx] != 0:
462463
response_fields[8 + idx] = "1"
463464
response_fields[15 + idx] = str(phase_values[phase]["chrg_power"])
464465
response_fields[20 + idx] = str(phase_values[phase]["dchrg_power"])
@@ -579,14 +580,13 @@ async def _handle_request(self, data, addr, transport):
579580
return
580581
self._last_response_time[addr] = current_time
581582

582-
if not in_inspection_mode:
583-
meter_dev_type = fields[0] if len(fields) > 0 else ""
584-
self._update_consumer_report(
585-
consumer_id,
586-
phase=reported_phase,
587-
power=reported_power,
588-
device_type=meter_dev_type,
589-
)
583+
meter_dev_type = fields[0] if len(fields) > 0 else ""
584+
self._update_consumer_report(
585+
consumer_id,
586+
phase=reported_phase if not in_inspection_mode else "A",
587+
power=reported_power,
588+
device_type=meter_dev_type,
589+
)
590590

591591
updated = await self._call_before_send(addr, fields, consumer_id)
592592
if updated is not None:

src/astrameter/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ async def update_readings(addr, _fields=None, _consumer_id=None):
211211
if powermeter is None:
212212
logger.debug(f"No powermeter found for client {addr[0]}")
213213
return None
214+
await powermeter.wait_for_next_message()
214215
values = await powermeter.get_powermeter_watts()
215216
value1 = values[0] if len(values) > 0 else 0
216217
value2 = values[1] if len(values) > 1 else 0

src/astrameter/powermeter/base.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@ async def get_powermeter_watts(self) -> list[float]:
66
async def wait_for_message(self, timeout=5):
77
pass
88

9+
async def wait_for_next_message(self, timeout=5):
10+
"""Block until a *new* measurement arrives (push-based powermeters).
11+
12+
Unlike ``wait_for_message`` (which returns immediately once data has
13+
been received *at least once*), this method waits for the *next*
14+
update, ensuring callers always get fresh data. Polling-based
15+
powermeters leave the default no-op.
16+
"""
17+
918
# --- Lifecycle (no-op by default, override for push-based powermeters) ---
1019

1120
async def start(self):

src/astrameter/powermeter/homeassistant.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def __init__(
7777
self._session: aiohttp.ClientSession | None = None
7878
self._ws_task: asyncio.Task[None] | None = None
7979
self._entities_ready = asyncio.Event()
80+
self._message_event = asyncio.Event()
8081

8182
def _collect_entities(self) -> set[str]:
8283
if self.power_calculate:
@@ -233,6 +234,7 @@ def _update_entity_value(self, entity_id: str, state_val: object) -> None:
233234
self._entity_values[entity_id] = None
234235
self._entity_update_time[entity_id] = None
235236
self._check_entities_ready()
237+
self._message_event.set()
236238

237239
def _check_entities_ready(self) -> None:
238240
ready = all(
@@ -288,3 +290,10 @@ async def wait_for_message(self, timeout: float = 5) -> None:
288290
await asyncio.wait_for(self._entities_ready.wait(), timeout=timeout)
289291
except asyncio.TimeoutError:
290292
raise TimeoutError("Timeout waiting for Home Assistant state") from None
293+
294+
async def wait_for_next_message(self, timeout: float = 5) -> None:
295+
self._message_event.clear()
296+
try:
297+
await asyncio.wait_for(self._message_event.wait(), timeout=timeout)
298+
except asyncio.TimeoutError:
299+
raise TimeoutError("Timeout waiting for Home Assistant state") from None

src/astrameter/powermeter/homeassistant_test.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import json
23
from unittest.mock import AsyncMock, patch
34

@@ -359,6 +360,34 @@ async def test_wait_for_message_timeout():
359360
await pm.wait_for_message(timeout=0)
360361

361362

363+
# wait_for_next_message tests
364+
365+
366+
async def test_wait_for_next_message_blocks_until_new():
367+
pm = _create_powermeter()
368+
await _simulate_auth_and_states(
369+
pm, [{"entity_id": "sensor.current_power", "state": "100"}]
370+
)
371+
372+
async def _push_later():
373+
await asyncio.sleep(0.05)
374+
pm._update_entity_value("sensor.current_power", "200")
375+
376+
task = asyncio.create_task(_push_later())
377+
await pm.wait_for_next_message(timeout=2)
378+
await task
379+
assert await pm.get_powermeter_watts() == [200.0]
380+
381+
382+
async def test_wait_for_next_message_timeout():
383+
pm = _create_powermeter()
384+
await _simulate_auth_and_states(
385+
pm, [{"entity_id": "sensor.current_power", "state": "100"}]
386+
)
387+
with pytest.raises(TimeoutError):
388+
await pm.wait_for_next_message(timeout=0)
389+
390+
362391
# subscribe_entities entity list test
363392

364393

src/astrameter/powermeter/homewizard.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,10 @@ async def wait_for_message(self, timeout: float = 5) -> None:
238238
await asyncio.wait_for(self._message_event.wait(), timeout=timeout)
239239
except asyncio.TimeoutError:
240240
raise TimeoutError("Timeout waiting for HomeWizard measurement") from None
241+
242+
async def wait_for_next_message(self, timeout: float = 5) -> None:
243+
self._message_event.clear()
244+
try:
245+
await asyncio.wait_for(self._message_event.wait(), timeout=timeout)
246+
except asyncio.TimeoutError:
247+
raise TimeoutError("Timeout waiting for HomeWizard measurement") from None

src/astrameter/powermeter/homewizard_test.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,27 @@ async def test_wait_for_message_timeout():
252252
await pm.wait_for_message(timeout=0)
253253

254254

255+
async def test_wait_for_next_message_blocks_until_new():
256+
pm = _create_powermeter()
257+
pm._handle_measurement({"power_w": 100})
258+
259+
async def _push_later():
260+
await asyncio.sleep(0.05)
261+
pm._handle_measurement({"power_w": 200})
262+
263+
task = asyncio.create_task(_push_later())
264+
await pm.wait_for_next_message(timeout=2)
265+
await task
266+
assert await pm.get_powermeter_watts() == [200]
267+
268+
269+
async def test_wait_for_next_message_timeout():
270+
pm = _create_powermeter()
271+
pm._handle_measurement({"power_w": 100})
272+
with pytest.raises(TimeoutError):
273+
await pm.wait_for_next_message(timeout=0)
274+
275+
255276
# --- Category F: Lifecycle ---
256277

257278

src/astrameter/powermeter/mqtt.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,19 @@ async def wait_for_message(self, timeout=5):
172172
raise TimeoutError("Timeout waiting for MQTT message") from None
173173
if all(v is not None for v in self.values):
174174
return
175+
176+
async def wait_for_next_message(self, timeout=5):
177+
self._message_event.clear()
178+
loop = asyncio.get_running_loop()
179+
deadline = loop.time() + timeout
180+
while True:
181+
remaining = deadline - loop.time()
182+
if remaining <= 0:
183+
raise TimeoutError("Timeout waiting for MQTT message")
184+
try:
185+
await asyncio.wait_for(self._message_event.wait(), timeout=remaining)
186+
except asyncio.TimeoutError:
187+
raise TimeoutError("Timeout waiting for MQTT message") from None
188+
if all(v is not None for v in self.values):
189+
return
190+
self._message_event.clear()

src/astrameter/powermeter/mqtt_test.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,50 @@ async def _set_later():
100100
assert pm.value == 99.0
101101

102102

103+
async def test_wait_for_next_message_blocks_until_new():
104+
pm = _make_pm()
105+
pm.value = 1.0
106+
pm._message_event.set()
107+
108+
async def _set_later():
109+
await asyncio.sleep(0.05)
110+
pm.value = 42.0
111+
pm._message_event.set()
112+
113+
task = asyncio.create_task(_set_later())
114+
await pm.wait_for_next_message(timeout=2)
115+
assert task.done(), (
116+
"wait_for_next_message returned before the setter task completed"
117+
)
118+
assert pm.value == 42.0
119+
120+
121+
async def test_wait_for_next_message_times_out():
122+
pm = _make_pm()
123+
pm.value = 1.0
124+
pm._message_event.set()
125+
with pytest.raises(TimeoutError, match="Timeout waiting"):
126+
await pm.wait_for_next_message(timeout=0.1)
127+
128+
129+
async def test_wait_for_next_message_multi_topic_cold_start():
130+
pm = _make_pm(topic=["t1", "t2"])
131+
# Cold start: no values set, event not set
132+
133+
async def _set_staggered():
134+
await asyncio.sleep(0.05)
135+
pm.values[0] = 100.0
136+
pm._message_event.set()
137+
await asyncio.sleep(0.05)
138+
pm.values[1] = 200.0
139+
pm._message_event.set()
140+
141+
task = asyncio.create_task(_set_staggered())
142+
await pm.wait_for_next_message(timeout=2)
143+
await task
144+
assert await pm.get_powermeter_watts() == [100.0, 200.0]
145+
146+
103147
# ---------------------------------------------------------------------------
104148
# Multi-phase constructor unit tests
105149
# ---------------------------------------------------------------------------

src/astrameter/powermeter/pid.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ async def wait_for_message(self, timeout=5):
9595
"""Pass through to wrapped powermeter."""
9696
return await self.wrapped_powermeter.wait_for_message(timeout)
9797

98+
async def wait_for_next_message(self, timeout=5):
99+
return await self.wrapped_powermeter.wait_for_next_message(timeout)
100+
98101
async def start(self):
99102
"""Pass through to wrapped powermeter."""
100103
await self.wrapped_powermeter.start()

0 commit comments

Comments
 (0)