Skip to content

Commit da8c590

Browse files
committed
Offload Marstek poll handler so a slow powermeter can't stall listener
The MQTT Insights listener is a single async-for loop. When a Marstek poll handler awaited binding.get_values() inline it could block every subsequent message (other CT polls, Insights commands) for as long as the powermeter took to yield a reading. Spawn each response in its own task instead, and track/cancel those tasks on disconnect/shutdown. Also snapshot _marstek_bindings under the lock before scanning in _find_marstek_binding, and drop the type: ignore on the topic helpers by returning an explicit 2-tuple. Adds two integration tests: register-before-start populates subscriptions on first connect, and a slow handler on one binding doesn't block a concurrent fast poll on another binding. https://claude.ai/code/session_01K5ypPxYASWXJewf7Lk9a1e
1 parent 2a8b2ad commit da8c590

3 files changed

Lines changed: 132 additions & 10 deletions

File tree

src/astrameter/mqtt_insights/marstek_mqtt.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,18 @@ def parse_app_topic(topic: str) -> tuple[str, str] | None:
8080

8181

8282
def app_topics_for(binding: MarstekMqttBinding) -> tuple[str, str]:
83-
return tuple( # type: ignore[return-value]
84-
t.format(ct_type=binding.ct_type, mac=binding.mac) for t in APP_TOPIC_TEMPLATES
83+
old, new = APP_TOPIC_TEMPLATES
84+
return (
85+
old.format(ct_type=binding.ct_type, mac=binding.mac),
86+
new.format(ct_type=binding.ct_type, mac=binding.mac),
8587
)
8688

8789

8890
def device_topics_for(binding: MarstekMqttBinding) -> tuple[str, str]:
89-
return tuple( # type: ignore[return-value]
90-
t.format(ct_type=binding.ct_type, mac=binding.mac)
91-
for t in DEVICE_TOPIC_TEMPLATES
91+
old, new = DEVICE_TOPIC_TEMPLATES
92+
return (
93+
old.format(ct_type=binding.ct_type, mac=binding.mac),
94+
new.format(ct_type=binding.ct_type, mac=binding.mac),
9295
)
9396

9497

src/astrameter/mqtt_insights/mqtt_insights_test.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1058,7 +1058,99 @@ async def test_marstek_get_values_failure_suppressed(mqtt_broker):
10581058
await _collect_messages(client, received, timeout=1)
10591059
assert received == []
10601060
# get_values was called but no reply was published
1061+
await _poll(lambda: binding.device_id in service._marstek_get_values_failed)
10611062
assert calls == [("called",)]
1062-
assert binding.device_id in service._marstek_get_values_failed
10631063
finally:
10641064
await service.stop()
1065+
1066+
1067+
@needs_mosquitto
1068+
async def test_marstek_register_before_start_subscribes_on_connect(mqtt_broker):
1069+
"""A binding registered before start() must get its App topics
1070+
subscribed on the first connect."""
1071+
port = mqtt_broker
1072+
service = _make_service(port)
1073+
binding, _ = _make_binding()
1074+
# Register *before* start — the service must pick this up on connect.
1075+
await service.register_marstek(binding)
1076+
await service.start()
1077+
1078+
try:
1079+
await service.wait_connected()
1080+
await _poll(lambda: service._client is not None)
1081+
1082+
async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client:
1083+
await client.subscribe(f"hame_energy/HME-4/device/{binding.mac}/ctrl")
1084+
await client.publish(
1085+
f"hame_energy/HME-4/App/{binding.mac}/ctrl", payload=b"cd=1"
1086+
)
1087+
received = []
1088+
await _collect_messages(
1089+
client, received, timeout=5, stop=lambda _: len(received) >= 1
1090+
)
1091+
assert len(received) == 1
1092+
assert received[0].payload.startswith(
1093+
b"pwr_a=100,pwr_b=200,pwr_c=300,pwr_t=600"
1094+
)
1095+
finally:
1096+
await service.stop()
1097+
1098+
1099+
@needs_mosquitto
1100+
async def test_marstek_slow_handler_does_not_stall_listener(mqtt_broker):
1101+
"""A slow get_values for one binding must not block polls for another.
1102+
1103+
With the offload-to-task design, the listener stays responsive even
1104+
while a prior poll handler is still awaiting its powermeter.
1105+
"""
1106+
port = mqtt_broker
1107+
service = _make_service(port)
1108+
1109+
slow_gate = asyncio.Event()
1110+
1111+
async def _slow_values() -> list[float]:
1112+
# Block until the test explicitly releases this handler.
1113+
await slow_gate.wait()
1114+
return [1.0, 2.0, 3.0]
1115+
1116+
slow = MarstekMqttBinding(
1117+
device_id="slow-ct",
1118+
ct_type="HME-4",
1119+
mac="02b250111111",
1120+
get_values=_slow_values,
1121+
wifi_rssi=-50,
1122+
)
1123+
fast, _ = _make_binding(
1124+
device_id="fast-ct", mac="02b250222222", values=[10.0, 20.0, 30.0]
1125+
)
1126+
1127+
await service.register_marstek(slow)
1128+
await service.register_marstek(fast)
1129+
await service.start()
1130+
1131+
try:
1132+
await service.wait_connected()
1133+
await _poll(lambda: service._client is not None)
1134+
1135+
async with aiomqtt.Client(hostname="127.0.0.1", port=port) as client:
1136+
await client.subscribe(f"hame_energy/HME-4/device/{fast.mac}/ctrl")
1137+
1138+
# Trigger the slow poll first — its handler will block in get_values.
1139+
await client.publish(
1140+
f"hame_energy/HME-4/App/{slow.mac}/ctrl", payload=b"cd=1"
1141+
)
1142+
# Immediately trigger the fast poll — if the listener were
1143+
# stalled, we'd never see its reply.
1144+
await client.publish(
1145+
f"hame_energy/HME-4/App/{fast.mac}/ctrl", payload=b"cd=1"
1146+
)
1147+
received = []
1148+
await _collect_messages(
1149+
client, received, timeout=5, stop=lambda _: len(received) >= 1
1150+
)
1151+
1152+
assert len(received) == 1
1153+
assert received[0].payload.startswith(b"pwr_a=10,pwr_b=20,pwr_c=30,pwr_t=60")
1154+
finally:
1155+
slow_gate.set()
1156+
await service.stop()

src/astrameter/mqtt_insights/service.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ def __init__(self, config: MqttInsightsConfig) -> None:
104104
# Rate-limit per-device get_values failure logging so a broken
105105
# powermeter doesn't flood the log at hm2mqtt's poll cadence.
106106
self._marstek_get_values_failed: set[str] = set()
107+
# In-flight poll handlers — tracked so one slow powermeter doesn't
108+
# block the listener loop, and so we can cancel pending tasks on
109+
# reconnect / shutdown.
110+
self._marstek_tasks: set[asyncio.Task[None]] = set()
107111

108112
# ── Public API (called from device event listeners) ───────────────
109113

@@ -293,6 +297,7 @@ async def _run(self) -> None:
293297
finally:
294298
async with self._marstek_lock:
295299
self._client = None
300+
await self._cancel_marstek_tasks()
296301

297302
except asyncio.CancelledError:
298303
self._connected.clear()
@@ -687,12 +692,14 @@ def _handle_device_command(self, device_id: str, cmd: dict) -> None:
687692
async def _handle_marstek_message(
688693
self, client: aiomqtt.Client, message: aiomqtt.Message
689694
) -> None:
695+
"""Dispatch a poll quickly; offload the response to a task so a
696+
slow powermeter can't stall the listener loop."""
690697
topic = str(message.topic)
691698
parsed = parse_app_topic(topic)
692699
if parsed is None:
693700
return
694701
ct_type, mac = parsed
695-
binding = self._find_marstek_binding(ct_type, mac)
702+
binding = await self._find_marstek_binding(ct_type, mac)
696703
if binding is None:
697704
logger.debug("Marstek MQTT: no binding for %s/%s", ct_type, mac)
698705
return
@@ -702,6 +709,13 @@ async def _handle_marstek_message(
702709
logger.debug("Marstek MQTT: non-poll payload on %s", topic)
703710
return
704711

712+
task = asyncio.create_task(self._serve_marstek_poll(client, binding))
713+
self._marstek_tasks.add(task)
714+
task.add_done_callback(self._marstek_tasks.discard)
715+
716+
async def _serve_marstek_poll(
717+
self, client: aiomqtt.Client, binding: MarstekMqttBinding
718+
) -> None:
705719
try:
706720
watts = await binding.get_values()
707721
except Exception:
@@ -722,10 +736,23 @@ async def _handle_marstek_message(
722736
with contextlib.suppress(aiomqtt.MqttError):
723737
await client.publish(reply_topic, payload=payload, qos=0, retain=False)
724738

725-
def _find_marstek_binding(
739+
async def _cancel_marstek_tasks(self) -> None:
740+
pending = tuple(self._marstek_tasks)
741+
for task in pending:
742+
task.cancel()
743+
if pending:
744+
await asyncio.gather(*pending, return_exceptions=True)
745+
self._marstek_tasks.clear()
746+
747+
async def _find_marstek_binding(
726748
self, ct_type: str, mac: str
727749
) -> MarstekMqttBinding | None:
728-
for binding in self._marstek_bindings.values():
729-
if binding.ct_type == ct_type and binding.mac == mac.lower():
750+
# Snapshot under the lock so a concurrent (un)register can't mutate
751+
# the dict mid-scan.
752+
async with self._marstek_lock:
753+
candidates = tuple(self._marstek_bindings.values())
754+
mac_lower = mac.lower()
755+
for binding in candidates:
756+
if binding.ct_type == ct_type and binding.mac == mac_lower:
730757
return binding
731758
return None

0 commit comments

Comments
 (0)