-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcoordinator.py
More file actions
193 lines (167 loc) · 7.94 KB
/
coordinator.py
File metadata and controls
193 lines (167 loc) · 7.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""Data update coordinator for EVSEMaster integration."""
from __future__ import annotations

from datetime import datetime, timedelta, timezone
import logging
from typing import Any

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, CONF_PASSWORD
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed

from .const import DOMAIN
from .evse_loader import evse_protocol, data_types
# Bind the protocol client and the data models to short module-level names.
# They are looked up as attributes of the `evse_protocol` / `data_types`
# objects provided by `.evse_loader` rather than imported directly.
SimpleEVSEProtocol = evse_protocol.SimpleEVSEProtocol
EvseStatus = data_types.EvseStatus
ChargingStatus = data_types.ChargingStatus
BaseSchema = data_types.BaseSchema
EvseDeviceInfo = data_types.EvseDeviceInfo
now_aware = data_types.now_aware
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
class DeviceSchema(EvseDeviceInfo):
    """EVSE device information with a helper that builds a Home Assistant device-info mapping."""

    def get_attr_device_info(self) -> dict[str, Any]:
        """Return device info for Home Assistant.

        The device is identified by ``(DOMAIN, serial_number)``. Its display
        name is the nickname when one is set, otherwise the model.
        """
        display_name = self.nickname or self.model
        identifiers = {(DOMAIN, self.serial_number)}
        return dict(
            identifiers=identifiers,
            name=display_name,
            manufacturer=self.brand,
            model=self.model,
            serial_number=self.serial_number,
            hw_version=self.hardware_version,
        )
class DataSchema(BaseSchema):
    """Snapshot of EVSE state held by the coordinator: status, charging status and device info."""

    # Most recent EvseStatus, or None until one has been stored.
    status: EvseStatus | None = None
    # Most recent ChargingStatus, or None until one has been stored.
    charging_status: ChargingStatus | None = None
    # Device information; starts out as a default-constructed DeviceSchema.
    # NOTE(review): this default instance is created once, at class-definition
    # time. Whether each DataSchema gets its own copy depends on BaseSchema,
    # which is not visible here - confirm.
    device: DeviceSchema = DeviceSchema()
class EVSEMasterDataUpdateCoordinator(DataUpdateCoordinator):
    """Coordinator that keeps one EVSE connection alive and caches its state.

    State arrives through the protocol's event callback (local push). The
    periodic update only makes sure the connection and login are in place,
    asks the device for a fresh status, and occasionally refreshes the full
    device information.
    """

    # How often the periodic update runs.
    _POLL_INTERVAL = timedelta(seconds=60)
    # Minimum time between two full device-info refreshes.
    _DEVICE_INFO_REFRESH_INTERVAL = timedelta(minutes=30)
    # Reservations further ahead than this are rejected.
    _MAX_RESERVATION_AHEAD = timedelta(hours=24)

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Set up the coordinator and the protocol client from a config entry.

        Args:
            hass: The Home Assistant instance.
            entry: Config entry providing ``CONF_HOST`` and ``CONF_PASSWORD``.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            config_entry=entry,
            update_interval=self._POLL_INTERVAL,
            update_method=self._async_update_data,
            always_update=True,
        )
        self.entry = entry
        self.host = entry.data[CONF_HOST]
        self.password = entry.data[CONF_PASSWORD]
        self._connected = False
        self.data: DataSchema = DataSchema()
        # Time of the last successful full device-info refresh (timezone-aware UTC).
        self.secondary_timer = datetime.now(timezone.utc)
        self.proto = SimpleEVSEProtocol(
            host=self.host,
            password=self.password,
            event_callback=self._on_protocol_event,
        )

    def _ensure_serial(self) -> None:
        """Copy the protocol's latest device info into ``self.data`` when the serial differs."""
        proto_device = self.proto.get_latest_device_info()
        if proto_device and proto_device.serial_number != self.data.device.serial_number:
            self.data.device = DeviceSchema.model_validate(proto_device.model_dump())

    def _on_protocol_event(self, event_type: str, payload: Any) -> None:
        """Receive local-push events from the protocol and push them to HA.

        Args:
            event_type: Name of the payload class the event carries.
            payload: The event object; ignored unless it matches ``event_type``.
        """

        async def _handle() -> None:
            self._ensure_serial()
            changed = False
            if event_type == EvseStatus.__name__ and isinstance(payload, EvseStatus):
                self.data.status = payload
                changed = True
            elif event_type == ChargingStatus.__name__ and isinstance(payload, ChargingStatus):
                self.data.charging_status = payload
                changed = True
            elif event_type == EvseDeviceInfo.__name__ and isinstance(payload, EvseDeviceInfo):
                self.data.device = DeviceSchema.model_validate(payload.model_dump())
                changed = True
            if changed:
                self.async_set_updated_data(self.data)

        self.hass.async_create_task(_handle())

    async def _async_update_data(self) -> DataSchema:
        """Ensure connection and login; return the latest cached snapshot.

        Returns:
            The coordinator's current ``DataSchema``.

        Raises:
            UpdateFailed: If connecting or logging in fails, or any other
                error occurs while talking to the EVSE.
        """
        try:
            if not self._connected:
                if not await self.proto.connect():
                    raise UpdateFailed("Failed to create sockets to connect to EVSE")
                self._connected = True
                _LOGGER.info("Connected to EVSE on %s", self.host)

            if not self.proto.is_logged_in:
                if not await self.proto.login():
                    raise UpdateFailed("Failed to login to EVSE")
                _LOGGER.info("Logged in to EVSE")

            # Data is pushed via the callback; just request an update.
            await self.proto.request_status()

            # Periodically request the full device info to catch changes.
            now = datetime.now(timezone.utc)
            if self.secondary_timer + self._DEVICE_INFO_REFRESH_INTERVAL < now:
                if await self.proto.request_essentials():
                    # Restart the interval; without this the refresh would
                    # run on every poll once the first interval has elapsed.
                    self.secondary_timer = now
                    _LOGGER.info("Refreshed device info from EVSE")
                else:
                    # Timer is left untouched so the next poll retries.
                    _LOGGER.warning("Failed to refresh device info from EVSE")

            self._ensure_serial()
            return self.data
        except UpdateFailed:
            # Already the right exception type; do not wrap it a second time.
            raise
        except Exception as err:
            _LOGGER.error("Error updating EVSE data: %s", err)
            raise UpdateFailed(f"Error communicating with EVSE: {err}") from err

    async def async_shutdown(self) -> None:
        """Shut down the coordinator and disconnect from the EVSE."""
        # Run the base-class shutdown as well, so this override does not replace it.
        await super().async_shutdown()
        try:
            await self.proto.disconnect()
        finally:
            self._connected = False
        _LOGGER.info("EVSE client disconnected")

    async def async_start_charging(
        self,
        max_amps: int | None = None,
        start_datetime: datetime | str | None = None,
        duration_hours: float | None = None,
    ) -> bool:
        """Start charging with advanced parameters.

        Args:
            max_amps: Requested current. Values above the device's configured
                maximum are clamped to it.
            start_datetime: Optional start time, as a ``datetime`` or an ISO
                format string. A naive value is given the system's local
                timezone.
            duration_hours: Optional duration; converted to whole minutes.

        Returns:
            The result of the protocol's ``start_charging`` call.

        Raises:
            HomeAssistantError: If validation fails (start more than 24 hours
                ahead, ``max_amps`` above the hardware limit) or the protocol
                call raises.
        """
        try:
            minutes = int(duration_hours * 60) if duration_hours is not None else None

            if isinstance(start_datetime, str):
                start_datetime = datetime.fromisoformat(start_datetime)
            if start_datetime is not None:
                if start_datetime.tzinfo is None:
                    local_tz = datetime.now().astimezone().tzinfo
                    start_datetime = start_datetime.replace(tzinfo=local_tz)
                if start_datetime > now_aware() + self._MAX_RESERVATION_AHEAD:
                    raise ValueError("Reservation cannot be scheduled more than 24 hours in the future")

            device = self.data.device
            if max_amps is not None:
                if max_amps > device.max_amps:
                    raise ValueError(
                        f"Requested max_amps {max_amps} exceeds device hardware limit of {device.max_amps} A"
                    )
                if max_amps > device.configured_max_amps:
                    _LOGGER.warning(
                        "Requested max_amps %d exceeds configured max %d, clamping",
                        max_amps, device.configured_max_amps,
                    )
                    max_amps = device.configured_max_amps

            _LOGGER.info(
                "Starting charging on %s: amps=%s, duration=%sm, start=%s",
                device.serial_number, max_amps, minutes, start_datetime,
            )
            return await self.proto.start_charging(max_amps, start_datetime, minutes)
        except Exception as err:
            _LOGGER.error("Error starting charging on %s: %s", self.data.device.serial_number, err)
            raise HomeAssistantError(str(err)) from err

    async def async_stop_charging(self) -> bool:
        """Stop charging; return the protocol's result, or False if the call raises."""
        try:
            return await self.proto.stop_charging()
        except Exception as err:
            _LOGGER.error("Error stopping charging on %s: %s", self.data.device.serial_number, err)
            return False

    async def async_set_nickname(self, nickname: str) -> bool:
        """Set device nickname; return the protocol's result, or False if the call raises."""
        try:
            return await self.proto.set_nickname(nickname)
        except Exception as err:
            _LOGGER.error("Error setting nickname on %s: %s", self.data.device.serial_number, err)
            return False

    async def async_set_max_amps(self, amperage: int) -> bool:
        """Set maximum output amperage; return the protocol's result, or False if the call raises."""
        try:
            return await self.proto.set_output_amperage(amperage)
        except Exception as err:
            _LOGGER.error("Error setting max amperage on %s: %s", self.data.device.serial_number, err)
            return False