Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/climate_data/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
MODEL_ROOT = Path("/mnt/share/erf/climate_downscale/")
# Aggregation working directory
AGGREGATE_ROOT = RRA_ROOT / "climate-aggregates"

# Flooding directory
FLOOD_ROOT = Path("/mnt/team/rapidresponse/pub/flooding")

######################
# Pipeline variables #
Expand Down
263 changes: 263 additions & 0 deletions src/climate_data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,269 @@ def load_results(
return pd.read_parquet(path)


class FloodingData:
"""Class for managing the flooding data used in the project."""

def __init__(
self,
root: str | Path = cdc.FLOOD_ROOT,
*,
read_only: bool = False,
) -> None:
self._root = Path(root)
self._credentials_root = self._root / "credentials"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This object doesn't need credentials. The credentials are for accessing external services (e.g. the ERA5 dbs) or storage we share with partners (e.g. Azure blobs for buildings/populations)

self._read_only = read_only
if not read_only:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't want this here. The flooding directory is always read-only in this repository. I have it for the ClimateData class because this repo has pipeline stages where climate data is an output directory and other stages that only consume from the climate data and write to the aggregates directory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lines 997-1009

self._create_flooding_root()

def _create_flooding_root(self) -> None:
    """Create the flooding root and its standard sub-directories.

    Directories are visited parent-first and every ``mkdir`` call is made
    with ``exist_ok=True``.
    """
    directories = (
        self.root,
        self.credentials_root,
        self.results,
        self.results_metadata,
        self.daily_results,
        self.raw_daily_results,
        self.annual_results,
        self.raw_annual_results,
    )
    for directory in directories:
        mkdir(directory, exist_ok=True)

@property
def root(self) -> Path:
    """Root directory of the flooding data, as given at construction."""
    flooding_root = self._root
    return flooding_root

@property
def credentials_root(self) -> Path:
    """Directory stored as the credentials root for this object."""
    credentials_dir = self._credentials_root
    return credentials_dir

###########
# Results #
###########

@property
def results(self) -> Path:
    """Directory holding all results: ``<root>/results``."""
    return self.root.joinpath("results")

@property
def results_metadata(self) -> Path:
    """Directory holding results metadata: ``<results>/metadata``."""
    return self.results.joinpath("metadata")

def save_scenario_metadata(self, df: pd.DataFrame) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No save functions on this object.

if self._read_only:
msg = "Cannot save scenario metadata to read-only data"
raise ValueError(msg)
path = self.results_metadata / "scenario_metadata.parquet"
save_parquet(df, path)

def load_scenario_metadata(self) -> pd.DataFrame:
    """Read ``scenario_metadata.parquet`` from the results metadata directory.

    Returns
    -------
    pd.DataFrame
        The contents of the parquet file.
    """
    metadata_file = self.results_metadata.joinpath("scenario_metadata.parquet")
    return pd.read_parquet(metadata_file)

def save_scenario_inclusion_metadata(self, df: pd.DataFrame) -> None:
    """Write the scenario inclusion metadata table to parquet.

    Two copies named ``scenario_inclusion_metadata.parquet`` are written:
    one in the results metadata directory and one in the ``scripts``
    directory three levels above this file.

    Parameters
    ----------
    df
        The table to save.

    Raises
    ------
    ValueError
        If this object was created read-only.
    """
    if self._read_only:
        msg = "Cannot save scenario inclusion metadata to read-only data"
        raise ValueError(msg)
    # The scripts directory also needs a copy for doc building.
    scripts_root = Path(__file__).parent.parent.parent / "scripts"
    filename = "scenario_inclusion_metadata.parquet"
    for destination_dir in (self.results_metadata, scripts_root):
        save_parquet(df, destination_dir / filename)

def load_scenario_inclusion_metadata(self) -> pd.DataFrame:
    """Read ``scenario_inclusion_metadata.parquet`` from the results metadata directory.

    Returns
    -------
    pd.DataFrame
        The contents of the parquet file.
    """
    metadata_file = self.results_metadata.joinpath(
        "scenario_inclusion_metadata.parquet"
    )
    return pd.read_parquet(metadata_file)

@property
def daily_results(self) -> Path:
    """Directory holding daily results: ``<results>/daily``."""
    return self.results.joinpath("daily")

@property
def raw_daily_results(self) -> Path:
    """Directory holding raw daily results: ``<daily_results>/raw``."""
    return self.daily_results.joinpath("raw")

def raw_daily_results_path(
self,
scenario: str,
variable: str,
year: int | str,
gcm_member: str,
) -> Path:
return self.raw_daily_results / scenario / variable / f"{year}_{gcm_member}.nc"

def save_raw_daily_results(
    self,
    results_ds: xr.Dataset,
    scenario: str,
    variable: str,
    year: int | str,
    gcm_member: str,
    encoding_kwargs: dict[str, Any],
) -> None:
    """Write a raw daily results dataset to its standard location.

    The destination comes from ``raw_daily_results_path``; its parent
    directory is created before the dataset is handed to ``save_xarray``
    together with ``encoding_kwargs``.

    Raises
    ------
    ValueError
        If this object was created read-only.
    """
    if self._read_only:
        msg = "Cannot save raw daily results to read-only data"
        raise ValueError(msg)
    destination = self.raw_daily_results_path(
        scenario=scenario,
        variable=variable,
        year=year,
        gcm_member=gcm_member,
    )
    mkdir(destination.parent, parents=True, exist_ok=True)
    save_xarray(results_ds, destination, encoding_kwargs)

def daily_results_path(
self,
scenario: str,
variable: str,
year: int | str,
) -> Path:
return self.daily_results / scenario / variable / f"{year}.nc"

def save_daily_results(
    self,
    results_ds: xr.Dataset,
    scenario: str,
    variable: str,
    year: int | str,
    encoding_kwargs: dict[str, Any],
) -> None:
    """Write a daily results dataset to its standard location.

    The destination comes from ``daily_results_path``; its parent directory
    is created before the dataset is handed to ``save_xarray`` together
    with ``encoding_kwargs``.

    Raises
    ------
    ValueError
        If this object was created read-only.
    """
    if self._read_only:
        msg = "Cannot save daily results to read-only data"
        raise ValueError(msg)
    destination = self.daily_results_path(
        scenario=scenario,
        variable=variable,
        year=year,
    )
    mkdir(destination.parent, parents=True, exist_ok=True)
    save_xarray(results_ds, destination, encoding_kwargs)

def load_daily_results(
    self,
    scenario: str,
    variable: str,
    year: int | str,
) -> xr.Dataset:
    """Open the daily results dataset for a scenario, variable, and year.

    The file location comes from ``daily_results_path``.
    """
    dataset_path = self.daily_results_path(
        scenario=scenario,
        variable=variable,
        year=year,
    )
    return xr.open_dataset(dataset_path)

@property
def annual_results(self) -> Path:
    """Directory holding annual results: ``<results>/annual``."""
    return self.results.joinpath("annual")

@property
def raw_annual_results(self) -> Path:
    """Directory holding raw annual results: ``<annual_results>/raw``."""
    return self.annual_results.joinpath("raw")

def raw_annual_results_path(
self,
scenario: str,
variable: str,
year: int | str,
gcm_member: str,
) -> Path:
return self.raw_annual_results / scenario / variable / f"{year}_{gcm_member}.nc"

def save_raw_annual_results(
    self,
    results_ds: xr.Dataset,
    scenario: str,
    variable: str,
    year: int | str,
    gcm_member: str,
    encoding_kwargs: dict[str, Any],
) -> None:
    """Write a raw annual results dataset to its standard location.

    The destination comes from ``raw_annual_results_path``; its parent
    directory is created before the dataset is handed to ``save_xarray``
    together with ``encoding_kwargs``.

    Raises
    ------
    ValueError
        If this object was created read-only.
    """
    if self._read_only:
        msg = "Cannot save raw annual results to read-only data"
        raise ValueError(msg)
    destination = self.raw_annual_results_path(
        scenario=scenario,
        variable=variable,
        year=year,
        gcm_member=gcm_member,
    )
    mkdir(destination.parent, parents=True, exist_ok=True)
    save_xarray(results_ds, destination, encoding_kwargs)

@property
def compiled_annual_results(self) -> Path:
    """Directory holding compiled annual results: ``<raw_annual_results>/compiled``."""
    return self.raw_annual_results.joinpath("compiled")

def compiled_annual_results_path(
    self,
    scenario: str,
    variable: str,
    gcm_member: str,
) -> Path:
    """Build the path of one compiled annual results file.

    The layout is
    ``<compiled_annual_results>/<scenario>/<variable>/<gcm_member>.nc``.
    """
    filename = f"{gcm_member}.nc"
    return self.compiled_annual_results.joinpath(scenario, variable, filename)

def save_compiled_annual_results(
    self,
    results_ds: xr.Dataset,
    scenario: str,
    variable: str,
    gcm_member: str,
) -> None:
    """Write a compiled annual results dataset to its standard location.

    The destination comes from ``compiled_annual_results_path``. Its parent
    directory is created, the file is touched with ``clobber=True``, and
    the dataset is then written with ``to_netcdf``.

    Raises
    ------
    ValueError
        If this object was created read-only.
    """
    if self._read_only:
        msg = "Cannot save compiled annual results to read-only data"
        raise ValueError(msg)
    destination = self.compiled_annual_results_path(
        scenario=scenario,
        variable=variable,
        gcm_member=gcm_member,
    )
    mkdir(destination.parent, parents=True, exist_ok=True)
    touch(destination, clobber=True)
    results_ds.to_netcdf(destination)

def annual_results_path(
self,
scenario: str,
variable: str,
draw: int | str,
) -> Path:
return self.annual_results / scenario / variable / f"{draw:0>3}.nc"

def link_annual_draw(
self,
draw: int | str,
scenario: str,
variable: str,
gcm_member: str,
) -> None:
if self._read_only:
msg = "Cannot link annual draw to read-only data"
raise ValueError(msg)
source_path = self.compiled_annual_results_path(scenario, variable, gcm_member)
dest_path = self.annual_results_path(scenario, variable, draw)
mkdir(dest_path.parent, exist_ok=True, parents=True)
if dest_path.exists():
dest_path.unlink()
dest_path.symlink_to(source_path)

def draw_results_path(self, scenario: str, measure: str, draw: str) -> Path:
"""Get the path to annual results for a specific scenario, measure, and draw.

Parameters
----------
scenario
The climate scenario (e.g. "ssp126")
measure
The climate measure (e.g. "mean_temperature")
draw
The draw of the climate data to load (e.g. "000")

Returns
-------
Path
The path to the results file
"""
return self.annual_results / scenario / measure / f"{draw}.nc"

def load_draw_results(self, scenario: str, measure: str, draw: str) -> xr.Dataset:
    """Open the annual results dataset for one scenario, measure, and draw.

    The file is located with ``annual_results_path``, opened with
    ``decode_coords="all"``, and tagged with the ``EPSG:4326`` CRS.

    Parameters
    ----------
    scenario
        The scenario (e.g. "ssp126")
    measure
        The measure directory name
    draw
        The draw of the data to load (e.g. "000")

    Returns
    -------
    xr.Dataset
        The opened dataset with its CRS written
    """
    dataset_path = self.annual_results_path(scenario, measure, draw)
    dataset = xr.open_dataset(dataset_path, decode_coords="all")
    return dataset.rio.write_crs("EPSG:4326")


def save_xarray(
ds: xr.Dataset,
output_path: str | Path,
Expand Down
Loading