Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions src/pypromice/pipeline/get_l2.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ def parse_arguments_l2():

parser.add_argument('-c', '--config_file', type=str, required=True,
help='Path to config (TOML) file')
parser.add_argument('-i', '--inpath', type=str, required=True,
parser.add_argument('-i', '--inpath', type=str, required=True,
help='Path to input data')
parser.add_argument('-o', '--outpath', default=None, type=str, required=False,
parser.add_argument('-o', '--outpath', default=None, type=str, required=False,
help='Path where to write output')
parser.add_argument('-v', '--variables', default=None, type=str,
parser.add_argument('-v', '--variables', default=None, type=str,
required=False, help='File path to variables look-up table')
parser.add_argument('-m', '--metadata', default=None, type=str,
parser.add_argument('-m', '--metadata', default=None, type=str,
required=False, help='File path to metadata')
parser.add_argument('--data_issues_path', '--issues', default=None, help="Path to data issues repository")
args = parser.parse_args()
Expand All @@ -29,9 +29,9 @@ def parse_arguments_l2():

def get_l2(config_file, inpath, outpath, variables, metadata, data_issues_path: Path) -> AWS:
# Define input path
station_name = config_file.split('/')[-1].split('.')[0]
station_name = config_file.split('/')[-1].split('.')[0]
station_path = os.path.join(inpath, station_name)

# checking that data_issues_path is valid
if data_issues_path is None:
data_issues_path = Path("../PROMICE-AWS-data-issues")
Expand All @@ -41,16 +41,16 @@ def get_l2(config_file, inpath, outpath, variables, metadata, data_issues_path:
raise ValueError("data_issues_path is missing. Please provide a valid path to the data issues repository")

if os.path.exists(station_path):
aws = AWS(config_file,
aws = AWS(config_file,
station_path,
data_issues_repository=data_issues_path,
var_file=variables,
data_issues_repository=data_issues_path,
var_file=variables,
meta_file=metadata)
else:
aws = AWS(config_file,
inpath,
data_issues_repository=data_issues_path,
var_file=variables,
aws = AWS(config_file,
inpath,
data_issues_repository=data_issues_path,
var_file=variables,
meta_file=metadata)

# Perform level 1 and 2 processing
Expand All @@ -61,7 +61,7 @@ def get_l2(config_file, inpath, outpath, variables, metadata, data_issues_path:
if not os.path.isdir(outpath):
os.mkdir(outpath)
if aws.L2.attrs['format'] == 'raw':
prepare_and_write(aws.L2, outpath, aws.vars, aws.meta, '10min')
prepare_and_write(aws.L2, outpath, aws.vars, aws.meta, '10min', resample=False)
prepare_and_write(aws.L2, outpath, aws.vars, aws.meta, '60min')
return aws

Expand All @@ -85,6 +85,5 @@ def main():
)


if __name__ == "__main__":
if __name__ == "__main__":
main()

6 changes: 4 additions & 2 deletions src/pypromice/pipeline/join_l2.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ def join_l2(file1,file2,outpath,variables,metadata) -> xr.Dataset:

all_ds.attrs['format'] = 'merged RAW and TX'

# Resample to hourly, daily and monthly datasets and write to file
prepare_and_write(all_ds, outpath, variables, metadata, resample = False)
# Writing the 10min file which has mixed temporal resolution
prepare_and_write(all_ds, outpath, variables, metadata, '10min', resample = False)
# Writing the resampled hourly file for consistency with previous version (could be removed if not used)
prepare_and_write(all_ds, outpath, variables, metadata, '60min', resample = True)

logger.info(f'Files saved to {os.path.join(outpath, name)}...')
return all_ds
Expand Down
154 changes: 148 additions & 6 deletions src/pypromice/pipeline/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"""
import logging
import numpy as np
import pandas as pd
import xarray as xr
from pypromice.core.variables.wind import calculate_directional_wind_speed
logger = logging.getLogger(__name__)

def resample_dataset(ds_h, t):
def resample_dataset(ds_h, t, completeness_threshold=0.8):
'''Resample L2 AWS data, e.g. hourly to daily average. This uses pandas
DataFrame resampling at the moment as a work-around to the xarray Dataset
resampling. As stated, xarray resampling is a lengthy process that takes
Expand All @@ -24,8 +25,12 @@ def resample_dataset(ds_h, t):
ds_h : xarray.Dataset
L3 AWS dataset either at 10 min (for raw data) or hourly (for tx data)
t : str
Resample factor, same variable definition as in
Resample factor ("60min", "1D" or "MS"), same variable definition as in
pandas.DataFrame.resample()
completeness_threshold : float
Lower limit of completeness of an hourly/daily/monthly aggregate (nr of
samples in aggregate / expected nr of samples). Aggregates below that
limit are replaced by NaNs.

Returns
-------
Expand All @@ -48,9 +53,6 @@ def resample_dataset(ds_h, t):
# Resample the DataFrame
df_resampled = df_h.resample(t).mean()

# calculating the completeness of resampled time intervals
df_resampled_count = df_h.resample(t).count()

# exception for precip_u and precip_l which are accumulated with some reset
# we therefore take the positive increments in the higher resolution data (like 10 min)
# and sum them into the aggregated data (hourly/daily/monthly).
Expand All @@ -59,7 +61,13 @@ def resample_dataset(ds_h, t):
if var in df_h.columns:
df_resampled[var] = df_h[var].diff().clip(lower=0).resample(t).sum()

# There is currently not completeness threshold!
# First attempt of a completeness filter
df_resampled = apply_completeness_filters(df_resampled,
df_h,
t,
time_thresh=completeness_threshold,
value_thresh=completeness_threshold,
)

# taking the 10 min data and using it as instantaneous values:
is_10_minutes_timestamp = (ds_h.time.diff(dim='time') / np.timedelta64(1, 's') == 600)
Expand Down Expand Up @@ -129,6 +137,140 @@ def resample_dataset(ds_h, t):
return ds_resampled


def compute_dt(index: pd.DatetimeIndex) -> pd.Series:
    """
    Derive the duration (in seconds) of each sample from consecutive timestamps.

    Only 10 min, 1 h and 1 day durations are recognised; any other spacing is
    treated as a gap, set to NaN and filled backward from the next valid
    duration (this also assigns a duration to the first sample, whose diff is
    undefined).

    Parameters
    ----------
    index : pd.DatetimeIndex

    Returns
    -------
    pd.Series
        Sample durations in seconds aligned with `index`.
    """
    seconds = index.to_series().diff().dt.total_seconds().round()
    valid = seconds.isin([600.0, 3600.0, 86400.0])
    return seconds.where(valid).bfill()


def compute_time_weights(dt: pd.Series, t: str) -> pd.Series:
    """
    Compute per-sample time weights (contribution to one target bin) from dt.

    A sample's weight is the fraction of a target bin it covers, e.g. a 10 min
    sample contributes 1/6 of an hourly bin. Months are approximated as 30
    days. Samples with an unrecognised duration (NaN dt) get weight 0; for an
    unrecognised target frequency every sample gets weight 1.

    Parameters
    ----------
    dt : pd.Series
        Timestep durations in seconds, aligned with the source DatetimeIndex.
    t : str
        Target resampling frequency ("60min", "1D", "MS").

    Returns
    -------
    pd.Series
        Weights in [0, 1] aligned with `dt.index`.
    """
    # Fraction of one target bin covered by a 10 min / hourly / daily sample.
    # Daily samples cannot contribute to an hourly bin, hence weight 0.
    weight_table = {
        "60min": {600: 1 / 6, 3600: 1.0, 86400: 0.0},
        "1D": {600: 1 / (6 * 24), 3600: 1 / 24, 86400: 1.0},
        "MS": {600: 1 / (6 * 24 * 30), 3600: 1 / (24 * 30), 86400: 1 / 30},
    }
    w = pd.Series(0.0, index=dt.index)
    if t in weight_table:
        for seconds, weight in weight_table[t].items():
            w[dt == seconds] = weight
    else:
        # unknown frequency: fall back to counting every sample fully
        w[:] = 1.0
    return w


def compute_time_completeness(w: pd.Series, t: str) -> pd.Series:
    """
    Compute time-based completeness per resampled bin from per-sample weights.

    Completeness is the summed weight of the samples falling in each bin,
    capped at 1 (mixed temporal resolutions can slightly over-fill a bin).
    Bins with no samples get completeness 0.

    Parameters
    ----------
    w : pd.Series
        Per-sample weights in [0, 1], indexed by the source DatetimeIndex.
    t : str
        Target resampling frequency.

    Returns
    -------
    pd.Series
        Completeness (0..1) per bin indexed by the resampled DatetimeIndex.
    """
    return w.resample(t).sum().clip(upper=1.0).fillna(0.0)


def compute_value_completeness(df_h: pd.DataFrame, w: pd.Series, t: str) -> pd.DataFrame:
    """
    Compute per-variable value completeness as weighted non-NaN fraction per bin.

    Parameters
    ----------
    df_h : pd.DataFrame
        High-resolution data with DatetimeIndex.
    w : pd.Series
        Per-sample weights in [0, 1], aligned with `df_h.index`.
    t : str
        Target resampling frequency.

    Returns
    -------
    pd.DataFrame
        Fraction (0..1) of the expected weighted samples that are non-NaN,
        per variable and bin. Bins with no expected samples get 0.
    """
    # Total weight available in each bin; empty bins become NaN so the
    # division below does not produce spurious ratios (they are zeroed after).
    expected = w.resample(t).sum().clip(upper=1.0).where(lambda s: s > 0, np.nan)
    # Weight actually backed by non-NaN values, per variable.
    observed_equiv = (~df_h.isna()).astype(float).mul(w, axis=0).resample(t).sum()
    return observed_equiv.div(expected, axis=0).fillna(0.0).clip(0.0, 1.0)


def apply_completeness_filters(
    df_resampled: pd.DataFrame,
    df_h: pd.DataFrame,
    t: str,
    time_thresh: float = 0.8,
    value_thresh: float = 0.8,
) -> pd.DataFrame:
    """
    Apply time- and value-based completeness filters to resampled data.

    Bins whose time coverage falls below `time_thresh` are masked entirely;
    in the remaining bins, individual variables whose weighted non-NaN
    fraction falls below `value_thresh` are masked per cell.

    Parameters
    ----------
    df_resampled : pd.DataFrame
        Resampled aggregates to be filtered.
    df_h : pd.DataFrame
        High-resolution source data (for completeness computation).
    t : str
        Target resampling frequency.
    time_thresh : float, optional
        Minimum time completeness threshold, by default 0.8.
    value_thresh : float, optional
        Minimum value completeness threshold, by default 0.8.

    Returns
    -------
    pd.DataFrame
        Copy of `df_resampled` with insufficiently complete aggregates
        replaced by NaN.
    """
    dt = compute_dt(df_h.index)
    w = compute_time_weights(dt, t)
    tc = compute_time_completeness(w, t)
    vc = compute_value_completeness(df_h, w, t)
    out = df_resampled.copy()
    # Mask whole rows where the bin's time coverage is too low; bins absent
    # from the completeness index count as empty (fill_value=0).
    out.loc[tc.reindex(out.index, fill_value=0) < time_thresh] = np.nan
    # Mask individual cells where too few valid values back the aggregate.
    out[vc.reindex(index=out.index, columns=out.columns, fill_value=0) < value_thresh] = np.nan
    return out


def calculateSaturationVaporPressure(t, T_0=273.15, T_100=373.15, es_0=6.1071,
es_100=1013.246, eps=0.622):
'''Calculate specific humidity
Expand Down
98 changes: 98 additions & 0 deletions tests/e2e/test_resample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import numpy as np
import pandas as pd
import unittest

from pypromice.pipeline.resample import apply_completeness_filters


def _resample_and_filter(df_h, t, time_thresh=0.8, value_thresh=0.8):
    """Resample `df_h` to frequency `t`; return (raw aggregate, filtered aggregate)."""
    aggregated = df_h.resample(t).mean()
    masked = apply_completeness_filters(
        aggregated,
        df_h,
        t,
        time_thresh=time_thresh,
        value_thresh=value_thresh,
    )
    return aggregated, masked


class TestCompletenessFilters(unittest.TestCase):
    """End-to-end checks of the 0.8 completeness masking at each target frequency."""

    def test_10min_to_hourly_with_nans(self):
        # Two hours at 10 min cadence; the second hour keeps only 4/6 samples.
        times = pd.date_range("2025-01-01 00:00", periods=24, freq="10min")
        values = np.arange(24, dtype=float)
        values[8:10] = np.nan  # 4/6 present -> 0.67 < 0.8
        frame = pd.DataFrame({"x": values}, index=times)

        resampled, masked = _resample_and_filter(frame, t="60min")

        # resampling must produce an hourly index
        self.assertIn(pd.infer_freq(resampled.index), {"h", "H"})
        self.assertFalse(pd.isna(masked.loc["2025-01-01 00:00", "x"]))  # complete hour kept
        self.assertTrue(pd.isna(masked.loc["2025-01-01 01:00", "x"]))   # sparse hour masked

    def test_hourly_to_daily_with_nans(self):
        # Four days at hourly cadence; day 1 keeps 20/24 (pass), day 2 keeps 15/24 (fail).
        times = pd.date_range("2025-02-01", periods=96, freq="60min")
        values = np.ones(96)
        values[[1, 5, 9, 13]] = np.nan        # day 1: 4 gaps -> 20/24
        values[np.arange(24, 33)] = np.nan    # day 2: 9 gaps -> 15/24
        frame = pd.DataFrame({"x": values}, index=times)

        _, masked = _resample_and_filter(frame, t="1D")

        self.assertFalse(pd.isna(masked.loc["2025-02-01", "x"]))  # pass
        self.assertTrue(pd.isna(masked.loc["2025-02-02", "x"]))   # fail

    def test_daily_to_monthly_ms_with_nans(self):
        # Daily data over several months; June keeps 28/30 (pass), July 20/31 (fail).
        times = pd.date_range("2025-06-01", periods=120, freq="1D")
        values = np.arange(120, dtype=float)
        values[[5, 17]] = np.nan  # June: 2 gaps -> 28/30
        july_offsets = [0, 1, 2, 3, 5, 7, 9, 11, 13, 15, 17]
        values[[30 + i for i in july_offsets]] = np.nan  # July: 11 gaps -> 20/31 < 0.8
        frame = pd.DataFrame({"x": values}, index=times)

        _, masked = _resample_and_filter(frame, t="MS")

        # MS places bins at month starts
        self.assertFalse(pd.isna(masked.loc["2025-06-01", "x"]))  # June kept
        self.assertTrue(pd.isna(masked.loc["2025-07-01", "x"]))   # July masked

    def test_mixed_10min_then_hourly_to_hourly(self):
        # First hour has 5 samples at 10 min (5/6 pass); hour 01 is empty;
        # from 02:00 single hourly samples each fill their bin.
        ten_min = pd.date_range("2025-03-01 00:00", periods=5, freq="10min")
        hourly = pd.date_range("2025-03-01 02:00", periods=3, freq="60min")
        times = ten_min.union(hourly)
        frame = pd.DataFrame({"x": np.arange(len(times), dtype=float)}, index=times)

        _, masked = _resample_and_filter(frame, t="60min")

        self.assertFalse(pd.isna(masked.loc["2025-03-01 00:00", "x"]))  # 5/6 -> pass
        self.assertTrue(pd.isna(masked.loc["2025-03-01 01:00", "x"]))   # empty hour -> masked
        self.assertFalse(pd.isna(masked.loc["2025-03-01 02:00", "x"]))  # one hourly -> pass

    def test_mixed_hourly_then_daily_to_daily(self):
        # Day 1 is fully sampled hourly (24/24 pass); day 2 has only 6 hourly
        # samples (fail); from 2025-04-03 single daily samples fill their bins.
        hourly = pd.date_range("2025-04-01 00:00", periods=30, freq="60min")
        daily = pd.date_range("2025-04-03 00:00", periods=3, freq="D")
        times = hourly.union(daily)
        frame = pd.DataFrame({"x": np.arange(len(times), dtype=float)}, index=times)

        _, masked = _resample_and_filter(frame, t="1D")

        self.assertFalse(pd.isna(masked.loc["2025-04-01", "x"]))  # 24/24 -> pass
        self.assertTrue(pd.isna(masked.loc["2025-04-02", "x"]))   # 6/24 -> masked
        self.assertFalse(pd.isna(masked.loc["2025-04-03", "x"]))  # daily sample -> pass


# Allow running this test module directly: `python test_resample.py`.
if __name__ == "__main__":
    unittest.main()