-
-
Notifications
You must be signed in to change notification settings - Fork 37.5k
Expand file tree
/
Copy pathautomation.py
More file actions
347 lines (286 loc) · 11.8 KB
/
automation.py
File metadata and controls
347 lines (286 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
"""Automation related helper methods for the Websocket API."""
from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass
from enum import StrEnum
import logging
from typing import Any, Self
from homeassistant.const import CONF_TARGET
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er, target as target_helpers
from homeassistant.helpers.condition import (
async_get_all_descriptions as async_get_all_condition_descriptions,
)
from homeassistant.helpers.entity import (
entity_sources,
get_device_class,
get_supported_features,
)
from homeassistant.helpers.service import (
async_get_all_descriptions as async_get_all_service_descriptions,
)
from homeassistant.helpers.trigger import (
async_get_all_descriptions as async_get_all_trigger_descriptions,
)
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.hass_dict import HassKey
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# hass.data key for the flattened service description cache.
# The stored value is a 2-tuple: (service descriptions the flattening was built
# from, flattened descriptions keyed by "<domain>.<service_name>").
FLATTENED_SERVICE_DESCRIPTIONS_CACHE: HassKey[
    tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]
] = HassKey("websocket_automation_flat_service_description_cache")

# hass.data key for the per-component-type lookup table cache.
# Each component type maps to a 2-tuple: (descriptions mapping the table was
# built from, the lookup table itself). The annotation refers to classes that
# are defined further down; this relies on `from __future__ import annotations`
# keeping annotations unevaluated.
AUTOMATION_COMPONENT_LOOKUP_CACHE: HassKey[
    dict[
        AutomationComponentType,
        tuple[Mapping[str, Any], _AutomationComponentLookupTable],
    ]
] = HassKey("websocket_automation_component_lookup_cache")
class AutomationComponentType(StrEnum):
"""Types of automation components."""
TRIGGERS = "triggers"
CONDITIONS = "conditions"
SERVICES = "services"
@dataclass(slots=True, kw_only=True)
class _EntityFilter:
"""Single entity filter configuration."""
integration: str | None
domains: set[str]
device_classes: set[str]
supported_features: set[int]
def matches(
self, hass: HomeAssistant, entity_id: str, domain: str, integration: str
) -> bool:
"""Return if entity matches all criteria in this filter."""
if self.integration and integration != self.integration:
return False
if self.domains and domain not in self.domains:
return False
if self.device_classes:
if (
entity_device_class := get_device_class(hass, entity_id)
) is None or entity_device_class not in self.device_classes:
return False
if self.supported_features:
entity_supported_features = get_supported_features(hass, entity_id)
if not any(
feature & entity_supported_features == feature
for feature in self.supported_features
):
return False
return True
@dataclass(slots=True, kw_only=True)
class _AutomationComponentLookupData:
"""Helper class for looking up automation components."""
component: str
filters: list[_EntityFilter]
primary_entities_only: bool = True
@classmethod
def create(cls, component: str, target_description: dict[str, Any]) -> Self:
"""Build automation component lookup data from target description."""
filters: list[_EntityFilter] = []
primary_entities_only = target_description.get("primary_entities_only", True)
entity_filters_config = target_description.get("entity", [])
for entity_filter_config in entity_filters_config:
entity_filter = _EntityFilter(
integration=entity_filter_config.get("integration"),
domains=set(entity_filter_config.get("domain", [])),
device_classes=set(entity_filter_config.get("device_class", [])),
supported_features=set(
entity_filter_config.get("supported_features", [])
),
)
filters.append(entity_filter)
return cls(
component=component,
filters=filters,
primary_entities_only=primary_entities_only,
)
def matches(
self,
hass: HomeAssistant,
entity_id: str,
domain: str,
integration: str,
check_entity_category: bool,
) -> bool:
"""Return if entity matches ANY of the filters."""
if not self.filters:
return True
if check_entity_category and self.primary_entities_only:
entry = er.async_get(hass).async_get(entity_id)
if entry is None or entry.entity_category is not None:
return False
return any(
f.matches(hass, entity_id, domain, integration) for f in self.filters
)
@dataclass(slots=True, kw_only=True)
class _AutomationComponentLookupTable:
"""Helper class for looking up automation components."""
domain_components: dict[str | None, list[_AutomationComponentLookupData]]
component_count: int
def _get_automation_component_domains(
    target_description: dict[str, Any],
) -> set[str | None]:
    """Get the set of domains (including integration domains) of an automation component.

    The domains are extracted from each of the target's entity filters: the
    filter's integration (if set) and every entry of its domain list.

    None is included in the returned set when the target has no entity filters
    at all, or when a filter is missing both the domain and integration keys.
    """
    entity_filters_config = target_description.get("entity", [])
    if not entity_filters_config:
        return {None}

    domains: set[str | None] = set()
    for entity_filter_config in entity_filters_config:
        filter_integration = entity_filter_config.get("integration")
        filter_domains = entity_filter_config.get("domain", [])

        if not filter_domains and not filter_integration:
            # The filter restricts neither domain nor integration.
            domains.add(None)
            continue

        if filter_integration:
            domains.add(filter_integration)
        domains.update(filter_domains)

    return domains
def _get_automation_component_lookup_table(
    hass: HomeAssistant,
    component_type: AutomationComponentType,
    component_descriptions: Mapping[str, Mapping[str, Any] | None],
) -> _AutomationComponentLookupTable:
    """Return the lookup table (components by domain plus count) for a component type.

    The table is cached in hass.data per component type. The cached table is
    reused only when it was built from the very same descriptions object
    (identity comparison); otherwise it is rebuilt and the cache entry replaced.
    """
    per_type_cache = hass.data.setdefault(AUTOMATION_COMPONENT_LOOKUP_CACHE, {})

    cache_entry = per_type_cache.get(component_type)
    if cache_entry is not None and cache_entry[0] is component_descriptions:
        return cache_entry[1]

    _LOGGER.debug(
        "Automation component lookup data for %s has no cache yet", component_type
    )

    components_by_domain: dict[str | None, list[_AutomationComponentLookupData]] = {}
    indexed_components = 0
    for name, desc in component_descriptions.items():
        if desc is None or CONF_TARGET not in desc:
            _LOGGER.debug("Skipping component %s without target description", name)
            continue

        target_description = desc[CONF_TARGET]
        domain_keys = _get_automation_component_domains(target_description)
        # One shared lookup data object is registered under every domain key.
        component_data = _AutomationComponentLookupData.create(
            name, target_description
        )
        for domain_key in domain_keys:
            components_by_domain.setdefault(domain_key, []).append(component_data)
        indexed_components += 1

    table = _AutomationComponentLookupTable(
        domain_components=components_by_domain,
        component_count=indexed_components,
    )
    per_type_cache[component_type] = (component_descriptions, table)
    return table
def _async_get_automation_components_for_target(
    hass: HomeAssistant,
    component_type: AutomationComponentType,
    target_selection: ConfigType,
    expand_group: bool,
    component_descriptions: Mapping[str, Mapping[str, Any] | None],
) -> set[str]:
    """Get automation components (triggers/conditions/services) for a target.

    Returns all components that can be used on any entity that is currently
    part of the target. Entities without an entity source are skipped.
    """
    extracted = target_helpers.async_extract_referenced_entity_ids(
        hass,
        target_helpers.TargetSelection(target_selection),
        expand_group=expand_group,
        primary_entities_only=False,
    )
    _LOGGER.debug("Extracted entities for lookup: %s", extracted)

    lookup_table = _get_automation_component_lookup_table(
        hass, component_type, component_descriptions
    )
    _LOGGER.debug(
        "Automation components per domain: %s", lookup_table.domain_components
    )

    # The entity category check is requested for every entity as soon as the
    # target has at least one indirectly referenced entity.
    check_entity_category = bool(extracted.indirectly_referenced)
    entity_infos = entity_sources(hass)
    domain_components = lookup_table.domain_components
    component_count = lookup_table.component_count

    matched_components: set[str] = set()
    for entity_id in extracted.referenced | extracted.indirectly_referenced:
        if len(matched_components) == component_count:
            # All automation components matched already, so we don't need to iterate further
            break

        entity_info = entity_infos.get(entity_id)
        if entity_info is None:
            _LOGGER.debug("No entity source found for %s", entity_id)
            continue

        entity_domain = entity_id.partition(".")[0]
        entity_integration = entity_info["domain"]

        # The entity domain and its integration can be the same string;
        # dict.fromkeys drops the duplicate (keeping order) so that bucket is
        # not scanned twice and unmatched components are not re-evaluated.
        for domain in dict.fromkeys((entity_domain, entity_integration, None)):
            for component_data in domain_components.get(domain, ()):
                if component_data.component in matched_components:
                    continue
                if component_data.matches(
                    hass,
                    entity_id,
                    entity_domain,
                    entity_integration,
                    check_entity_category,
                ):
                    matched_components.add(component_data.component)

    return matched_components
async def async_get_triggers_for_target(
    hass: HomeAssistant, target_selector: ConfigType, expand_group: bool
) -> set[str]:
    """Return the triggers that can be used with the given target."""
    trigger_descriptions = await async_get_all_trigger_descriptions(hass)
    return _async_get_automation_components_for_target(
        hass=hass,
        component_type=AutomationComponentType.TRIGGERS,
        target_selection=target_selector,
        expand_group=expand_group,
        component_descriptions=trigger_descriptions,
    )
async def async_get_conditions_for_target(
    hass: HomeAssistant, target_selector: ConfigType, expand_group: bool
) -> set[str]:
    """Return the conditions that can be used with the given target."""
    condition_descriptions = await async_get_all_condition_descriptions(hass)
    return _async_get_automation_components_for_target(
        hass=hass,
        component_type=AutomationComponentType.CONDITIONS,
        target_selection=target_selector,
        expand_group=expand_group,
        component_descriptions=condition_descriptions,
    )
async def async_get_services_for_target(
    hass: HomeAssistant, target_selector: ConfigType, expand_group: bool
) -> set[str]:
    """Return the services that can be used with the given target.

    Service descriptions are nested by domain, so they are first flattened to
    "<domain>.<service_name>" keys. The flattened dict is cached in hass.data
    and reused while the source descriptions object stays the same object.
    """
    service_descriptions = await async_get_all_service_descriptions(hass)

    flattened: dict[str, dict[str, Any]] | None = None
    cache_entry = hass.data.get(FLATTENED_SERVICE_DESCRIPTIONS_CACHE)
    if cache_entry is not None:
        cached_source, cached_flattened = cache_entry
        # If the descriptions are the same, reuse the cached flattened version
        if cached_source is service_descriptions:
            flattened = cached_flattened

    if flattened is None:
        # Flatten dicts to be keyed by domain.name to match trigger/condition format
        flattened = {}
        for domain, services in service_descriptions.items():
            for service_name, desc in services.items():
                flattened[f"{domain}.{service_name}"] = desc
        hass.data[FLATTENED_SERVICE_DESCRIPTIONS_CACHE] = (
            service_descriptions,
            flattened,
        )

    return _async_get_automation_components_for_target(
        hass=hass,
        component_type=AutomationComponentType.SERVICES,
        target_selection=target_selector,
        expand_group=expand_group,
        component_descriptions=flattened,
    )