Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions mikeio1d/result_network/result_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from abc import ABC
from abc import abstractmethod
from warnings import warn

from ..quantities import TimeSeriesId
from ..quantities import TimeSeriesIdGroup
Expand Down Expand Up @@ -94,7 +95,23 @@ def add_query(self, data_item: IDataItem):
query = self.get_query(data_item)
self.res1d.network.add_query(query)

def read(self, column_mode: Optional[str | ColumnMode] = None) -> pd.DataFrame:
def add(self, quantities: str | list[str] | None = None):
"""Add a ResultQuantity to ResultNetwork.read_queue based on the data item.

Parameters
----------
quantities : str | list[str] | None
List of quantities to add for reading later using Res1D.read() method.
"""
result_quantities = self._get_result_quantities(quantities)
for result_quantity in result_quantities:
result_quantity.add()

def read(
self,
column_mode: str | ColumnMode = None,
quantities: str | list[str] | None = None,
) -> pd.DataFrame:
"""Read the time series data for all quantities at this location into a DataFrame.

Parameters
Expand All @@ -104,15 +121,87 @@ def read(self, column_mode: Optional[str | ColumnMode] = None) -> pd.DataFrame:
'all' - column MultiIndex with levels matching TimeSeriesId objects.
'compact' - same as 'all', but removes levels with default values.
'timeseries' - column index of TimeSeriesId objects

quantities : str | list[str] | None
List of quantities to read for this location..
"""
qlists = self._creator.result_quantity_map.values()
result_quantities = [q for qlist in qlists for q in qlist]
timesries_ids = [q.timeseries_id for q in result_quantities]
result_quantities = self._get_result_quantities(quantities)
timesries_ids = [result_quantity.timeseries_id for result_quantity in result_quantities]
reader = self.res1d.reader
df = reader.read(timesries_ids, column_mode=column_mode)
return df

def plot(self, ax=None, quantities: str | list[str] | None = None, **kwargs):
"""Plot the time series data.

Parameters
----------
ax : matplotlib.axes.Axes, optional
Axes object to plot on.
quantities : str | list[str] | None
List of quantities to plot for this location.
**kwargs
Additional keyword arguments passed to pandas.DataFrame.plot.

Returns
-------
matplotlib.axes.Axes
Axes object with the plot.
"""
result_quantities = self._get_result_quantities(quantities)
for result_quantity in result_quantities:
ax = result_quantity.plot(ax=ax, **kwargs)
return ax

def to_dataframe(self, quantities: str | list[str] | None = None):
"""Get a time series as a data frame."""
return self.read(quantities)

def to_csv(
self, file_path, time_step_skipping_number=1, quantities: str | list[str] | None = None
):
"""Extract time series data into a csv file."""
result_quantities = self._get_result_quantities(quantities)
queries = [result_quantity.timeseries_id for result_quantity in result_quantities]
self.res1d.to_csv(file_path, queries, time_step_skipping_number)

def to_dfs0(
self, file_path, time_step_skipping_number=1, quantities: str | list[str] | None = None
):
"""Extract time series data into a dfs0 file."""
result_quantities = self._get_result_quantities(quantities)
queries = [result_quantity.timeseries_id for result_quantity in result_quantities]
self.res1d.to_dfs0(file_path, queries, time_step_skipping_number)

def to_txt(
self, file_path, time_step_skipping_number=1, quantities: str | list[str] | None = None
):
"""Extract time series data into a txt file."""
result_quantities = self._get_result_quantities(quantities)
queries = [result_quantity.timeseries_id for result_quantity in result_quantities]
self.res1d.to_txt(file_path, queries, time_step_skipping_number)

def _get_result_quantities(
self, quantities: str | list[str] | None = None
) -> list[ResultQuantity]:
quantities = self._process_quantities_input(quantities)

result_quantities_all = []
for quantity in quantities:
result_quantities = self._creator.result_quantity_map.get(quantity, None)
if result_quantities is None:
warn(f"{quantity} quantity not found!")
continue

for result_quantity in result_quantities:
result_quantities_all.append(result_quantity)

return result_quantities_all

def _process_quantities_input(self, quantities: str | list[str] | None = None) -> list[str]:
quantities = [quantities] if isinstance(quantities, str) else quantities
quantities = self.quantities if quantities is None else quantities
return quantities


class ResultLocationCreator(ABC):
"""A base helper class for creating ResultLocation.
Expand Down