Description
What is your issue?
Hi,
I am using `interp` with the `linear` method along a sorted, chunked dimension. From the Dask dashboard I see workers failing and restarting until the whole computation fails, with messages along the lines of:
distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:45829 (pid=65947) exceeded 95% memory budget. Restarting...
....
distributed.scheduler - ERROR - Task ('chunked_aware_interpnd-transpose-transpose-store-map-f31ac35773dcc2e7a00aa402e89cfe6b', 0, 4, 1) marked as failed because 4 workers died while trying to run it
I allocated 8 GB per worker with the distributed scheduler, but the issue can be reproduced even with the default one.
Up to xarray<=2024.11.0 I was able to make it work using a trick (below). The trick runs fine with xarray==2024.11.0 and dask[distributed]<=2025.2.0, so the problem appears to be independent of the Dask version.
I have seen a few discussions in PRs and issues, but I could not find a clear answer on the best approach for this case. Since my time axis is sorted and the interpolation is linear, it should be possible to avoid loading all chunks along that axis into memory. Any suggestions? Are there common ways of dealing with this? (See also the block-wise workaround sketch at the end of this post.)
1. Generate data
import pandas as pd
import xarray as xr
import dask.array as da

n_a = 5
n_b = 400
n_avg = 360
n_raw = 1_000_000

time1 = pd.date_range(start="2024-01-01", periods=n_avg, freq="8h")
time2 = pd.date_range(start="2023-12-28", periods=n_raw, freq="min")

data_avg = da.random.normal(size=(n_avg, n_a, n_b), chunks=(360, 1, 200))
data_raw = da.random.normal(size=(n_raw, n_a, n_b), chunks=(8_000, 1, 200))

# write both arrays to zarr, so they can be reopened lazily below
da_avg = xr.DataArray(data_avg, dims=["time", "a", "b"], coords={"time": time1})
da_avg.to_zarr("da_avg.zarr", mode="w")
da_raw = xr.DataArray(data_raw, dims=["time", "a", "b"], coords={"time": time2})
da_raw.to_zarr("da_raw.zarr", mode="w")
da_raw = xr.open_dataarray(
"da_raw.zarr",
engine="zarr",
chunks={"time": 8_000, "a": 1, "b": 200},
)
da_avg = xr.open_dataarray(
"da_avg.zarr",
engine="zarr",
chunks={"time": 360, "a": 1, "b": 200},
)
# target times to evaluate on: essentially da_raw["time"], but chunked
time_eval = xr.DataArray(
da_raw["time"],
dims=["time"],
coords={"time": da_raw["time"]},
name="time_eval",
).chunk({"time": 8_000})
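For reference, the chunk layout at this point looks like this (the comments show what the sizes above imply):
print(da_avg.chunks)     # ((360,), (1, 1, 1, 1, 1), (200, 200))
print(time_eval.chunks)  # ((8000, 8000, ..., 8000),) -- 125 blocks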
2. Interp
Now, the following does NOT work (regardless of the xarray version); it runs into the memory issues described above.
interp_da = da_avg.interp(
time=time_eval,
assume_sorted=True,
method="linear",
kwargs={"fill_value": "extrapolate", "bounds_error": False},
)
interp_da.to_zarr("interp_da.zarr", mode="w")
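For scale: nothing is computed at this point, but the metadata alone shows how large the requested result is, which hints at why a few large intermediates landing on one worker blow through the 8 GB budget:
print(interp_da.nbytes / 1e9)  # 16.0 -- 1_000_000 * 5 * 400 float64 values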
3. Interp with a trick to maintain a dask indexer
The following DOES work in xarray 2024.11.0, but not with 2025.1.2; I suspect this is because of the `apply_ufunc` change to `interp`.
interp_da = da_avg.interp(
time=time_eval.expand_dims({"temp": 1}),
assume_sorted=True,
method="linear",
kwargs={"fill_value": "extrapolate", "bounds_error": False},
)
interp_da.to_zarr("interp_da.zarr", mode="w")
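As a side note, the helper dimension introduced by expand_dims survives in the result (see the isel(temp=0) in the sanity check below); if it is unwanted, it can be dropped before writing (interp_da_clean is just an illustrative name):
interp_da_clean = interp_da.squeeze("temp", drop=True)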
4. Sanity check vs scipy
A sanity check that the results I got with 2024.11.0 were indeed the expected ones:
import numpy as np
from scipy.interpolate import interp1d

# compare one (a, b) slice of the xarray result against plain scipy
y = da_avg.isel(a=0, b=0).to_numpy()
x = da_avg["time"].to_numpy().astype(int)
f = interp1d(x, y, kind="linear", fill_value="extrapolate", bounds_error=False)
y_eval = f(time_eval.to_numpy().astype(int))
y_xr = interp_da.isel(temp=0, a=0, b=0).to_numpy()
assert np.allclose(y_eval, y_xr)
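5. Possible block-wise workaround
For completeness, here is a minimal sketch of the kind of block-wise approach I have in mind. It assumes it is acceptable to hold the averaged array fully in memory (at these sizes it is only ~6 MB), and it sidesteps the chunked interp machinery rather than fixing it; _interp_block, template, and interp_blockwise are illustrative names, not anything xarray provides.
import dask.array as da
import xarray as xr

# the averaged data is tiny (360 * 5 * 400 float64, ~6 MB), so load it once
# and let every task interpolate a single block of target times without
# touching any other chunk
avg = da_avg.compute()

def _interp_block(t_block):
    # t_block is one in-memory chunk of time_eval
    return avg.interp(
        time=t_block,
        assume_sorted=True,
        method="linear",
        kwargs={"fill_value": "extrapolate", "bounds_error": False},
    )

# template describing the lazy output: chunked like time_eval along time,
# single chunks along a and b (those come from the in-memory source)
template = xr.DataArray(
    da.zeros(
        (time_eval.sizes["time"], avg.sizes["a"], avg.sizes["b"]),
        chunks=(8_000, avg.sizes["a"], avg.sizes["b"]),
    ),
    dims=["time", "a", "b"],
    coords={"time": time_eval["time"]},
)

interp_blockwise = xr.map_blocks(_interp_block, time_eval, template=template)
interp_blockwise.to_zarr("interp_blockwise.zarr", mode="w")
Each task only ever holds one 8_000-point block of the target times plus the ~6 MB source, so peak memory per worker stays small. This obviously does not generalize to cases where the source itself is too large along the interpolated dimension, which is why I would still like to understand the intended approach with interp.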