diff --git a/tests/test_util.py b/tests/test_util.py index bcaecd4f..607a8781 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -4,6 +4,10 @@ """Tests for `xradar` util package.""" +import datetime +import fnmatch +import random + import numpy as np import pytest import xarray as xr @@ -536,3 +540,45 @@ def test_select_sweep_dataset_vars(): metadata = required_metadata + ["polarization_mode", "nyquist_velocity"] assert sorted(sweep2.data_vars) == sorted(select + ["quality1"] + metadata) + + +def test_create_volume(): + pattern = "T_PAZ?63_C_LFPW_20230420*" + filenames = list(DATASETS.registry.keys()) + files = fnmatch.filter(filenames, pattern) + random.shuffle(files) + sweeps = [io.open_odim_datatree(DATASETS.fetch(fn)) for fn in files] + + time_coverage_start = "2023-04-20T06:50:00Z" + time_coverage_start = datetime.datetime.fromisoformat(time_coverage_start) + time_coverage_end = "2023-04-20T06:55:00Z" + time_coverage_end = datetime.datetime.fromisoformat(time_coverage_end) + volume = util.create_volume( + sweeps=sweeps, + time_coverage_start=time_coverage_start, + time_coverage_end=time_coverage_end, + min_angle=0.5, + max_angle=5.0, + ) + assert volume.ds.time_coverage_start == "2023-04-20T06:50:00Z" + assert volume.ds.time_coverage_end == "2023-04-20T06:55:00Z" + sweep_group_name = ["sweep_0", "sweep_1", "sweep_2"] + np.testing.assert_array_equal(volume.ds.sweep_group_name, sweep_group_name) + angles = [3.6, 1.6, 1.0] + np.testing.assert_array_equal(volume.ds.sweep_fixed_angle, angles) + sweep_fixed_angle = [ + volume[sweep].ds.sweep_fixed_angle.values.item() + for sweep in util.get_sweep_keys(volume) + ] + np.testing.assert_array_equal(sweep_fixed_angle, angles) + + time_coverage_start = "2023-04-19T06:50:00Z" + time_coverage_start = datetime.datetime.fromisoformat(time_coverage_start) + time_coverage_end = "2023-04-19T06:55:00Z" + time_coverage_end = datetime.datetime.fromisoformat(time_coverage_end) + with pytest.raises(ValueError, match="No sweeps remain after filtering."): + volume = util.create_volume( + sweeps=sweeps, + time_coverage_start=time_coverage_start, + time_coverage_end=time_coverage_end, + ) diff --git a/xradar/util.py b/xradar/util.py index a4fab248..e137b2af 100644 --- a/xradar/util.py +++ b/xradar/util.py @@ -29,6 +29,7 @@ "get_sweep_dataset_vars", "get_sweep_metadata_vars", "select_sweep_dataset_vars", + "create_volume", ] __doc__ = __doc__.format("\n ".join(__all__)) @@ -513,6 +514,7 @@ def get_sweep_keys(dtree): keys : list List of associated keys with sweep_n """ + sweep_group_keys = [] for key in list(dtree.children): parts = key.split("_") @@ -740,3 +742,90 @@ def select_sweep_dataset_vars(sweep, select, ancillary=False, optional_metadata= sweep_out = sweep[select] return sweep_out + + +def format_zulu(t: np.datetime64) -> str: + return str(t).split(".")[0] + "Z" + + +def create_volume( + sweeps: list[xr.DataTree], + time_coverage_start: np.datetime64 = None, + time_coverage_end: np.datetime64 = None, + min_angle: float = None, + max_angle: float = None, + volume_number: int = 0, +) -> xr.DataTree: + """ + Combine sweeps into a single stacked radar volume with optional time and angle filtering. + All timestamps are handled as np.datetime64[ns] internally. + """ + + # Normalize time bounds + if time_coverage_start is not None: + time_coverage_start = np.datetime64(time_coverage_start, "ns") + if time_coverage_end is not None: + time_coverage_end = np.datetime64(time_coverage_end, "ns") + + # Step 1: Extract (ds, time, angle) tuples + sweep_entries = [] + for dt in sweeps: + for key in get_sweep_keys(dt): + ds = xr.decode_cf(dt[key].ds) + time = ds["time"].values.astype("datetime64[ns]") + mask = time != np.datetime64("NaT", "ns") + time = time[mask] + t0 = time.min() + angle = float(ds["sweep_fixed_angle"].item()) + sweep_entries.append((ds, t0, angle)) + + # Step 2: Filter by time + if time_coverage_start is not None: + sweep_entries = [ + entry for entry in sweep_entries if entry[1] >= time_coverage_start + ] + if time_coverage_end is not None: + sweep_entries = [ + entry for entry in sweep_entries if entry[1] <= time_coverage_end + ] + + # Step 3: Filter by angle + if min_angle is not None: + sweep_entries = [entry for entry in sweep_entries if entry[2] >= min_angle] + if max_angle is not None: + sweep_entries = [entry for entry in sweep_entries if entry[2] <= max_angle] + + if not sweep_entries: + raise ValueError("No sweeps remain after filtering.") + + # Step 4: Sort by time + sweep_entries.sort(key=lambda x: x[1]) + + # Step 5: Prepare root metadata + root_ds = sweeps[0].ds.copy() + root_ds = root_ds.drop_vars( + ["sweep_group_name", "sweep_fixed_angle"], errors="ignore" + ) + + root_ds["sweep_group_name"] = xr.DataArray( + np.array([f"sweep_{i}" for i in range(len(sweep_entries))]), + dims="sweep", + ) + root_ds["sweep_fixed_angle"] = xr.DataArray( + [angle for _, _, angle in sweep_entries], dims="sweep" + ) + + # Step 6: Assign time coverage + time_coverage_start = time_coverage_start or sweep_entries[0][1] + time_coverage_end = time_coverage_end or sweep_entries[-1][1] + + root_ds["time_coverage_start"] = format_zulu(time_coverage_start) + root_ds["time_coverage_end"] = format_zulu(time_coverage_end) + root_ds.attrs["volume_number"] = volume_number + + # Step 7: Assemble final tree + volume = xr.DataTree(root_ds, name="root") + for i, (ds, _, _) in enumerate(sweep_entries): + volume[f"sweep_{i}"] = xr.DataTree(ds.copy()) + + return volume