Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Real_world_examples/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ More complex case study workflows demonstrating how DEA can be used to address r
Seasonal_water_extents.ipynb
Shipping_lane_identification.ipynb
Surface_area_duration.ipynb
Tracking_sediment_flow.ipynb
Turbidity_animated_timeseries.ipynb
Urban_change_detection.ipynb
Vegetation_phenology.ipynb
Expand Down
1,438 changes: 1,438 additions & 0 deletions Real_world_examples/Tracking_sediment_flow.ipynb

Large diffs are not rendered by default.

44 changes: 43 additions & 1 deletion Tests/dea_tools/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import xarray as xr
import datacube
from datacube.utils.masking import mask_invalid_data
from dea_tools.temporal import xr_regression
from dea_tools.temporal import xr_regression, xr_optical_flow

@pytest.fixture()
def satellite_ds():
Expand All @@ -30,6 +30,21 @@ def satellite_ds():
return ds


@pytest.fixture()
def intertidal_da():
# Connect to datacube
dc = datacube.Datacube()

# Load available data
return dc.load(
product="ga_s2ls_intertidal_cyear_3",
measurements=["elevation"],
y=(-34.75, -34.76),
x=(138.48, 138.51),
time=("2020", "2023"),
).elevation


# Run test on different pixels and alternative hypotheses
@pytest.mark.parametrize(
"x, y, alternative",
Expand Down Expand Up @@ -182,8 +197,35 @@ def test_nan_mask_preserved(sample_da):
assert np.isnan(ds[var][1, 2]), f"{var} did not preserve NaN mask"


@pytest.mark.parametrize("method,parallel,radius", [
("ilk", False, 20),
("ilk", True, 20),
("ilk", True, 10),
("tvl1", False, 20),
("tvl1", True, 20),
])
def test_xr_optical_flow_basic(intertidal_da, method, parallel, radius):

# Run the optical flow function
ds_flow = xr_optical_flow(intertidal_da, method=method, parallel=parallel, radius=radius)

# Check output type
assert isinstance(ds_flow, xr.Dataset)

# Check keys
assert "v" in ds_flow and "u" in ds_flow

# Check shapes
nt = len(intertidal_da.time)
ny, nx = intertidal_da.shape[1:]
assert ds_flow.v.shape == (nt - 1, ny, nx)
assert ds_flow.u.shape == (nt - 1, ny, nx)

# Check coordinates
np.testing.assert_array_equal(ds_flow.time.values, intertidal_da.time[1:].values)
np.testing.assert_array_equal(ds_flow.y.values, intertidal_da.y.values)
np.testing.assert_array_equal(ds_flow.x.values, intertidal_da.x.values)

# Check geobox
assert ds_flow.odc.geobox == intertidal_da.odc.geobox

171 changes: 112 additions & 59 deletions Tools/dea_tools/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
If you would like to report an issue with this script, file one on
GitHub: https://github.com/GeoscienceAustralia/dea-notebooks/issues/new

Last modified: May 2024
Last modified: October 2025
"""

import warnings
Expand All @@ -30,6 +30,9 @@
from odc.geo.xr import assign_crs
from packaging import version
from scipy.stats import t
import concurrent.futures
from tqdm import tqdm
from skimage.registration import optical_flow_ilk, optical_flow_tvl1


def allNaN_arg(da, dim, stat):
Expand Down Expand Up @@ -298,13 +301,9 @@ def xr_phenology(
"ROS": np.float32,
}
da_template = da.isel(time=0).drop("time")
template = xr.Dataset(
{
var_name: da_template.astype(var_dtype)
for var_name, var_dtype in stats_dtype.items()
if var_name in stats
}
)
template = xr.Dataset({
var_name: da_template.astype(var_dtype) for var_name, var_dtype in stats_dtype.items() if var_name in stats
})
da_all_time = da.chunk({"time": -1})

lazy_phenology = da_all_time.map_blocks(
Expand Down Expand Up @@ -431,9 +430,7 @@ def fourier_mean(x, n=3, step=5):
for j in range(x.shape[1]):
y = np.fft.fft(x[i, j, :])
for k in range(n):
result[i, j, k] = np.mean(
np.abs(y[1 + k * step : ((k + 1) * step + 1) or None])
)
result[i, j, k] = np.mean(np.abs(y[1 + k * step : ((k + 1) * step + 1) or None]))

return result

Expand All @@ -448,9 +445,7 @@ def fourier_std(x, n=3, step=5):
for j in range(x.shape[1]):
y = np.fft.fft(x[i, j, :])
for k in range(n):
result[i, j, k] = np.std(
np.abs(y[1 + k * step : ((k + 1) * step + 1) or None])
)
result[i, j, k] = np.std(np.abs(y[1 + k * step : ((k + 1) * step + 1) or None]))

return result

Expand All @@ -465,9 +460,7 @@ def fourier_median(x, n=3, step=5):
for j in range(x.shape[1]):
y = np.fft.fft(x[i, j, :])
for k in range(n):
result[i, j, k] = np.median(
np.abs(y[1 + k * step : ((k + 1) * step + 1) or None])
)
result[i, j, k] = np.median(np.abs(y[1 + k * step : ((k + 1) * step + 1) or None]))

return result

Expand Down Expand Up @@ -602,9 +595,7 @@ def temporal_statistics(da, stats):
da_all_time = da.chunk({"time": -1})

# apply function across chunks
lazy_ds = da_all_time.map_blocks(
temporal_statistics, kwargs={"stats": stats}, template=template
)
lazy_ds = da_all_time.map_blocks(temporal_statistics, kwargs={"stats": stats}, template=template)

try:
crs = da.odc.geobox.crs
Expand Down Expand Up @@ -650,28 +641,23 @@ def temporal_statistics(da, stats):
n3 = zz[:, :, 2]

# intialise dataset with first statistic
ds = xr.DataArray(
n1, attrs=attrs, coords={x_dim: x, y_dim: y}, dims=[y_dim, x_dim]
).to_dataset(name=stats[0] + "_n1")
ds = xr.DataArray(n1, attrs=attrs, coords={x_dim: x, y_dim: y}, dims=[y_dim, x_dim]).to_dataset(
name=stats[0] + "_n1"
)

# add other datasets
for i, j in zip([n2, n3], ["n2", "n3"]):
ds[stats[0] + "_" + j] = xr.DataArray(
i, attrs=attrs, coords={"x": x, "y": y}, dims=["y", "x"]
)
ds[stats[0] + "_" + j] = xr.DataArray(i, attrs=attrs, coords={"x": x, "y": y}, dims=["y", "x"])
else:
# simpler if first function isn't fourier transform
first_func = stats_dict.get(str(stats[0]))
ds = first_func(da)

# convert back to xarray dataset
ds = xr.DataArray(
ds, attrs=attrs, coords={x_dim: x, y_dim: y}, dims=[y_dim, x_dim]
).to_dataset(name=stats[0])
ds = xr.DataArray(ds, attrs=attrs, coords={x_dim: x, y_dim: y}, dims=[y_dim, x_dim]).to_dataset(name=stats[0])

# loop through the other functions
for stat in stats[1:]:

# handle the fourier transform examples
if stat in ("f_std", "f_median", "f_mean"):
stat_func = stats_dict.get(str(stat))
Expand All @@ -681,9 +667,7 @@ def temporal_statistics(da, stats):
n3 = zz[:, :, 2]

for i, j in zip([n1, n2, n3], ["n1", "n2", "n3"]):
ds[stat + "_" + j] = xr.DataArray(
i, attrs=attrs, coords={x_dim: x, y_dim: y}, dims=[y_dim, x_dim]
)
ds[stat + "_" + j] = xr.DataArray(i, attrs=attrs, coords={x_dim: x, y_dim: y}, dims=[y_dim, x_dim])

else:
# Select a stats function from the dictionary
Expand Down Expand Up @@ -731,12 +715,8 @@ def time_buffer(input_date, buffer="30 days", output_format="%Y-%m-%d"):
`input_date='2018-01-01'` and `buffer='30 days'`
"""
# Use assertions to check we have the correct function input
assert isinstance(
input_date, str
), "Input date must be a string in quotes in 'yyyy-mm-dd' format"
assert isinstance(
buffer, str
), "Buffer must be a string supported by `pandas.Timedelta`, e.g. '5 days'"
assert isinstance(input_date, str), "Input date must be a string in quotes in 'yyyy-mm-dd' format"
assert isinstance(buffer, str), "Buffer must be a string supported by `pandas.Timedelta`, e.g. '5 days'"

# Convert inputs to pandas format
buffer = pd.Timedelta(buffer)
Expand Down Expand Up @@ -828,11 +808,7 @@ def __init__(self, cov, cor, slope, intercept, pval, stderr):

def __repr__(self):
return "LinregressResult({})".format(
", ".join(
"{}={}".format(k, getattr(self, k))
for k in dir(self)
if not k.startswith("_")
)
", ".join("{}={}".format(k, getattr(self, k)) for k in dir(self) if not k.startswith("_"))
)


Expand Down Expand Up @@ -1027,9 +1003,7 @@ def _pvalue(tstats, n, alternative):
assert dim in x.dims, f"Array `x` does not contain dimension '{dim}'."

# Assert that both arrays have the same length along "dim"
assert len(x[dim]) == len(
y[dim]
), f"Arrays `x` and `y` have different lengths along dimension '{dim}'."
assert len(x[dim]) == len(y[dim]), f"Arrays `x` and `y` have different lengths along dimension '{dim}'."

# Apply optional outlier masking to x and y variable
if outliers_y is not None:
Expand Down Expand Up @@ -1086,18 +1060,16 @@ def _pvalue(tstats, n, alternative):
)

# Combine into single dataset
regression_ds = xr.merge(
[
cov.rename("cov").astype(np.float32),
cor.rename("cor").astype(np.float32),
r2.rename("r2").astype(np.float32),
slope.rename("slope").astype(np.float32),
intercept.rename("intercept").astype(np.float32),
pval.rename("pvalue").astype(np.float32),
stderr.rename("stderr").astype(np.float32),
n.rename("n").astype(np.int16),
]
)
regression_ds = xr.merge([
cov.rename("cov").astype(np.float32),
cor.rename("cor").astype(np.float32),
r2.rename("r2").astype(np.float32),
slope.rename("slope").astype(np.float32),
intercept.rename("intercept").astype(np.float32),
pval.rename("pvalue").astype(np.float32),
stderr.rename("stderr").astype(np.float32),
n.rename("n").astype(np.int16),
])

return regression_ds

Expand Down Expand Up @@ -1155,3 +1127,84 @@ def calculate_stsad(vec, window_size=365, step=10, progress=None, window="hann")
progress=progress,
window=window,
)


def xr_optical_flow(da, method="ilk", radius=20, parallel=True, **kwargs):
"""
Compute optical flow between consecutive time steps in an xarray DataArray.

Optical flow is computed for consecutive time pairs (t, t+1) using the
iterative Lucas-Kanade (ILK) method from scikit-image. This function
parallelises across time steps if requested.

Parameters
----------
da : xarray.DataArray
Input data with dimensions ("time", "y", "x") representing a temporal image sequence.
method : {"ilk", "tvl1"}, optional
Optical flow algorithm to use:
- "ilk": Iterative Lucas–Kanade (default, fast and robust)
- "tvl1": Total Variation L1 (more accurate but slower)
radius : int, optional
If `method="ilk"`, the radius of the window used by the optical flow algorithm.
Default is 20.
parallel : bool, optional
If True, computations are parallelised across time steps using
`concurrent.futures.ThreadPoolExecutor()`.
**kwargs : dict
Additional keyword arguments passed to `skimage.registration.optical_flow_ilk`
or `skimage.registration.optical_flow_tvl1`.

Returns
-------
xarray.Dataset
Dataset containing:
- ``v`` : Vertical (y-axis) component of optical flow.
- ``u`` : Horizontal (x-axis) component of optical flow.
Both have dimensions ("time", "y", "x") and coordinates matching the input.

"""

# Select optical flow method
if method == "ilk":
flow_func = lambda a, b: optical_flow_ilk(a, b, radius=radius, **kwargs)
elif method == "tvl1":
flow_func = lambda a, b: optical_flow_tvl1(a, b, **kwargs)
else:
raise ValueError(f"Unsupported method '{method}'. Use 'ilk' or 'tvl1'.")

# Define worker function
def _compute_flow(t):
return flow_func(da.isel(time=t), da.isel(time=t + 1))

# Get x and y dim names
y_dim, x_dim = da.odc.spatial_dims

# Indices in the array to iterate over
indices = range(len(da.time) - 1)

# Run in parallel if requested
if parallel:
with concurrent.futures.ThreadPoolExecutor() as executor:
flow_results = list(
tqdm(
executor.map(_compute_flow, indices),
total=len(indices),
desc=f"Computing optical flow in parallel using {method}",
)
)

# Otherwise, run in sequence
else:
flow_results = [_compute_flow(t) for t in tqdm(indices, desc=f"Computing optical flow using {method}")]

# Combine results into xarray.Dataset
flow_stacked = np.stack(flow_results)

return xr.Dataset(
{
"v": (("time", y_dim, x_dim), flow_stacked[:, 0]),
"u": (("time", y_dim, x_dim), flow_stacked[:, 1]),
},
coords={"time": da.time[1:], y_dim: da[y_dim], x_dim: da[x_dim]},
)
Loading