Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions forecast_blend/save.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ async def save_forecast_to_data_platform(

We do the following steps:
1. get Forecaster
3. loop over all gsps: get the location object
4. Forecast the forecast values
5. Save to the data platform
2. loop over all gsps: get the location object
3. Forecast the forecast values
4. Save to the data platform
5. If gsp_id=0 also save adjusted forecast

Args:
forecast_normed_da: DataArray of normalized forecasts for all GSPs
Expand Down Expand Up @@ -63,6 +64,25 @@ async def save_forecast_to_data_platform(
)
tasks.append(asyncio.create_task(client.create_forecast(forecast_request)))

# 5. save adjusted if gsp_id=0
if gsp_id == 0:

forecast_values = map_values_df_to_dp_requests(
forecast_values_by_gsp_id[gsp_id],
init_time_utc=init_time_utc,
capacity_watts=locations_uuid_and_capacity_by_gsp_id[gsp_id]["effective_capacity_watts"],
use_adjuster=True,
)
forecaster = await create_forecaster_if_not_exists(client=client, model_tag=model_tag+"_adjust")
forecast_request = dp.CreateForecastRequest(
forecaster=forecaster,
location_uuid=locations_uuid_and_capacity_by_gsp_id[gsp_id]["location_uuid"],
energy_source=dp.EnergySource.SOLAR,
init_time_utc=init_time_utc.replace(tzinfo=UTC),
values=forecast_values,
)
tasks.append(asyncio.create_task(client.create_forecast(forecast_request)))

logger.info(f"Saving {len(tasks)} forecasts to Data Platform")
list_results = await asyncio.gather(*tasks, return_exceptions=True)
for exc in filter(lambda x: isinstance(x, Exception), list_results):
Expand All @@ -75,6 +95,7 @@ def map_values_df_to_dp_requests(
forecast_values_df: pd.DataFrame,
init_time_utc: datetime,
capacity_watts: int,
use_adjuster: bool = False
) -> list[dp.CreateForecastRequestForecastValue]:
"""Convert a Dataframe for a single GSP to a list of ForecastValue objects.

Expand All @@ -84,14 +105,24 @@ def map_values_df_to_dp_requests(
- p10_mw (optional)
- p50_mw
- p90_mw (optional)
- adjuster_mw
init_time_utc: Forecast initialization time
capacity_watts: Capacity of the location in watts
use_adjuster: Whether to apply the adjuster or not
"""

# create horizon mins
target_datetime_utc = pd.to_datetime(forecast_values_df['target_datetime_utc'].values)
horizons_mins = (target_datetime_utc - init_time_utc).total_seconds() / 60
horizons_mins = horizons_mins.astype(int)

# get adjuster values
if use_adjuster:
for p_col in ['p10_mw', 'p50_mw', 'p90_mw']:
if p_col in forecast_values_df.columns:
forecast_values_df[p_col] = forecast_values_df[p_col] - forecast_values_df['adjust_mw']
forecast_values_df[p_col] = forecast_values_df[p_col].clip(lower=0)

# Reduce singular dimensions
p50s = forecast_values_df['p50_mw'].values.astype(float)
p50s = p50s * 1*10**6 / float(capacity_watts)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ dirty_template = "{tag}"
testpaths = ["tests"]

[tool.uv.sources]
dp-sdk = { url = "https://github.com/openclimatefix/data-platform/releases/download/v0.13.2/dp_sdk-0.13.2-py3-none-any.whl" }
dp-sdk = { url = "https://github.com/openclimatefix/data-platform/releases/download/v0.14.0/dp_sdk-0.14.0-py3-none-any.whl" }
19 changes: 10 additions & 9 deletions tests/integration/test_save_to_data_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ async def test_save_to_generation_to_data_platform(client):
This test uses the `data_platform` fixture to ensure that the Data Platform service
is running and can accept data.
"""
# setup: add location - gsp 1
metadata = Struct(fields={"gsp_id": Value(number_value=1)})
# setup: add location - gsp 0
metadata = Struct(fields={"gsp_id": Value(number_value=0)})
create_location_request = dp.CreateLocationRequest(
location_name="gsp1",
location_name="gsp0",
energy_source=dp.EnergySource.SOLAR,
geometry_wkt="POINT(0 0)",
location_type=dp.LocationType.GSP,
Expand All @@ -79,6 +79,7 @@ async def test_save_to_generation_to_data_platform(client):
"p10_mw": [0.3] * 24,
"p50_mw": [0.5] * 24,
"p90_mw": [0.7] * 24,
"adjust_mw": [0.1] * 24,
"target_datetime_utc": pd.Timestamp("2025-01-01")
+ pd.timedelta_range(
start=0,
Expand All @@ -88,20 +89,18 @@ async def test_save_to_generation_to_data_platform(client):
},
)

# fake_data = fake_data.set_index(["target_datetime_utc"])

# Test the functyion
_ = await save_forecast_to_data_platform(
forecast_values_by_gsp_id={1: fake_data},
locations_uuid_and_capacity_by_gsp_id={1: {'location_uuid': location_uuid, 'effective_capacity_watts': 1_000_000}},
forecast_values_by_gsp_id={0: fake_data},
locations_uuid_and_capacity_by_gsp_id={0: {'location_uuid': location_uuid, 'effective_capacity_watts': 1_000_000}},
client=client,
model_tag="test_model",
init_time_utc=datetime.datetime(2025, 1, 1, tzinfo=datetime.UTC),
)

# check: read from the data platform to check it was saved
list_forecasters_response = await client.list_forecasters(dp.ListForecastersRequest())
assert len(list_forecasters_response.forecasters) == 1
assert len(list_forecasters_response.forecasters) == 2

# check: There is a forecast object
get_latest_forecasts_request = dp.GetLatestForecastsRequest(
Expand All @@ -112,9 +111,11 @@ async def test_save_to_generation_to_data_platform(client):
get_latest_forecasts_response = await client.get_latest_forecasts(
get_latest_forecasts_request,
)
assert len(get_latest_forecasts_response.forecasts) == 1
assert len(get_latest_forecasts_response.forecasts) == 2
forecast = get_latest_forecasts_response.forecasts[0]
assert forecast.forecaster.forecaster_name == "test_model"
forecast_adjust = get_latest_forecasts_response.forecasts[1]
assert forecast_adjust.forecaster.forecaster_name == "test_model_adjust"

# check: the number of forecast values
stream_forecast_data_request = dp.StreamForecastDataRequest(
Expand Down
8 changes: 4 additions & 4 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.