Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions src/emhass/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import pickle as cPickle
import re
from datetime import datetime, timedelta
from itertools import zip_longest
from urllib.parse import quote

import aiofiles
Expand Down Expand Up @@ -410,11 +409,11 @@ async def _get_weather_solcast(self, w_forecast_cache_path: str) -> pd.DataFrame
headers = {
"User-Agent": "EMHASS",
"Authorization": "Bearer " + self.retrieve_hass_conf["solcast_api_key"],
"content-type": header_accept,
"Accept": header_accept,
}
days_solcast = int(len(self.forecast_dates) * self.freq.seconds / 3600)
roof_ids = re.split(r"[,\s]+", self.retrieve_hass_conf["solcast_rooftop_id"].strip())
total_data_list = [0] * len(self.forecast_dates)
total_data = pd.DataFrame()

async with aiohttp.ClientSession() as session:
for roof_id in roof_ids:
Expand All @@ -432,21 +431,35 @@ async def _get_weather_solcast(self, w_forecast_cache_path: str) -> pd.DataFrame
"Solcast error: Issue with request, check API key and rooftop ID."
)
return False
data_list = []
for elm in data["forecasts"]:
data_list.append(elm["pv_estimate"] * 1000)
if len(data_list) < len(self.forecast_dates):
self.logger.error("Not enough data retrieved from Solcast service.")
if len(data["forecasts"]) == 0:
self.logger.error("No data retrieved from Solcast service.")
return False
total_data_list = [
total + current
for total, current in zip_longest(total_data_list, data_list, fillvalue=0)
# Build a timestamped DataFrame from Solcast period_end timestamps
solcast_timestamps = [
pd.Timestamp(elm["period_end"]) for elm in data["forecasts"]
]
data_list = [elm["pv_estimate"] * 1000 for elm in data["forecasts"]]
data_tmp = pd.DataFrame(
{"yhat": data_list},
index=pd.DatetimeIndex(solcast_timestamps, name="ts"),
)
if data_tmp.index.tz is None:
data_tmp.index = data_tmp.index.tz_localize("UTC")
data_tmp.index = data_tmp.index.tz_convert(self.forecast_dates.tz)
# Reindex to target forecast dates and interpolate
# (handles Solcast 30-min data -> any optimization_time_step)
combined_index = data_tmp.index.union(self.forecast_dates).sort_values()
data_tmp = data_tmp.reindex(combined_index)
data_tmp.interpolate(method="time", inplace=True)
data_tmp = data_tmp.reindex(self.forecast_dates)
# Zero-fill edges beyond Solcast data range
data_tmp = data_tmp.fillna(0.0)
if len(total_data) == 0:
total_data = data_tmp.copy()
else:
total_data = total_data + data_tmp

total_data_list = total_data_list[0 : len(self.forecast_dates)]
data_dict = {"ts": self.forecast_dates, "yhat": total_data_list}
data = pd.DataFrame.from_dict(data_dict)
data.set_index("ts", inplace=True)
data = total_data
if self.params["passed_data"].get("weather_forecast_cache", False):
data = await self.set_cached_forecast_data(w_forecast_cache_path, data)
return data
Expand Down
80 changes: 80 additions & 0 deletions tests/test_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,86 @@ async def test_get_weather_forecast_solcast_multiroofs_method_mock(self):
emhass_conf["data_path"] / "weather_forecast_data.pkl",
)

# Test Solcast resampling: 30-min Solcast data → 15-min optimization_time_step
async def test_get_weather_forecast_solcast_15min_resampling_mock(self):
    """Verify Solcast data is correctly resampled when optimization_time_step < 30 min.

    Overrides the fixture's frequency to 15 minutes, mocks the Solcast GET
    endpoint with a canned 30-minute response, and checks that the returned
    forecast is time-interpolated onto the finer 15-minute grid with no NaNs.
    """
    # Override freq to 15 minutes (default test uses 30 min); keep originals to restore later
    original_freq = self.fcst.freq
    original_forecast_dates = self.fcst.forecast_dates
    self.fcst.freq = pd.Timedelta("15min")
    self.fcst.retrieve_hass_conf["optimization_time_step"] = pd.Timedelta("15min")
    # Rebuild forecast_dates at 15-min intervals (same time window -> 2x more slots)
    self.fcst.forecast_dates = pd.date_range(
        start=original_forecast_dates[0],
        end=original_forecast_dates[-1],
        freq=self.fcst.freq,
    )
    self.fcst.params = {
        "passed_data": {
            "weather_forecast_cache": False,
            "weather_forecast_cache_only": False,
        }
    }
    self.fcst.retrieve_hass_conf["solcast_api_key"] = "123456"
    self.fcst.retrieve_hass_conf["solcast_rooftop_id"] = "123456"
    # Park any existing cached forecast so this test cannot read stale cache data
    if os.path.isfile(emhass_conf["data_path"] / "weather_forecast_data.pkl"):
        os.rename(
            emhass_conf["data_path"] / "weather_forecast_data.pkl",
            emhass_conf["data_path"] / "temp_weather_forecast_data.pkl",
        )

    # Load the canned Solcast HTTP response (bz2-compressed pickled response object)
    test_data_path = str(emhass_conf["data_path"] / "test_response_solcast_get_method.pbz2")
    async with aiofiles.open(test_data_path, "rb") as f:
        compressed = await f.read()
    data = bz2.decompress(compressed)
    data = cPickle.loads(data)
    data = orjson.loads(data.content)

    days_solcast = int(len(self.fcst.forecast_dates) * self.fcst.freq.seconds / 3600)
    get_url = f"https://api.solcast.com.au/rooftop_sites/123456/forecasts?hours={days_solcast}"

    with aioresponses() as mocked:
        mocked.get(get_url, payload=data)
        df_weather_scrap = await self.fcst.get_weather_forecast(method="solcast")

    self.assertIsInstance(df_weather_scrap, type(pd.DataFrame()))
    self.assertIsInstance(df_weather_scrap.index, pd.core.indexes.datetimes.DatetimeIndex)
    self.assertEqual(df_weather_scrap.index.tz, self.fcst.time_zone)
    # Key assertion: output length must match the 15-min forecast_dates
    self.assertEqual(len(df_weather_scrap), len(self.fcst.forecast_dates))
    # Verify no NaN values after interpolation
    self.assertFalse(df_weather_scrap["yhat"].isna().any())

    # Verify interpolation correctness at a midpoint between two 30-min source timestamps
    # Pick a midpoint index to avoid edge effects
    midpoint_idx = len(df_weather_scrap.index) // 2
    ts_mid = df_weather_scrap.index[midpoint_idx]
    ts_prev = ts_mid - pd.Timedelta(minutes=15)
    ts_next = ts_mid + pd.Timedelta(minutes=15)

    # Ensure the neighboring timestamps exist in the index
    self.assertIn(ts_prev, df_weather_scrap.index)
    self.assertIn(ts_next, df_weather_scrap.index)

    y_prev = df_weather_scrap.loc[ts_prev, "yhat"]
    y_mid = df_weather_scrap.loc[ts_mid, "yhat"]
    y_next = df_weather_scrap.loc[ts_next, "yhat"]

    # Expected linear interpolation at the midpoint of equally spaced neighbors
    expected_mid = (y_prev + y_next) / 2.0

    # Check that the interpolated midpoint matches the expected linear value
    self.assertAlmostEqual(y_mid, expected_mid, places=6)

    # Restore original freq/forecast_dates and any parked cache file
    self.fcst.freq = original_freq
    self.fcst.forecast_dates = original_forecast_dates
    if os.path.isfile(emhass_conf["data_path"] / "temp_weather_forecast_data.pkl"):
        os.rename(
            emhass_conf["data_path"] / "temp_weather_forecast_data.pkl",
            emhass_conf["data_path"] / "weather_forecast_data.pkl",
        )

# Test output weather forecast using Forecast.Solar with mock get request data
async def test_get_weather_forecast_solarforecast_method_mock(self):
test_data_path = str(
Expand Down
Loading