From 831a2f7adc15836c266eed43271eb8e670af8f9a Mon Sep 17 00:00:00 2001 From: Gediminas Kirsanskas Date: Thu, 19 Dec 2024 16:16:26 +0100 Subject: [PATCH] Implement add, read, etc. on ResultLocation. --- mikeio1d/result_network/result_location.py | 99 ++++++++++++++++++++-- 1 file changed, 94 insertions(+), 5 deletions(-) diff --git a/mikeio1d/result_network/result_location.py b/mikeio1d/result_network/result_location.py index e3d878c2..75370ef3 100644 --- a/mikeio1d/result_network/result_location.py +++ b/mikeio1d/result_network/result_location.py @@ -19,6 +19,7 @@ from abc import ABC from abc import abstractmethod +from warnings import warn from ..quantities import TimeSeriesId from ..quantities import TimeSeriesIdGroup @@ -94,7 +95,23 @@ def add_query(self, data_item: IDataItem): query = self.get_query(data_item) self.res1d.network.add_query(query) - def read(self, column_mode: Optional[str | ColumnMode] = None) -> pd.DataFrame: + def add(self, quantities: str | list[str] | None = None): + """Add a ResultQuantity to ResultNetwork.read_queue based on the data item. + + Parameters + ---------- + quantities : str | list[str] | None + List of quantities to add for reading later using Res1D.read() method. + """ + result_quantities = self._get_result_quantities(quantities) + for result_quantity in result_quantities: + result_quantity.add() + + def read( + self, + column_mode: str | ColumnMode = None, + quantities: str | list[str] | None = None, + ) -> pd.DataFrame: """Read the time series data for all quantities at this location into a DataFrame. Parameters @@ -104,15 +121,87 @@ def read(self, column_mode: Optional[str | ColumnMode] = None) -> pd.DataFrame: 'all' - column MultiIndex with levels matching TimeSeriesId objects. 'compact' - same as 'all', but removes levels with default values. 'timeseries' - column index of TimeSeriesId objects - + quantities : str | list[str] | None + List of quantities to read for this location.. """ - qlists = self._creator.result_quantity_map.values() - result_quantities = [q for qlist in qlists for q in qlist] - timesries_ids = [q.timeseries_id for q in result_quantities] + result_quantities = self._get_result_quantities(quantities) + timesries_ids = [result_quantity.timeseries_id for result_quantity in result_quantities] reader = self.res1d.reader df = reader.read(timesries_ids, column_mode=column_mode) return df + def plot(self, ax=None, quantities: str | list[str] | None = None, **kwargs): + """Plot the time series data. + + Parameters + ---------- + ax : matplotlib.axes.Axes, optional + Axes object to plot on. + quantities : str | list[str] | None + List of quantities to plot for this location. + **kwargs + Additional keyword arguments passed to pandas.DataFrame.plot. + + Returns + ------- + matplotlib.axes.Axes + Axes object with the plot. + """ + result_quantities = self._get_result_quantities(quantities) + for result_quantity in result_quantities: + ax = result_quantity.plot(ax=ax, **kwargs) + return ax + + def to_dataframe(self, quantities: str | list[str] | None = None): + """Get a time series as a data frame.""" + return self.read(quantities) + + def to_csv( + self, file_path, time_step_skipping_number=1, quantities: str | list[str] | None = None + ): + """Extract time series data into a csv file.""" + result_quantities = self._get_result_quantities(quantities) + queries = [result_quantity.timeseries_id for result_quantity in result_quantities] + self.res1d.to_csv(file_path, queries, time_step_skipping_number) + + def to_dfs0( + self, file_path, time_step_skipping_number=1, quantities: str | list[str] | None = None + ): + """Extract time series data into a dfs0 file.""" + result_quantities = self._get_result_quantities(quantities) + queries = [result_quantity.timeseries_id for result_quantity in result_quantities] + self.res1d.to_dfs0(file_path, queries, time_step_skipping_number) + + def to_txt( + self, file_path, time_step_skipping_number=1, quantities: str | list[str] | None = None + ): + """Extract time series data into a txt file.""" + result_quantities = self._get_result_quantities(quantities) + queries = [result_quantity.timeseries_id for result_quantity in result_quantities] + self.res1d.to_txt(file_path, queries, time_step_skipping_number) + + def _get_result_quantities( + self, quantities: str | list[str] | None = None + ) -> list[ResultQuantity]: + quantities = self._process_quantities_input(quantities) + + result_quantities_all = [] + for quantity in quantities: + result_quantities = self._creator.result_quantity_map.get(quantity, None) + if result_quantities is None: + warn(f"{quantity} quantity not found!") + continue + + for result_quantity in result_quantities: + result_quantities_all.append(result_quantity) + + return result_quantities_all + + def _process_quantities_input(self, quantities: str | list[str] | None = None) -> list[str]: + quantities = [quantities] if isinstance(quantities, str) else quantities + quantities = self.quantities if quantities is None else quantities + return quantities + class ResultLocationCreator(ABC): """A base helper class for creating ResultLocation.