|
| 1 | +# Copyright (c) 2024, Julien Seguinot (juseg.dev) |
| 2 | +# GNU General Public License v3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) |
| 3 | + |
| 4 | +""" |
| 5 | +This module provides so-called aggregator classes for computing multiyear means |
| 6 | +and standard deviations needed by hyoga. Aggregator objects are callables that |
| 7 | +may trigger downloads, open multi-file datasets, aggregate statistics, store |
| 8 | +them in hyoga's cache directory, and return a path to that file. |
| 9 | +""" |
| 10 | + |
| 11 | +import os.path |
| 12 | +import xarray as xr |
| 13 | +import hyoga.open.downloader |
| 14 | + |
| 15 | + |
class Aggregator():
    """A callable that aggregates input files and returns an output path.

    This is a base class for callable aggregators. Customization can be done
    by subclassing and overwriting the following methods:

    * :meth:`inputs`: return local paths of input files.
    * :meth:`output`: return local path of aggregated file.
    * :meth:`check`: check whether output file is present or valid.
    * :meth:`aggregate`: actually aggregate the data from input files.

    Call parameters
    ---------------
    inputs : list of str
        A list of paths of files to aggregate.
    output : str
        The local path of the aggregated file.

    Returns
    -------
    output : str
        The local path of the aggregated file.
    """

    def __call__(self, *args, **kwargs):
        """See class documentation for actual signature.

        Parameters
        ----------
        *args :
            Positional arguments passed to :meth:`inputs` and :meth:`output`.
            These two methods need to have compatible signatures.
        **kwargs :
            Keyword arguments are passed to :meth:`aggregate` to alter the
            aggregation recipe. This is used to provide a custom function.

        Returns
        -------
        output : str
            The path returned by :meth:`output`, whether the file was just
            aggregated or was already present according to :meth:`check`.
        """
        inputs = self.inputs(*args)
        output = self.output(*args)
        # only aggregate when the output does not pass the check
        if not self.check(output):
            self.aggregate(inputs, output, **kwargs)
        return output

    def inputs(self, *args):
        """Return local paths of input files (first positional argument)."""
        return args[0]

    def output(self, *args):
        """Return local path of aggregated file (second positional argument).
        """
        return args[1]

    def check(self, path):
        """Check whether output file is present."""
        return os.path.isfile(path)

    def aggregate(self, inputs, output, recipe='avg'):
        """Aggregate `inputs` into `output` file.

        Parameters
        ----------
        inputs : str or iterable of str
            Paths of the files to open as a single multi-file dataset. Lazy
            iterables such as generators are converted to a list first.
        output : str
            The path of the netCDF file to write.
        recipe : str, optional
            Name of the dataset method applied along the ``time`` dimension,
            where ``'avg'`` is replaced by ``'mean'``. Default to ``'avg'``.

        Returns
        -------
        output : str
            The path of the netCDF file written.
        """

        # create directory if missing; a bare file name has an empty dirname
        # and os.makedirs('') would raise FileNotFoundError
        parent = os.path.dirname(output)
        if parent:
            os.makedirs(parent, exist_ok=True)

        # turn lazy iterables (e.g. generators) into a list of paths, but
        # leave a single string or path-like untouched
        if not isinstance(inputs, (str, os.PathLike)):
            inputs = list(inputs)

        # open inputs as multi-file dataset
        with xr.open_mfdataset(
                inputs, chunks={'lat': 300, 'lon': 300},
                # FIXME this is a mixed-precision workaround specific to CW5E5
                preprocess=lambda ds: ds.assign(
                    lat=ds.lat.astype('f4'), lon=ds.lon.astype('f4'))) as ds:
            reduce = getattr(ds, recipe.replace('avg', 'mean'))
            aggregated = reduce('time', keep_attrs=True)

            # store output as netcdf while the input files are still open,
            # because the chunked reduction is only computed on write
            print(f"aggregating {output} ...")
            aggregated.to_netcdf(output)

        # return path
        return output
| 89 | + |
| 90 | + |
class CW5E5ClimateAggregator(Aggregator):
    """An aggregator to compute CHELSA-W5E5 climatologies from daily means.

    Call parameters
    ---------------
    variable : 'tasmax', 'tas', 'tasmin', 'rsds', 'pr'
        The short name for the CHELSA-W5E5 variable aggregated among:
        - daily mean precipitation ('pr', kg m-2 s-1),
        - daily mean surface downwelling shortwave radiation ('rsds', W m-2),
        - daily mean near-surface air temperature ('tas', K),
        - daily maximum near surface air temperature ('tasmax', K),
        - daily minimum near surface air temperature ('tasmin', K).
    start : int
        The aggregation start year between 1979 and 2016.
    end : int
        The aggregation end year between 1979 and 2016.
    month : int
        The month for which data is aggregated, between 1 and 12.
    """

    def inputs(self, *args):
        """Return paths of input files, downloading as necessary.

        The paths are returned as a generator, so that the downloader is only
        called for each year (from `start` to `end` inclusive) when the paths
        are actually iterated over.
        """
        variable, start, end, month = args
        downloader = hyoga.open.downloader.CW5E5DailyDownloader()
        years = range(start, end+1)
        paths = (downloader(variable, year, month) for year in years)
        return paths

    def output(self, *args):
        """Return path of aggregated file in the cache directory.

        The cache directory is ``$XDG_CACHE_HOME``, or ``~/.cache`` if that
        variable is unset or empty.
        """
        variable, start, end, month = args
        # an empty XDG_CACHE_HOME must be treated like an unset one, otherwise
        # the returned path would be relative to the working directory
        xdg_cache = os.environ.get("XDG_CACHE_HOME") or os.path.join(
            os.path.expanduser('~'), '.cache')
        # NOTE(review): the file name always contains 'avg', whichever recipe
        # is passed to the aggregator -- confirm this is intended
        return os.path.join(
            xdg_cache, 'hyoga', 'cw5e5', 'clim', f'cw5e5.{variable}.mon.'
            f'{start % 100:02d}{end % 100:02d}.avg.{month:02d}.nc')
0 commit comments