Skip to content

Commit 86dadd4

Browse files
authored
Speed up coordinator refresh with parallel fetches and landscape lock (#401)
Periodic refreshes were taking ~2s because independent endpoints were awaited sequentially and parallel per-zone landscape lookups each fired a duplicate HTTP request to the same device-level endpoint. - Gather `devices` and `timer_programs` concurrently in the coordinator - Fan out per-device history/landscape bundles across all devices in parallel (previously the outer device loop was sequential) - Add a per-device `asyncio.Lock` around `_refresh_landscapes` so concurrent callers share one HTTP request instead of duplicating N
1 parent 20180b9 commit 86dadd4

3 files changed

Lines changed: 112 additions & 64 deletions

File tree

custom_components/bhyve/coordinator.py

Lines changed: 56 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from homeassistant.core import HomeAssistant
3131

3232
from .pybhyve import BHyveClient
33-
from .pybhyve.typings import BHyveDevice, BHyveTimerProgram
33+
from .pybhyve.typings import BHyveDevice
3434

3535
_LOGGER = logging.getLogger(__name__)
3636

@@ -58,63 +58,34 @@ def __init__(
5858
async def _async_update_data(self) -> dict[str, Any]:
5959
"""Fetch data from API (periodic polling)."""
6060
try:
61-
# Fetch devices
62-
devices: list[BHyveDevice] = await self.client.devices
61+
# devices and timer_programs are independent endpoints; fetch in
62+
# parallel so the refresh is bounded by the slower of the two.
63+
devices, programs = await asyncio.gather(
64+
self.client.devices,
65+
self.client.timer_programs,
66+
)
67+
68+
# Fan out per-device history/landscape fetches across all devices in
69+
# parallel. Previously this loop was sequential across devices, so
70+
# multi-device accounts paid N times the per-device roundtrip.
71+
device_results = await asyncio.gather(
72+
*[self._fetch_device_bundle(device) for device in devices],
73+
return_exceptions=True,
74+
)
6375

64-
# Build data structure
6576
data: dict[str, Any] = {
6677
"devices": {},
6778
"programs": {},
6879
}
6980

70-
# Process each device - fetch history and landscapes in parallel per device
71-
for device in devices:
72-
device_id = device.get("id")
73-
device_type = device.get("type")
74-
if not device_id:
81+
for result in device_results:
82+
if isinstance(result, BaseException):
83+
_LOGGER.debug("Error fetching device bundle: %s", result)
7584
continue
85+
if result is None:
86+
continue
87+
data["devices"][result["device"]["id"]] = result
7688

77-
# Only fetch history and landscapes for sprinkler_timer devices
78-
# Flood sensors and other device types don't have these features
79-
if device_type == DEVICE_SPRINKLER:
80-
# Fetch history and landscapes in parallel for this device
81-
history_task = self._fetch_device_history(device_id)
82-
landscapes_task = self._fetch_landscapes(device_id, device)
83-
84-
history, landscapes = await asyncio.gather(
85-
history_task,
86-
landscapes_task,
87-
return_exceptions=True,
88-
)
89-
90-
# Handle exceptions from parallel tasks
91-
if isinstance(history, Exception):
92-
_LOGGER.debug(
93-
"Error fetching history for device %s: %s",
94-
device_id,
95-
history,
96-
)
97-
history = []
98-
if isinstance(landscapes, Exception):
99-
_LOGGER.debug(
100-
"Error fetching landscapes for device %s: %s",
101-
device_id,
102-
landscapes,
103-
)
104-
landscapes = {}
105-
else:
106-
# Non-sprinkler devices don't have history or landscapes
107-
history = []
108-
landscapes = {}
109-
110-
data["devices"][device_id] = {
111-
"device": device,
112-
"history": history,
113-
"landscapes": landscapes,
114-
}
115-
116-
# Fetch programs
117-
programs: list[BHyveTimerProgram] = await self.client.timer_programs
11889
for program in programs:
11990
program_id = program.get("id")
12091
if program_id:
@@ -129,6 +100,41 @@ async def _async_update_data(self) -> dict[str, Any]:
129100
msg = f"Error communicating with API: {err}"
130101
raise UpdateFailed(msg) from err
131102

103+
async def _fetch_device_bundle(self, device: BHyveDevice) -> dict[str, Any] | None:
104+
"""Fetch history + landscapes for a single device."""
105+
device_id = device.get("id")
106+
if not device_id:
107+
return None
108+
109+
if device.get("type") == DEVICE_SPRINKLER:
110+
history, landscapes = await asyncio.gather(
111+
self._fetch_device_history(device_id),
112+
self._fetch_landscapes(device_id, device),
113+
return_exceptions=True,
114+
)
115+
116+
if isinstance(history, Exception):
117+
_LOGGER.debug(
118+
"Error fetching history for device %s: %s", device_id, history
119+
)
120+
history = []
121+
if isinstance(landscapes, Exception):
122+
_LOGGER.debug(
123+
"Error fetching landscapes for device %s: %s",
124+
device_id,
125+
landscapes,
126+
)
127+
landscapes = {}
128+
else:
129+
history = []
130+
landscapes = {}
131+
132+
return {
133+
"device": device,
134+
"history": history,
135+
"landscapes": landscapes,
136+
}
137+
132138
async def _fetch_device_history(self, device_id: str) -> list[dict[str, Any]]:
133139
"""Fetch watering history for a device."""
134140
try:

custom_components/bhyve/pybhyve/client.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Define an object to interact with the REST API."""
22

3+
import asyncio
34
import logging
45
import re
56
import time
@@ -50,6 +51,7 @@ def __init__(self, username: str, password: str, session: ClientSession) -> None
5051

5152
self._landscapes: dict[str, list[BHyveZoneLandscape]] = {}
5253
self._last_poll_landscapes: dict[str, float] = {}
54+
self._landscape_locks: dict[str, asyncio.Lock] = {}
5355

5456
async def _request(
5557
self,
@@ -149,20 +151,29 @@ async def _refresh_device_history(
149151
async def _refresh_landscapes(
150152
self, device_id: str, *, force_update: bool = False
151153
) -> None:
152-
now = time.time()
153-
if force_update:
154-
_LOGGER.debug("Forcing landscape refresh %s", device_id)
155-
elif now - self._last_poll_landscapes.get(device_id, 0) < API_POLL_PERIOD:
156-
return
157-
158-
device_landscapes = await self._request(
159-
"get",
160-
f"{LANDSCAPE_DESCRIPTIONS_PATH}/{device_id}",
161-
params={"t": str(time.time())},
162-
)
163-
164-
self._landscapes[device_id] = device_landscapes or []
165-
self._last_poll_landscapes[device_id] = now
154+
# Single-flight: concurrent callers for the same device share one HTTP
155+
# request. Without this, a coordinator refresh that fans out per-zone
156+
# landscape lookups for N zones fires N duplicate requests.
157+
lock = self._landscape_locks.get(device_id)
158+
if lock is None:
159+
lock = asyncio.Lock()
160+
self._landscape_locks[device_id] = lock
161+
162+
async with lock:
163+
now = time.time()
164+
if force_update:
165+
_LOGGER.debug("Forcing landscape refresh %s", device_id)
166+
elif now - self._last_poll_landscapes.get(device_id, 0) < API_POLL_PERIOD:
167+
return
168+
169+
device_landscapes = await self._request(
170+
"get",
171+
f"{LANDSCAPE_DESCRIPTIONS_PATH}/{device_id}",
172+
params={"t": str(time.time())},
173+
)
174+
175+
self._landscapes[device_id] = device_landscapes or []
176+
self._last_poll_landscapes[device_id] = now
166177

167178
async def _async_ws_handler(self, async_callback: Callable, data: Any) -> None:
168179
"""Process incoming websocket message."""

tests/test_client.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Test BHyve pybhyve client."""
22

3+
import asyncio
34
from unittest.mock import AsyncMock, MagicMock
45

56
import pytest
@@ -106,3 +107,33 @@ async def test_rate_limits_same_device(self, client: BHyveClient) -> None:
106107
await client.get_landscape("device-a", 1)
107108

108109
assert client._request.await_count == 1
110+
111+
async def test_concurrent_calls_dedupe_to_single_request(
112+
self, client: BHyveClient
113+
) -> None:
114+
"""
115+
Concurrent get_landscape calls for the same device share one HTTP request.
116+
117+
The coordinator fans out per-zone landscape lookups in parallel. Without
118+
single-flight locking, N zones trigger N duplicate requests to the same
119+
endpoint.
120+
"""
121+
call_started = asyncio.Event()
122+
release = asyncio.Event()
123+
124+
async def slow_request(*_args: object, **_kwargs: object) -> list:
125+
call_started.set()
126+
await release.wait()
127+
return [{"station": 1}, {"station": 2}, {"station": 3}]
128+
129+
client._request = AsyncMock(side_effect=slow_request)
130+
131+
tasks = [
132+
asyncio.create_task(client.get_landscape("device-a", zone_id))
133+
for zone_id in (1, 2, 3)
134+
]
135+
await call_started.wait()
136+
release.set()
137+
await asyncio.gather(*tasks)
138+
139+
assert client._request.await_count == 1

0 commit comments

Comments
 (0)