1- """The Harvester integration."""
1+ """The Harvst WaterMate integration."""
22
33from __future__ import annotations
44
5+ import asyncio
6+ import logging
7+ from contextlib import suppress
8+ from enum import Enum
9+ from typing import Any
10+
511from homeassistant .config_entries import ConfigEntry
6- from homeassistant .const import Platform
12+ from homeassistant .const import CONF_HOST , Platform
713from homeassistant .core import HomeAssistant
14+ from homeassistant .exceptions import ConfigEntryAuthFailed , ConfigEntryNotReady
15+ from homeassistant .helpers .aiohttp_client import async_get_clientsession
16+ from homeassistant .helpers .update_coordinator import DataUpdateCoordinator , UpdateFailed
17+
18+ from .api import (
19+ HarvstWatermateApiClient ,
20+ HarvstWatermateApiClientAuthenticationError ,
21+ HarvstWatermateApiClientCommunicationError ,
22+ HarvstWatermateApiClientError ,
23+ SSEMessage ,
24+ )
25+ from .const import DEFAULT_NAME , DOMAIN
826
9- from . const import DOMAIN
27+ _LOGGER = logging . getLogger ( __name__ )
1028
11- # TODO List the platforms that you want to support.
12- # For your initial PR, limit it to 1 platform.
1329PLATFORMS : list [Platform ] = [Platform .BINARY_SENSOR , Platform .SWITCH , Platform .SENSOR ]
1430
1531
32+ class _ListenerState (Enum ):
33+ IDLE = "idle"
34+ HEALTHY = "healthy"
35+ ERROR = "error"
36+ STOPPED = "stopped"
37+
38+
39+ class _ReconnectBackoff :
40+ """Simple exponential backoff helper respecting SSE retry hints."""
41+
42+ def __init__ (self , * , base : float = 1.0 , factor : float = 2.0 , maximum : float = 60.0 ) -> None :
43+ self ._base = base
44+ self ._factor = factor
45+ self ._maximum = maximum
46+ self ._attempt = 0
47+ self ._override : float | None = None
48+
49+ def reset (self ) -> None :
50+ self ._attempt = 0
51+
52+ def apply_retry_hint (self , retry_ms : int | None ) -> None :
53+ if retry_ms is None :
54+ return
55+ self ._override = max (retry_ms / 1000.0 , self ._base )
56+
57+ def next_delay (self ) -> float :
58+ if self ._override is not None :
59+ delay = self ._override
60+ self ._override = None
61+ self ._attempt = 0
62+ return delay
63+ delay = min (self ._base * (self ._factor ** self ._attempt ), self ._maximum )
64+ self ._attempt += 1
65+ return delay
66+
67+
1668async def async_setup_entry (hass : HomeAssistant , entry : ConfigEntry ) -> bool :
1769 """Set up Harvester from a config entry."""
18-
1970 hass .data .setdefault (DOMAIN , {})
20- # TODO 1. Create API instance
21- # TODO 2. Validate the API connection (and authentication)
22- # TODO 3. Store an API object for your platforms to access
23- # hass.data[DOMAIN][entry.entry_id] = MyApi(...)
71+
72+ session = async_get_clientsession (hass )
73+ api_client = HarvstWatermateApiClient (entry .data [CONF_HOST ], session )
74+
75+ coordinator = HarvstWatermateDataUpdateCoordinator (hass , api_client )
76+
77+ _LOGGER .info (
78+ "Setting up Harvst WaterMate entry %s for host %s" ,
79+ entry .entry_id ,
80+ entry .data [CONF_HOST ],
81+ )
82+
83+ try :
84+ await coordinator .async_config_entry_first_refresh ()
85+ except HarvstWatermateApiClientAuthenticationError as err :
86+ await coordinator .async_shutdown ()
87+ raise ConfigEntryAuthFailed (str (err )) from err
88+ except HarvstWatermateApiClientCommunicationError as err :
89+ await coordinator .async_shutdown ()
90+ raise ConfigEntryNotReady (err ) from err
91+ except HarvstWatermateApiClientError as err :
92+ await coordinator .async_shutdown ()
93+ raise ConfigEntryNotReady (err ) from err
94+
95+ hass .data [DOMAIN ][entry .entry_id ] = {
96+ "coordinator" : coordinator ,
97+ "api" : api_client ,
98+ }
2499
25100 await hass .config_entries .async_forward_entry_setups (entry , PLATFORMS )
26101
@@ -29,7 +104,166 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
29104
30105async def async_unload_entry (hass : HomeAssistant , entry : ConfigEntry ) -> bool :
31106 """Unload a config entry."""
107+ data = hass .data [DOMAIN ].get (entry .entry_id )
108+ coordinator : HarvstWatermateDataUpdateCoordinator | None = None
109+ if data :
110+ coordinator = data .get ("coordinator" )
111+
32112 if unload_ok := await hass .config_entries .async_unload_platforms (entry , PLATFORMS ):
113+ if coordinator :
114+ await coordinator .async_shutdown ()
115+ _LOGGER .info ("Unloaded Harvst WaterMate entry %s" , entry .entry_id )
33116 hass .data [DOMAIN ].pop (entry .entry_id )
34117
35118 return unload_ok
119+
120+
121+ class HarvstWatermateDataUpdateCoordinator (DataUpdateCoordinator [dict [str , Any ]]):
122+ """Coordinate push updates from the WaterMate device."""
123+
124+ _REFRESH_TIMEOUT = 30
125+
126+ def __init__ (self , hass : HomeAssistant , api_client : HarvstWatermateApiClient ) -> None :
127+ """Initialize the coordinator."""
128+ super ().__init__ (
129+ hass ,
130+ _LOGGER ,
131+ name = DEFAULT_NAME ,
132+ update_interval = None ,
133+ )
134+ self .api_client = api_client
135+ self ._listener_task : asyncio .Task [None ] | None = None
136+ self ._stop_event = asyncio .Event ()
137+ self ._startup_future : asyncio .Future [None ] = hass .loop .create_future ()
138+ self ._listener_state = _ListenerState .IDLE
139+ self ._backoff = _ReconnectBackoff ()
140+ self ._refresh_waiter : asyncio .Future [dict [str , Any ]] | None = None
141+
142+ async def async_config_entry_first_refresh (self ) -> None :
143+ """Start the listener and wait for the first payload."""
144+ self ._ensure_listener_running ()
145+ try :
146+ await self ._startup_future
147+ except HarvstWatermateApiClientAuthenticationError as err :
148+ raise ConfigEntryAuthFailed (str (err )) from err
149+ except HarvstWatermateApiClientError as err :
150+ raise ConfigEntryNotReady (err ) from err
151+
152+ def _ensure_listener_running (self ) -> None :
153+ if self ._listener_task and not self ._listener_task .done ():
154+ return
155+ if self ._listener_task and self ._listener_task .done ():
156+ self ._listener_task = None
157+ self ._stop_event .clear ()
158+ self ._listener_task = self .hass .loop .create_task (
159+ self ._run_listener (),
160+ name = "harvst_watermate_sse_listener" ,
161+ )
162+
163+ async def _run_listener (self ) -> None :
164+ retry_backoff = self ._backoff
165+
166+ async def _handle_message (message : SSEMessage ) -> None :
167+ retry_backoff .apply_retry_hint (message .retry )
168+ if message .payload is None :
169+ return
170+ retry_backoff .reset ()
171+ if self ._listener_state is not _ListenerState .HEALTHY :
172+ self ._update_listener_state (_ListenerState .HEALTHY )
173+ await self .async_set_updated_data (message .payload )
174+ if not self ._startup_future .done ():
175+ self ._startup_future .set_result (None )
176+ if self ._refresh_waiter and not self ._refresh_waiter .done ():
177+ self ._refresh_waiter .set_result (message .payload )
178+
179+ while not self ._stop_event .is_set ():
180+ try :
181+ await self .api_client .async_listen_events (_handle_message , stop_event = self ._stop_event )
182+ if self ._stop_event .is_set ():
183+ break
184+ raise HarvstWatermateApiClientCommunicationError ("Events stream closed unexpectedly" )
185+ except HarvstWatermateApiClientAuthenticationError as err :
186+ self ._handle_listener_error (err , fatal = True )
187+ return
188+ except HarvstWatermateApiClientError as err :
189+ self ._handle_listener_error (err , fatal = False )
190+ except asyncio .CancelledError :
191+ break
192+
193+ if self ._stop_event .is_set ():
194+ break
195+
196+ delay = retry_backoff .next_delay ()
197+ try :
198+ await asyncio .wait_for (self ._stop_event .wait (), delay )
199+ except asyncio .TimeoutError :
200+ continue
201+ self ._update_listener_state (_ListenerState .STOPPED )
202+
203+ def _handle_listener_error (
204+ self ,
205+ err : HarvstWatermateApiClientError ,
206+ * ,
207+ fatal : bool ,
208+ ) -> None :
209+ if not self ._startup_future .done ():
210+ self ._startup_future .set_exception (err )
211+ if self ._refresh_waiter and not self ._refresh_waiter .done ():
212+ self ._refresh_waiter .set_exception (err )
213+ self ._update_listener_state (_ListenerState .ERROR , err = err )
214+ if fatal :
215+ self ._stop_event .set ()
216+
217+ def _update_listener_state (
218+ self ,
219+ new_state : _ListenerState ,
220+ * ,
221+ err : Exception | None = None ,
222+ ) -> None :
223+ previous = self ._listener_state
224+ if previous == new_state :
225+ return
226+ self ._listener_state = new_state
227+
228+ if new_state == _ListenerState .HEALTHY :
229+ if previous == _ListenerState .ERROR :
230+ _LOGGER .info ("WaterMate events listener recovered" )
231+ elif previous == _ListenerState .IDLE :
232+ _LOGGER .debug ("WaterMate events listener established connection" )
233+ elif new_state == _ListenerState .ERROR :
234+ if previous == _ListenerState .HEALTHY :
235+ detail = f": { err } " if err else ""
236+ _LOGGER .warning ("WaterMate events listener entered error state%s" , detail )
237+ elif new_state == _ListenerState .STOPPED :
238+ _LOGGER .debug ("WaterMate events listener stopped" )
239+
240+ async def _async_update_data (self ) -> dict [str , Any ]:
241+ """Allow manual refresh calls to await the next push update."""
242+ self ._ensure_listener_running ()
243+ if self ._refresh_waiter and not self ._refresh_waiter .done ():
244+ waiter = self ._refresh_waiter
245+ else :
246+ waiter = self .hass .loop .create_future ()
247+ self ._refresh_waiter = waiter
248+
249+ try :
250+ return await asyncio .wait_for (waiter , timeout = self ._REFRESH_TIMEOUT )
251+ except asyncio .TimeoutError as err :
252+ if self ._refresh_waiter is waiter and not waiter .done ():
253+ waiter .cancel ()
254+ self ._refresh_waiter = None
255+ raise UpdateFailed ("Timed out waiting for WaterMate push update" ) from err
256+ finally :
257+ if self ._refresh_waiter is waiter and waiter .done ():
258+ self ._refresh_waiter = None
259+
260+ async def async_shutdown (self ) -> None :
261+ """Stop the listener and clean up resources."""
262+ self ._stop_event .set ()
263+ if not self ._listener_task :
264+ return
265+ listener = self ._listener_task
266+ self ._listener_task = None
267+ listener .cancel ()
268+ with suppress (asyncio .CancelledError ):
269+ await listener
0 commit comments