-
-
Notifications
You must be signed in to change notification settings - Fork 35.8k
Add energy price calendar platform to Teslemetry #145848
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
a0cc4e8
1416b3d
90e7d85
37ed601
09c50ce
39c0dbe
6f086ce
c905f03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,288 @@ | ||
| """Calendar platform for Teslemetry integration.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from datetime import datetime, timedelta | ||
| from typing import Any | ||
|
|
||
| from attr import dataclass | ||
|
|
||
| from homeassistant.components.calendar import CalendarEntity, CalendarEvent | ||
| from homeassistant.core import HomeAssistant | ||
| from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback | ||
| from homeassistant.util import dt as dt_util | ||
|
|
||
| from . import TeslemetryConfigEntry | ||
| from .entity import TeslemetryEnergyInfoEntity | ||
|
|
||
|
|
||
| async def async_setup_entry( | ||
| hass: HomeAssistant, | ||
| entry: TeslemetryConfigEntry, | ||
| async_add_entities: AddConfigEntryEntitiesCallback, | ||
| ) -> None: | ||
| """Set up the Teslemetry Calendar platform from a config entry.""" | ||
|
|
||
| entities_to_add: list[CalendarEntity] = [] | ||
|
|
||
| # Add buy tariff calendar entities | ||
| entities_to_add.extend( | ||
| TeslemetryTariffSchedule(energy, "tariff_content_v2") | ||
| for energy in entry.runtime_data.energysites | ||
| if energy.info_coordinator.data.get("tariff_content_v2_seasons") | ||
| ) | ||
|
|
||
| # Add sell tariff calendar entities | ||
| entities_to_add.extend( | ||
| TeslemetryTariffSchedule(energy, "tariff_content_v2_sell_tariff") | ||
| for energy in entry.runtime_data.energysites | ||
| if energy.info_coordinator.data.get("tariff_content_v2_sell_tariff_seasons") | ||
| ) | ||
|
|
||
| async_add_entities(entities_to_add) | ||
|
|
||
|
|
||
| @dataclass | ||
| class TariffPeriod: | ||
| """A single tariff period.""" | ||
|
|
||
| name: str | ||
| price: float | ||
| from_hour: int = 0 | ||
| from_minute: int = 0 | ||
| to_hour: int = 0 | ||
| to_minute: int = 0 | ||
|
|
||
|
|
||
| class TeslemetryTariffSchedule(TeslemetryEnergyInfoEntity, CalendarEntity): | ||
| """Energy Site Tariff Schedule Calendar.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| data: Any, | ||
| key_base: str, | ||
| ) -> None: | ||
| """Initialize the tariff schedule calendar.""" | ||
| self.key_base: str = key_base | ||
| self.seasons: dict[str, dict[str, Any]] = {} | ||
| self.charges: dict[str, dict[str, Any]] = {} | ||
| super().__init__(data, key_base) | ||
|
|
||
| @property | ||
| def event(self) -> CalendarEvent | None: | ||
| """Return the current active tariff event.""" | ||
| now = dt_util.now() | ||
| current_season_name = self._get_current_season(now) | ||
|
|
||
| if not current_season_name or not self.seasons.get(current_season_name): | ||
| return None | ||
|
|
||
| # Get the time of use periods for the current season | ||
| tou_periods = self.seasons[current_season_name].get("tou_periods", {}) | ||
|
|
||
| for period_name, period_group in tou_periods.items(): | ||
| for period_def in period_group.get("periods", []): | ||
| # Check if today is within the period's day of week range | ||
| day_of_week = now.weekday() # Monday is 0, Sunday is 6 | ||
| from_day = period_def.get("fromDayOfWeek", 0) | ||
| to_day = period_def.get("toDayOfWeek", 6) | ||
|
Comment on lines
+87
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it correct that the API doesn't always set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the data outside of the API is a protobuf, which treats missing values as their defaults, so we have to fill this back in, as the default is the full week. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. Can you add a comment in the code then with a reference to where the defaults are defined? |
||
| if from_day > day_of_week > to_day: | ||
| continue | ||
|
|
||
| # Calculate start and end times for today | ||
| from_hour = period_def.get("fromHour", 0) % 24 | ||
| from_minute = period_def.get("fromMinute", 0) % 60 | ||
| to_hour = period_def.get("toHour", 0) % 24 | ||
| to_minute = period_def.get("toMinute", 0) % 60 | ||
|
Comment on lines
+93
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The defaults here differ from the defaults for weekdays, is that correct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dont know what your referring to, this code has no concept of weekdays and weekends, its just start days of week and end days of week. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So for example, the default here is all values as zero, which equals a full day due to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean that the default for |
||
|
|
||
| start_time = now.replace( | ||
| hour=from_hour, minute=from_minute, second=0, microsecond=0 | ||
| ) | ||
| end_time = now.replace( | ||
| hour=to_hour, minute=to_minute, second=0, microsecond=0 | ||
| ) | ||
|
|
||
| # Handle periods that cross midnight | ||
| if end_time <= start_time: | ||
| potential_end_time = end_time + timedelta(days=1) | ||
| if start_time <= now < potential_end_time: | ||
| end_time = potential_end_time | ||
| elif (start_time - timedelta(days=1)) <= now < end_time: | ||
| start_time -= timedelta(days=1) | ||
| else: | ||
| continue | ||
| elif not (start_time <= now < end_time): | ||
| continue | ||
|
Comment on lines
+105
to
+115
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add test cases which assert the behavior here. It's very easy to mess up, and even if your code is correct now it might break tomorrow if the code is changed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure which behavior specifically you are referring to, as this would already be tested with the fixture and snapshot. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a single, non-parametrized, test case each to test the state of the calendar entity as well as testing the get_events service. What I mean is that you should add parametrization or separate test cases testing the different branches to account for crossing midnight, crossing weeks and so on, since that logic is non trivial and very easy to mess up. |
||
|
|
||
| # Create calendar event for the active period | ||
| price = self._get_price_for_period(current_season_name, period_name) | ||
| price_str = f"{price:.2f}/kWh" if price is not None else "Unknown Price" | ||
|
|
||
| return CalendarEvent( | ||
| start=start_time, | ||
| end=end_time, | ||
| summary=f"{period_name.capitalize().replace('_', ' ')}: {price_str}", | ||
| description=( | ||
| f"Season: {current_season_name.capitalize()}\n" | ||
| f"Period: {period_name.capitalize().replace('_', ' ')}\n" | ||
| f"Price: {price_str}" | ||
| ), | ||
| uid=f"{self.key_base}_{current_season_name}_{period_name}_{start_time.isoformat()}", | ||
| ) | ||
|
|
||
| return None # No active period found for the current time and season | ||
|
|
||
| async def async_get_events( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comments as for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have tried to address feedback in multiple places where I noticed. |
||
| self, | ||
| hass: HomeAssistant, | ||
| start_date: datetime, | ||
| end_date: datetime, | ||
| ) -> list[CalendarEvent]: | ||
| """Return calendar events (tariff periods) within a datetime range.""" | ||
| events: list[CalendarEvent] = [] | ||
|
|
||
| # Convert dates to local timezone | ||
| start_date = dt_util.as_local(start_date) | ||
| end_date = dt_util.as_local(end_date) | ||
|
|
||
| # Process each day in the requested range | ||
| current_day = dt_util.start_of_local_day(start_date) | ||
| while current_day < end_date: | ||
| season_name = self._get_current_season(current_day) | ||
| if not season_name or not self.seasons.get(season_name): | ||
| current_day += timedelta(days=1) | ||
| continue | ||
|
|
||
| # Get the time of use periods for the season | ||
| tou_periods = self.seasons[season_name].get("tou_periods", {}) | ||
| day_of_week = current_day.weekday() | ||
|
|
||
| for period_name, period_group in tou_periods.items(): | ||
| for period_def in period_group.get("periods", []): | ||
| # Check if current day falls within the period's day range | ||
| from_day = period_def.get("fromDayOfWeek", 0) | ||
| to_day = period_def.get("toDayOfWeek", 6) | ||
| if from_day > day_of_week > to_day: | ||
| continue | ||
|
|
||
| # Extract period timing for current day | ||
| from_hour = period_def.get("fromHour", 0) % 24 | ||
| from_minute = period_def.get("fromMinute", 0) % 60 | ||
| to_hour = period_def.get("toHour", 0) % 24 | ||
| to_minute = period_def.get("toMinute", 0) % 60 | ||
|
|
||
| start_time = current_day.replace( | ||
| hour=from_hour, minute=from_minute, second=0, microsecond=0 | ||
| ) | ||
| end_time = current_day.replace( | ||
| hour=to_hour, minute=to_minute, second=0, microsecond=0 | ||
| ) | ||
|
|
||
| # Adjust for periods crossing midnight | ||
| if end_time <= start_time: | ||
| end_time += timedelta(days=1) | ||
|
|
||
| # Check for overlap with requested date range | ||
| if start_time < end_date and end_time > start_date: | ||
| price = self._get_price_for_period(season_name, period_name) | ||
| price_str = ( | ||
| f"{price:.2f}/kWh" if price is not None else "Unknown Price" | ||
| ) | ||
| events.append( | ||
| CalendarEvent( | ||
| start=start_time, | ||
| end=end_time, | ||
| summary=f"{period_name.capitalize().replace('_', ' ')}: {price_str}", | ||
| description=( | ||
| f"Season: {season_name.capitalize()}\n" | ||
| f"Period: {period_name.capitalize().replace('_', ' ')}\n" | ||
| f"Price: {price_str}" | ||
| ), | ||
| uid=f"{self.key_base}_{season_name}_{period_name}_{start_time.isoformat()}", | ||
| ) | ||
| ) | ||
|
|
||
| current_day += timedelta(days=1) | ||
|
|
||
| # Sort events chronologically | ||
| events.sort(key=lambda x: x.start) | ||
| return events | ||
|
|
||
| def _get_current_season(self, date_to_check: datetime) -> str | None: | ||
| """Determine the active season for a given date.""" | ||
| local_date = dt_util.as_local(date_to_check) | ||
| year = local_date.year | ||
|
|
||
| for season_name, season_data in self.seasons.items(): | ||
| if not season_data: | ||
| continue | ||
|
|
||
| # Extract season date boundaries | ||
| try: | ||
| from_month = season_data["fromMonth"] | ||
| from_day = season_data["fromDay"] | ||
| to_month = season_data["toMonth"] | ||
| to_day = season_data["toDay"] | ||
|
|
||
| # Handle seasons that cross year boundaries | ||
| start_year = year | ||
| end_year = year | ||
|
|
||
| # Determine if season crosses year boundary | ||
| if from_month > to_month or ( | ||
| from_month == to_month and from_day > to_day | ||
| ): | ||
| if local_date.month > from_month or ( | ||
| local_date.month == from_month and local_date.day >= from_day | ||
| ): | ||
| end_year = year + 1 | ||
| else: | ||
| start_year = year - 1 | ||
|
|
||
| season_start = local_date.replace( | ||
| year=start_year, | ||
| month=from_month, | ||
| day=from_day, | ||
| hour=0, | ||
| minute=0, | ||
| second=0, | ||
| microsecond=0, | ||
| ) | ||
| # Create exclusive end date | ||
| season_end = local_date.replace( | ||
| year=end_year, | ||
| month=to_month, | ||
| day=to_day, | ||
| hour=0, | ||
| minute=0, | ||
| second=0, | ||
| microsecond=0, | ||
| ) + timedelta(days=1) | ||
|
|
||
| if season_start <= local_date < season_end: | ||
| return season_name | ||
| except (KeyError, ValueError): | ||
| continue | ||
|
|
||
| return None # No matching season found | ||
|
|
||
| def _get_price_for_period(self, season_name: str, period_name: str) -> float | None: | ||
| """Get the price for a specific season and period name.""" | ||
| try: | ||
| # Get rates for the season with fallback to "ALL" | ||
| season_charges = self.charges.get(season_name, self.charges.get("ALL", {})) | ||
| rates = season_charges.get("rates", {}) | ||
| # Get price for the period with fallback to "ALL" | ||
| price = rates.get(period_name, rates.get("ALL")) | ||
| return float(price) if price is not None else None | ||
| except (KeyError, ValueError, TypeError): | ||
| return None | ||
|
|
||
| def _async_update_attrs(self) -> None: | ||
| """Update the Calendar attributes from coordinator data.""" | ||
| # Load tariff data from coordinator | ||
| self.seasons = self.coordinator.data.get(f"{self.key_base}_seasons", {}) | ||
| self.charges = self.coordinator.data.get(f"{self.key_base}_energy_charges", {}) | ||
|
|
||
| # Set availability based on data presence | ||
| self._attr_available = bool(self.seasons and self.charges) | ||
Uh oh!
There was an error while loading. Please reload this page.