Skip to content

Commit dc20442

Browse files
tomquistclaude
andauthored
Add configurable wait-for-fresh-push behavior for event-driven powermeters (#330)
* Stop wait_for_next_message regressions from breaking battery responses PR #322 wired wait_for_next_message into update_readings so push-based powermeters serve fresh data on every CT002 request. Two regressions followed: 1. Slow event-driven meters (e.g. Home Assistant wrapping a P1 that ticks every 10s, or any source with high THROTTLE_INTERVAL) raise TimeoutError on every request, which surfaces as "CT002 before_send failed" warnings and skips the cached value that would have been perfectly usable (issue #327). 2. MQTT multi-topic kept clearing the event whenever the waker happened to be a phase that had already published, so any topic still at None would force the full 5s timeout even though a fresh message had arrived. Treat wait_for_next_message as best-effort freshness: - update_readings caps the wait at 2s and swallows TimeoutError (cached value is served, which matches behaviour pre-#322). - Add WAIT_FOR_NEXT_MESSAGE (global [GENERAL] + per-section) so users can opt out of the wait entirely for sources that always update slower than the cap. Defaults to true. - MQTT wait_for_next_message returns on any next message instead of re-arming the event when not all topics have ever published. Refactor the closure inside run_device into a module-level read_ct_powermeter helper so the swallow-timeout behaviour is unit-testable. * Revert CHANGELOG entry for wait_for_next_message fix * Expose WAIT_FOR_NEXT_MESSAGE in the Home Assistant app config Adds a "Wait For Fresh Home Assistant Reading" toggle (default true) to the HA app so users with slow-updating source sensors (e.g. P1 smart meters) can opt out of the up-to-2s freshness wait and serve the last-known value immediately. * Honor WAIT_FOR_NEXT_MESSAGE in the Shelly emulator path The Shelly UDP handler was ignoring the per-powermeter wait flag, so a Shelly-emulated device would never wait for a fresh push from an event-driven powermeter (behaviour matched pre-#322). Mirror the CT path: read the flag when matching addr to powermeter, then best-effort await ``wait_for_next_message(timeout=2)`` with ``TimeoutError`` swallowed. Also consolidate the duplicate ``[HOMEASSISTANT]`` per-powermeter throttle example in config.ini.example: keep the more informative ``THROTTLE_INTERVAL = 2`` block (with the "2-3 seconds due to network latency" note) and drop the stray ``= 1`` duplicate that predated the ``WAIT_FOR_NEXT_MESSAGE`` example. --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 30cc695 commit dc20442

14 files changed

Lines changed: 222 additions & 55 deletions

File tree

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,20 @@ SKIP_POWERMETER_TEST = False
174174
# Set to 0 to disable throttling (default). Recommended: 1-3 seconds for slow data sources
175175
# Can be overridden per powermeter section
176176
THROTTLE_INTERVAL = 0
177+
# Briefly wait (up to 2s) for a fresh push from event-driven powermeters
178+
# (MQTT, Home Assistant, HomeWizard, SMA, ...) before responding to the
179+
# battery. Set to false to skip the wait and always serve the last-known
180+
# value — recommended when the underlying meter updates slower than 2s
181+
# (e.g. P1 smart meter behind Home Assistant) so that the inevitable timeout
182+
# doesn't add latency to every CT002 response. Default: true.
183+
# Can be overridden per powermeter section.
184+
#WAIT_FOR_NEXT_MESSAGE = true
177185
```
178186

179-
Per-powermeter options (e.g. in `[TASMOTA]`):
187+
Per-powermeter options (apply in any powermeter section, e.g. `[TASMOTA]` or `[HOMEASSISTANT]`):
180188
- **THROTTLE_INTERVAL** — Override global throttling for this powermeter
189+
- **WAIT_FOR_NEXT_MESSAGE** — Override the global wait-for-fresh-push behaviour
190+
for this powermeter (set to `false` to opt out of the wait entirely)
181191

182192
CT002/CT003 active-steering options (all under `[CT002]` or `[CT003]`):
183193
- **ACTIVE_CONTROL** — When true (default), the emulator smooths the grid reading, splits

config.ini.example

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@ SKIP_POWERMETER_TEST = False
1717
# Recommended values: 1-3 seconds for slower data sources
1818
# Can be overridden per powermeter section
1919
THROTTLE_INTERVAL = 0
20+
# Wait briefly (up to 2s) for a fresh push from event-driven powermeters
21+
# (MQTT, Home Assistant, HomeWizard, SMA energy meter, ...) before responding
22+
# to the battery. Keeps battery control loops on the freshest data.
23+
# Set to false to skip the wait entirely and always serve the last-known
24+
# value — recommended for sources that update slower than 2s (e.g. a P1 smart
25+
# meter behind Home Assistant) or whenever you'd rather have an instant
26+
# response than a fresh one. Defaults to true.
27+
# Can be overridden per powermeter section.
28+
#WAIT_FOR_NEXT_MESSAGE = true
2029

2130
#[CT002]
2231
## CT type is derived from the emulated device (ct002 -> HME-4, ct003 -> HME-3).
@@ -194,12 +203,15 @@ THROTTLE_INTERVAL = 0
194203
#POWER_INPUT_ALIAS = sensor.phase1_input, sensor.phase2_input, sensor.phase3_input
195204
#POWER_OUTPUT_ALIAS = sensor.phase1_output, sensor.phase2_output, sensor.phase3_output
196205

197-
## Per-powermeter throttling override (optional)
198-
#THROTTLE_INTERVAL = 1
199-
200206
## Per-powermeter throttling override (optional)
201207
## HomeAssistant typically needs 2-3 seconds due to network latency
202208
#THROTTLE_INTERVAL = 2
209+
## Skip the up-to-2s wait for a fresh push from this powermeter and always
210+
## serve the last-known value. Useful when the underlying sensor (e.g. P1
211+
## smart meter) updates slower than 2s, so the wait would always time out
212+
## and the cached value would be served anyway. Defaults to the global
213+
## [GENERAL] WAIT_FOR_NEXT_MESSAGE setting (true).
214+
#WAIT_FOR_NEXT_MESSAGE = false
203215

204216
#[VZLOGGER]
205217
#IP = 192.168.1.106

ha_addon/config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ options:
3434
power_output_alias: ""
3535
device_types: "shellypro3em"
3636
throttle_interval: 0
37+
wait_for_next_message: true
3738
marstek_auto_register_ct_device: false
3839
marstek_mailbox: ""
3940
marstek_password: ""
@@ -45,6 +46,7 @@ schema:
4546
power_output_alias: str?
4647
device_types: str
4748
throttle_interval: float(0,)
49+
wait_for_next_message: bool
4850
ct_mac: str?
4951
marstek_auto_register_ct_device: bool?
5052
marstek_mailbox: str?

ha_addon/run.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ else
126126
echo "PORT=80"
127127
echo "API_PATH_PREFIX=/core"
128128
echo "ACCESSTOKEN=$SUPERVISOR_TOKEN"
129+
echo "WAIT_FOR_NEXT_MESSAGE=$(bashio::config 'wait_for_next_message')"
129130
if bashio::config.has_value 'power_output_alias'; then
130131
echo "POWER_CALCULATE=True"
131132
echo "POWER_INPUT_ALIAS=$(bashio::config 'power_input_alias')"

ha_addon/translations/en.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ configuration:
1414
throttle_interval:
1515
name: Throttling Interval
1616
description: "Minimum time interval (in seconds) between power meter readings. When set to 0, throttling is disabled for maximum performance. For slower data sources or to prevent control instability, set to 1-3 seconds. Supports decimal values (e.g., 1.5 for 1.5 seconds). Increase this value if you're seeing oscillations or instability with slow data sources."
17+
wait_for_next_message:
18+
name: Wait For Fresh Home Assistant Reading
19+
description: "When enabled (default), the app waits briefly (up to 2 seconds) for Home Assistant to push a fresh value before responding to the battery, keeping control loops on the freshest data. Disable this if your source sensor (e.g. a P1 smart meter) updates slower than 2 seconds — turning it off avoids the needless wait and always serves the last-known value immediately."
1720
log_level:
1821
name: Log Level
1922
description: "Controls the verbosity of log messages. Select 'critical' for only critical errors, 'error' for error messages and above, 'warning' for warnings and above, 'info' for informational messages and above (recommended), or 'debug' for detailed debugging information. Use 'debug' for troubleshooting connection or configuration issues."

src/astrameter/config/config_loader.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,14 @@ def parse_float_list(value: str, key_name: str, section: str) -> list[float]:
145145

146146
def read_all_powermeter_configs(
147147
config: configparser.ConfigParser,
148-
) -> list[tuple[Powermeter, ClientFilter]]:
149-
powermeters = []
148+
) -> list[tuple[Powermeter, ClientFilter, bool]]:
149+
powermeters: list[tuple[Powermeter, ClientFilter, bool]] = []
150150
global_throttle_interval = config.getfloat(
151151
"GENERAL", "THROTTLE_INTERVAL", fallback=0.0
152152
)
153+
global_wait_for_next_message = config.getboolean(
154+
"GENERAL", "WAIT_FOR_NEXT_MESSAGE", fallback=True
155+
)
153156
global_pid_kp = config.getfloat("GENERAL", "PID_KP", fallback=0.0)
154157
global_pid_ki = config.getfloat("GENERAL", "PID_KI", fallback=0.0)
155158
global_pid_kd = config.getfloat("GENERAL", "PID_KD", fallback=0.0)
@@ -237,7 +240,10 @@ def read_all_powermeter_configs(
237240
)
238241

239242
client_filter = create_client_filter(section, config)
240-
powermeters.append((powermeter, client_filter))
243+
wait_for_next_message = config.getboolean(
244+
section, "WAIT_FOR_NEXT_MESSAGE", fallback=global_wait_for_next_message
245+
)
246+
powermeters.append((powermeter, client_filter, wait_for_next_message))
241247
return powermeters
242248

243249

src/astrameter/config/config_loader_test.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ def test_read_all_configs_with_power_transform():
583583

584584
powermeters = read_all_powermeter_configs(config)
585585
assert len(powermeters) == 1
586-
pm, _ = powermeters[0]
586+
pm, _, _ = powermeters[0]
587587
assert isinstance(pm, TransformedPowermeter)
588588
assert pm.offsets == [-50.0]
589589
assert pm.multipliers == [1.05]
@@ -600,7 +600,7 @@ def test_read_all_configs_with_per_phase_transform():
600600

601601
powermeters = read_all_powermeter_configs(config)
602602
assert len(powermeters) == 1
603-
pm, _ = powermeters[0]
603+
pm, _, _ = powermeters[0]
604604
assert isinstance(pm, TransformedPowermeter)
605605
assert pm.offsets == [-10.0, -20.0, -30.0]
606606
assert pm.multipliers == [1.05, 1.02, 1.03]
@@ -616,7 +616,7 @@ def test_read_all_configs_offset_only():
616616

617617
powermeters = read_all_powermeter_configs(config)
618618
assert len(powermeters) == 1
619-
pm, _ = powermeters[0]
619+
pm, _, _ = powermeters[0]
620620
assert isinstance(pm, TransformedPowermeter)
621621
assert pm.offsets == [10.0]
622622
assert pm.multipliers == [1.0]
@@ -632,7 +632,7 @@ def test_read_all_configs_zero_multiplier_accepted():
632632

633633
powermeters = read_all_powermeter_configs(config)
634634
assert len(powermeters) == 1
635-
pm, _ = powermeters[0]
635+
pm, _, _ = powermeters[0]
636636
assert isinstance(pm, TransformedPowermeter)
637637
assert pm.multipliers == [0.0]
638638

@@ -646,5 +646,42 @@ def test_read_all_configs_no_transform_when_not_configured():
646646

647647
powermeters = read_all_powermeter_configs(config)
648648
assert len(powermeters) == 1
649-
pm, _ = powermeters[0]
649+
pm, _, _ = powermeters[0]
650650
assert not isinstance(pm, TransformedPowermeter)
651+
652+
653+
def test_read_all_configs_wait_for_next_message_default_true():
654+
"""No config means waiting is enabled (preserves PR #322 behaviour)."""
655+
config = configparser.ConfigParser()
656+
config["SCRIPT_1"] = {"COMMAND": 'echo "100"'}
657+
powermeters = read_all_powermeter_configs(config)
658+
assert len(powermeters) == 1
659+
_, _, wait_for_next = powermeters[0]
660+
assert wait_for_next is True
661+
662+
663+
def test_read_all_configs_wait_for_next_message_global_off():
664+
"""[GENERAL] WAIT_FOR_NEXT_MESSAGE=false applies to every section."""
665+
config = configparser.ConfigParser()
666+
config["GENERAL"] = {"WAIT_FOR_NEXT_MESSAGE": "false"}
667+
config["SCRIPT_1"] = {"COMMAND": 'echo "100"'}
668+
config["SCRIPT_2"] = {"COMMAND": 'echo "200"'}
669+
powermeters = read_all_powermeter_configs(config)
670+
assert len(powermeters) == 2
671+
assert all(wait is False for _, _, wait in powermeters)
672+
673+
674+
def test_read_all_configs_wait_for_next_message_section_override():
675+
"""Per-section WAIT_FOR_NEXT_MESSAGE overrides the global default."""
676+
config = configparser.ConfigParser()
677+
config["GENERAL"] = {"WAIT_FOR_NEXT_MESSAGE": "true"}
678+
config["SCRIPT_1"] = {
679+
"COMMAND": 'echo "100"',
680+
"WAIT_FOR_NEXT_MESSAGE": "false",
681+
}
682+
config["SCRIPT_2"] = {"COMMAND": 'echo "200"'}
683+
powermeters = read_all_powermeter_configs(config)
684+
assert len(powermeters) == 2
685+
# Section order is preserved by configparser, so SCRIPT_1 (override=false)
686+
# is first and SCRIPT_2 (inherits global true) is second.
687+
assert [wait for _, _, wait in powermeters] == [False, True]

src/astrameter/main.py

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,43 @@ def get_ct_section(device_type: str, cfg: configparser.ConfigParser) -> str:
3333
return section
3434

3535

36+
async def read_ct_powermeter(
37+
addr: tuple[str, int],
38+
powermeters: list[tuple[Powermeter, ClientFilter, bool]],
39+
) -> list[float] | None:
40+
"""Pick the powermeter matching *addr* and return up to three phase values.
41+
42+
Optionally awaits a fresh push (with a 2 s cap) when the matched
43+
powermeter has ``WAIT_FOR_NEXT_MESSAGE`` enabled. A timeout there is
44+
swallowed so the cached value is still served — `update_readings`
45+
callers should never see a stale-meter `TimeoutError`.
46+
"""
47+
powermeter = None
48+
wait_for_next = False
49+
for pm, client_filter, wait_flag in powermeters:
50+
if client_filter.matches(addr[0]):
51+
powermeter = pm
52+
wait_for_next = wait_flag
53+
break
54+
if powermeter is None:
55+
logger.debug(f"No powermeter found for client {addr[0]}")
56+
return None
57+
if wait_for_next:
58+
try:
59+
await powermeter.wait_for_next_message(timeout=2)
60+
except TimeoutError:
61+
logger.debug(
62+
"Powermeter %s produced no fresh message within 2s; "
63+
"serving last known value",
64+
type(powermeter).__name__,
65+
)
66+
values = await powermeter.get_powermeter_watts()
67+
value1 = values[0] if len(values) > 0 else 0
68+
value2 = values[1] if len(values) > 1 else 0
69+
value3 = values[2] if len(values) > 2 else 0
70+
return [value1, value2, value3]
71+
72+
3673
async def test_powermeter(powermeter: Powermeter, client_filter: ClientFilter):
3774
"""Test powermeter configuration with minimal retry logic for edge cases."""
3875
max_retries = 3
@@ -70,7 +107,7 @@ async def run_device(
70107
device_type: str,
71108
cfg: configparser.ConfigParser,
72109
args: argparse.Namespace,
73-
powermeters: list[tuple[Powermeter, ClientFilter]],
110+
powermeters: list[tuple[Powermeter, ClientFilter, bool]],
74111
device_id: str | None = None,
75112
insights: MqttInsightsService | None = None,
76113
):
@@ -203,21 +240,7 @@ async def run_device(
203240
)
204241

205242
async def update_readings(addr, _fields=None, _consumer_id=None):
206-
powermeter = None
207-
for pm, client_filter in powermeters:
208-
if client_filter.matches(addr[0]):
209-
powermeter = pm
210-
break
211-
if powermeter is None:
212-
logger.debug(f"No powermeter found for client {addr[0]}")
213-
return None
214-
await powermeter.wait_for_next_message()
215-
values = await powermeter.get_powermeter_watts()
216-
value1 = values[0] if len(values) > 0 else 0
217-
value2 = values[1] if len(values) > 1 else 0
218-
value3 = values[2] if len(values) > 2 else 0
219-
220-
return [value1, value2, value3]
243+
return await read_ct_powermeter(addr, powermeters)
221244

222245
device.before_send = update_readings
223246

@@ -336,19 +359,19 @@ async def async_main(
336359
await web_server.stop()
337360
web_server = None
338361

339-
powermeters: list[tuple[Powermeter, ClientFilter]] = []
362+
powermeters: list[tuple[Powermeter, ClientFilter, bool]] = []
340363
insights: MqttInsightsService | None = None
341364

342365
try:
343366
# Create powermeters
344367
powermeters = read_all_powermeter_configs(cfg)
345368

346369
# Start powermeter lifecycle
347-
for pm, _ in powermeters:
370+
for pm, _, _ in powermeters:
348371
await pm.start()
349372

350373
if not skip_test:
351-
for powermeter, client_filter in powermeters:
374+
for powermeter, client_filter, _ in powermeters:
352375
await test_powermeter(powermeter, client_filter)
353376

354377
# MQTT Insights (optional)
@@ -379,7 +402,7 @@ async def async_main(
379402
logger.info("MQTT Insights service stopped")
380403
except Exception:
381404
logger.exception("Error stopping MQTT Insights service")
382-
for pm, _ in powermeters:
405+
for pm, _, _ in powermeters:
383406
try:
384407
await pm.stop()
385408
except Exception:

src/astrameter/main_test.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from ipaddress import IPv4Network
2+
3+
from astrameter.config.config_loader import ClientFilter
4+
from astrameter.main import read_ct_powermeter
5+
from astrameter.powermeter import Powermeter
6+
7+
8+
class _StubPowermeter(Powermeter):
9+
"""Minimal powermeter stub for testing ``read_ct_powermeter``."""
10+
11+
def __init__(
12+
self,
13+
values: list[float],
14+
wait_raises: BaseException | None = None,
15+
wait_calls: list[float] | None = None,
16+
):
17+
self._values = values
18+
self._wait_raises = wait_raises
19+
self._wait_calls = wait_calls if wait_calls is not None else []
20+
21+
async def get_powermeter_watts(self) -> list[float]:
22+
return list(self._values)
23+
24+
async def wait_for_next_message(self, timeout=5):
25+
self._wait_calls.append(timeout)
26+
if self._wait_raises is not None:
27+
raise self._wait_raises
28+
29+
30+
_LOCAL = ClientFilter([IPv4Network("127.0.0.1/32")])
31+
32+
33+
async def test_read_ct_powermeter_returns_none_when_no_match():
34+
pm = _StubPowermeter([10.0])
35+
powermeters = [(pm, _LOCAL, True)]
36+
assert await read_ct_powermeter(("10.0.0.1", 0), powermeters) is None
37+
38+
39+
async def test_read_ct_powermeter_pads_to_three_phases():
40+
pm = _StubPowermeter([42.0])
41+
powermeters = [(pm, _LOCAL, False)]
42+
assert await read_ct_powermeter(("127.0.0.1", 0), powermeters) == [42.0, 0, 0]
43+
44+
45+
async def test_read_ct_powermeter_skips_wait_when_disabled():
46+
pm = _StubPowermeter([1.0, 2.0, 3.0])
47+
powermeters = [(pm, _LOCAL, False)]
48+
result = await read_ct_powermeter(("127.0.0.1", 0), powermeters)
49+
assert result == [1.0, 2.0, 3.0]
50+
assert pm._wait_calls == []
51+
52+
53+
async def test_read_ct_powermeter_calls_wait_with_2s_when_enabled():
54+
pm = _StubPowermeter([1.0, 2.0, 3.0])
55+
powermeters = [(pm, _LOCAL, True)]
56+
await read_ct_powermeter(("127.0.0.1", 0), powermeters)
57+
assert pm._wait_calls == [2]
58+
59+
60+
async def test_read_ct_powermeter_swallows_timeout_and_serves_cached():
61+
"""Issue #327: a slow push meter must not break CT002 responses."""
62+
pm = _StubPowermeter(
63+
[11.0, 22.0, 33.0],
64+
wait_raises=TimeoutError("simulated slow meter"),
65+
)
66+
powermeters = [(pm, _LOCAL, True)]
67+
result = await read_ct_powermeter(("127.0.0.1", 0), powermeters)
68+
assert result == [11.0, 22.0, 33.0]

src/astrameter/powermeter/mqtt.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -175,16 +175,7 @@ async def wait_for_message(self, timeout=5):
175175

176176
async def wait_for_next_message(self, timeout=5):
177177
self._message_event.clear()
178-
loop = asyncio.get_running_loop()
179-
deadline = loop.time() + timeout
180-
while True:
181-
remaining = deadline - loop.time()
182-
if remaining <= 0:
183-
raise TimeoutError("Timeout waiting for MQTT message")
184-
try:
185-
await asyncio.wait_for(self._message_event.wait(), timeout=remaining)
186-
except asyncio.TimeoutError:
187-
raise TimeoutError("Timeout waiting for MQTT message") from None
188-
if all(v is not None for v in self.values):
189-
return
190-
self._message_event.clear()
178+
try:
179+
await asyncio.wait_for(self._message_event.wait(), timeout=timeout)
180+
except asyncio.TimeoutError:
181+
raise TimeoutError("Timeout waiting for MQTT message") from None

0 commit comments

Comments
 (0)