# dea_datahandling.py
"""
Loading and manipulating Digital Earth Australia products and data
using the Open Data Cube and xarray.
License: The code in this script is licensed under the Apache License,
Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0). Digital Earth
Australia data is licensed under the Creative Commons by Attribution 4.0
license (https://creativecommons.org/licenses/by/4.0/).
Contact: If you need assistance, please post a question on the Open Data
Cube Discord chat (https://discord.com/invite/4hhBQVas5U) or on the GIS Stack
Exchange (https://gis.stackexchange.com/questions/ask?tags=open-data-cube)
using the `open-data-cube` tag (you can view previously asked questions
here: https://gis.stackexchange.com/questions/tagged/open-data-cube).
If you would like to report an issue with this script, you can file one
on GitHub (https://github.com/GeoscienceAustralia/dea-notebooks/issues/new).
Last modified: April 2025
"""
# Import required packages
import datetime
import os
import warnings
import zipfile
from collections import Counter
import numpy as np
import odc.algo
import odc.geo.xr
import pandas as pd
import requests
import rioxarray
import sklearn.decomposition
import xarray as xr
from odc.algo import mask_cleanup
from scipy.ndimage import binary_dilation
from skimage.color import hsv2rgb, rgb2hsv
from skimage.exposure import match_histograms
def _dc_query_only(**kw):
"""
Remove load-only datacube parameters; the rest can be
passed to Query/dc.find_datasets.
Returns
-------
dict of query parameters
"""
def _impl(
measurements=None,
output_crs=None,
resolution=None,
resampling=None,
skip_broken_datasets=None,
dask_chunks=None,
fuse_func=None,
align=None,
datasets=None,
progress_cbk=None,
group_by=None,
**query,
):
return query
return _impl(**kw)
def _common_bands(dc, products):
"""
Takes a list of products and returns a list of measurements/bands
that are present in all products.
Returns
-------
List of band names
"""
common = None
bands = None
for p in products:
p = dc.index.products.get_by_name(p)
if common is None:
common = set(p.measurements)
bands = list(p.measurements)
else:
common = common.intersection(set(p.measurements))
return [band for band in bands if band in common]
def _contiguity_fuser(dst: np.ndarray, src: np.ndarray) -> None:
"""
Ensure contiguity data is properly combined by replacing
pixels in `dst` that are either 0 (non-contiguous) or 255
(nodata) with the corresponding value from `src`, propagating
1 (valid contiguous data) if it exists.
"""
np.copyto(dst, src, where=np.isin(dst, (255, 0)))
def load_ard(
dc,
products=None,
cloud_mask="fmask",
min_gooddata=0.00,
mask_pixel_quality=True,
mask_filters=None,
mask_contiguity=False,
fmask_categories=["valid", "snow", "water"],
s2cloudless_categories=["valid"],
ls7_slc_off=True,
dtype="auto",
predicate=None,
verbose=True,
**kwargs,
):
"""
Load multiple Geoscience Australia Landsat or Sentinel 2
Collection 3 Analysis Ready Data products (e.g. Landsat 5, 7, 8, 9; Sentinel 2A, 2B, 2C),
optionally apply pixel quality/cloud masking and contiguity masks,
and drop time steps that contain less than a minimum proportion
of good quality (e.g. non-cloudy, non-shadowed) pixels.
The function supports loading the following Landsat products:
* ga_ls5t_ard_3
* ga_ls7e_ard_3
* ga_ls8c_ard_3
* ga_ls9c_ard_3
And Sentinel-2 products:
* ga_s2am_ard_3
* ga_s2bm_ard_3
* ga_s2cm_ard_3
Cloud masking can be performed using the Fmask (Function of Mask)
cloud mask for Landsat and Sentinel-2, and the s2cloudless
(Sentinel Hub cloud detector for Sentinel-2 imagery) cloud mask for
Sentinel-2.
Last modified: February 2025
Parameters
----------
dc : datacube Datacube object
The Datacube to connect to, i.e. ``dc = datacube.Datacube()``.
This allows you to also use development datacubes if required.
products : list
A list of product names to load. Valid options are
['ga_ls5t_ard_3', 'ga_ls7e_ard_3', 'ga_ls8c_ard_3', 'ga_ls9c_ard_3']
for Landsat, ['ga_s2am_ard_3', 'ga_s2bm_ard_3', 'ga_s2cm_ard_3']
for Sentinel 2.
cloud_mask : string, optional
The cloud mask used by the function. This is used for both
masking out poor quality pixels (e.g. clouds) if
``mask_pixel_quality=True``, and for calculating the
``min_gooddata`` percentage when dropping cloudy or low quality
satellite observations. Two cloud masks are supported:
* ``'fmask'`` (default; available for Landsat, Sentinel-2)
* ``'s2cloudless'`` (Sentinel-2 only)
min_gooddata : float, optional
The minimum percentage of good quality pixels required for a
satellite observation to be loaded. Defaults to 0.00 which will
return all observations regardless of pixel quality (set to e.g.
0.99 to return only observations with more than 99% good quality
pixels).
mask_pixel_quality : str or bool, optional
Whether to mask out poor quality (e.g. cloudy) pixels by setting
them as nodata. Depending on the choice of cloud mask, the
function will identify good quality pixels using the categories
passed to the ``fmask_categories`` or ``s2cloudless_categories``
params. Set to False to turn off pixel quality masking completely.
Poor quality pixels will be set to NaN (converting all data to
`float32`) if ``dtype='auto'``, or be set to the data's native
nodata value (usually -999) if ``dtype='native'`` (see ``dtype``
below for more details).
mask_filters : iterable of tuples, optional
An iterable of tuples of morphological operations, ("<operation>", <radius>),
to apply to the inverted pixel quality mask, where:
operation: string; one of these morphological operations:
* ``'dilation'`` = Expands poor quality pixels/clouds outwards
* ``'erosion'`` = Shrinks poor quality pixels/clouds inwards
* ``'closing'`` = Remove small holes in clouds by expanding then shrinking poor quality pixels
* ``'opening'`` = Remove small or narrow clouds by shrinking then expanding poor quality pixels
radius: int,
e.g. ``mask_filters=[('erosion', 5), ("opening", 2), ("dilation", 2)]``
mask_contiguity : str or bool, optional
Whether to mask out pixels that are missing data in any band
(i.e. "non-contiguous" pixels). This can be important for
generating clean composite datasets. The default of False will
not apply any contiguity mask.
If loading NBART data, set:
* ``mask_contiguity='nbart'`` (or ``mask_contiguity=True``)
If loading NBAR data, specify:
* ``mask_contiguity='nbar'``
Non-contiguous pixels will be set to NaN if ``dtype='auto'``, or
set to the data's native nodata value if ``dtype='native'`` (see
'dtype' below).
fmask_categories : list, optional
A list of Fmask cloud mask categories to consider as good
quality pixels when calculating ``min_gooddata`` and when masking
data by pixel quality if ``mask_pixel_quality=True``.
The default is ``['valid', 'snow', 'water']``; all other Fmask
categories ('cloud', 'shadow', 'nodata') will be treated as low
quality pixels. Choose from: 'nodata', 'valid', 'cloud',
'shadow', 'snow', and 'water'.
s2cloudless_categories : list, optional
A list of s2cloudless cloud mask categories to consider as good
quality pixels when calculating ``min_gooddata`` and when masking
data by pixel quality if ``mask_pixel_quality=True``. The default
is ``['valid']``; all other s2cloudless categories ('cloud',
'nodata') will be treated as low quality pixels. Choose from:
'nodata', 'valid', or 'cloud'.
ls7_slc_off : bool, optional
An optional boolean indicating whether to include data from
after the Landsat 7 SLC failure (i.e. SLC-off). Defaults to
True, which keeps all Landsat 7 observations, including those acquired after May 31 2003.
dtype : string, optional
Controls the data type/dtype that layers are coerced to after
loading. Valid values: 'native', 'auto', and 'float{16|32|64}'.
When 'auto' is used, the data will be converted to `float32`
if masking is used, otherwise data will be returned in the
native data type of the data. Be aware that if data is loaded
in its native dtype, nodata and masked pixels will be returned
with the data's native nodata value (typically -999), not NaN.
predicate : function, optional
DEPRECATED: Please use ``dataset_predicate`` instead.
An optional function that can be passed in to restrict the datasets that
are loaded. A predicate function should take a
``datacube.model.Dataset`` object as an input (i.e. as returned
from ``dc.find_datasets``), and return a boolean. For example,
a predicate function could be used to return True for only
datasets acquired in January: `dataset.time.begin.month == 1`
verbose : bool, optional
If True, print progress statements during loading
**kwargs :
A set of keyword arguments to `dc.load` that define the
spatiotemporal query and load parameters used to extract data.
Keyword arguments can either be listed directly in the
``load_ard`` call like any other parameter (e.g.
``measurements=['nbart_red']``), or by passing in a query kwarg
dictionary (e.g. ``**query``). Keywords can include ``measurements``,
``x``, ``y``, ``time``, ``resolution``, ``resampling``, ``group_by``, ``crs``;
see the ``dc.load`` documentation for all possible options:
https://datacube-core.readthedocs.io/en/latest/api/indexed-data/generate/datacube.Datacube.load.html
Returns
-------
combined_ds : xarray.Dataset
An xarray.Dataset containing only satellite observations with
a proportion of good quality pixels greater than `min_gooddata`.
Notes
-----
The `load_ard` function builds on the Open Data Cube's native `dc.load`
function by adding the ability to load multiple satellite data
products at once, and automatically apply cloud masking and filtering.
For loading non-satellite data products (e.g. DEA Water Observations),
use `dc.load` instead.
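Example
-------
A minimal usage sketch (the product, measurements, extents and
dates below are purely illustrative; substitute any query with
available data):

>>> import datacube
>>> dc = datacube.Datacube(app="load_ard_example")  # doctest: +SKIP
>>> ds = load_ard(
...     dc=dc,
...     products=["ga_ls8c_ard_3"],
...     measurements=["nbart_red", "nbart_green", "nbart_blue"],
...     x=(153.30, 153.35),
...     y=(-27.50, -27.55),
...     time=("2020-01", "2020-03"),
...     min_gooddata=0.90,
... )  # doctest: +SKIP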
"""
# Attempt to import datacube and raise an error if not available
try:
from datacube.utils.dates import normalise_dt
except ImportError as e:
raise ImportError(
"`datacube` is required for `load_ard`. "
"Please install DEA Tools with the `[datacube]` extra, e.g.: "
"`pip install dea-tools[datacube]`"
) from e
#########
# Setup #
#########
# Convert products to a list if it is passed as a string
products = [products] if isinstance(products, str) else products
# Valid Landsat products
valid_ls = ["ga_ls5t_ard_3", "ga_ls7e_ard_3", "ga_ls8c_ard_3", "ga_ls9c_ard_3"]
valid_s2 = ["ga_s2am_ard_3", "ga_s2bm_ard_3", "ga_s2cm_ard_3"]
# Verify that products were provided
if not products:
raise ValueError(
f"Please provide a list of Landsat or Sentinel-2 Analysis Ready Data "
f"product names to load data from. Valid options are: "
f"{valid_ls + valid_s2}."
)
# Determine whether products are all Landsat, all S2, or mixed
if all([product in valid_ls for product in products]):
product_type = "ls"
elif all([product in valid_s2 for product in products]):
product_type = "s2"
elif all([product in valid_s2 + valid_ls for product in products]):
product_type = "mixed"
warnings.warn(
"You have selected a combination of Landsat and Sentinel-2 "
"products. This can produce unexpected results as these "
"products use the same names for different spectral bands "
"(e.g. Landsat and Sentinel-2's 'nbart_swir_2'); use with "
"caution."
)
else:
# If an invalid product is passed, raise error
invalid_products = [product for product in products if product not in valid_s2 + valid_ls]
raise ValueError(
f"The `load_ard` function only supports Landsat and "
f"Sentinel-2 Analysis Ready Data products; {invalid_products} is not supported. "
f"Valid options are: {valid_ls + valid_s2}."
)
# Set contiguity band depending on `mask_contiguity`;
# "oa_nbart_contiguity" if True, False or "nbart",
# "oa_nbar_contiguity" if "nbar"
if mask_contiguity in (True, False, "nbart"):
contiguity_band = "oa_nbart_contiguity"
elif mask_contiguity == "nbar":
contiguity_band = "oa_nbar_contiguity"
else:
raise ValueError(
f"Unsupported value '{mask_contiguity}' passed to "
"`mask_contiguity`. Please provide either 'nbart', 'nbar', "
"True, or False."
)
# Set pixel quality (PQ) band depending on `cloud_mask`
if cloud_mask == "fmask":
pq_band = "oa_fmask"
pq_categories = fmask_categories
elif cloud_mask == "s2cloudless":
pq_band = "oa_s2cloudless_mask"
pq_categories = s2cloudless_categories
# Raise error if s2cloudless is requested for Landsat products
if product_type in ["ls", "mixed"]:
raise ValueError(
"The 's2cloudless' cloud mask is not available for "
"Landsat products. Please set `mask_pixel_quality` to "
"'fmask' or False."
)
else:
raise ValueError(
f"Unsupported value '{cloud_mask}' passed to "
"`cloud_mask`. Please provide either 'fmask', "
"'s2cloudless', True, or False."
)
# To ensure that the categorical PQ/contiguity masking bands are
# loaded using nearest neighbour resampling, we need to add these to
# the resampling kwarg if it exists and is not "nearest".
# This only applies if a string resampling method is supplied;
# if a resampling dictionary (e.g. `resampling={'*': 'bilinear',
# 'oa_fmask': 'mode'}`) is passed instead, we assume the user wants
# to select custom resampling methods for each of their bands.
resampling = kwargs.get("resampling")
if isinstance(resampling, str) and resampling != "nearest":
kwargs["resampling"] = {
"*": resampling,
pq_band: "nearest",
contiguity_band: "nearest",
}
# We extract and deal with `dask_chunks` separately as this
# function always uses Dask internally, regardless of whether the
# user sets `dask_chunks` themselves
dask_chunks = kwargs.pop("dask_chunks", None)
# Create a list of requested measurements so that we can eventually
# return only the measurements the user originally asked for
requested_measurements = kwargs.pop("measurements", None)
# Copy our measurements list so we can temporarily append extra PQ
# and/or contiguity masking bands when loading our data
measurements = requested_measurements.copy() if requested_measurements else None
# Deal with "load all" case: pick a set of bands that are common
# across requested products
if measurements is None:
measurements = _common_bands(dc, products)
# Deal with edge case where user supplies alias for PQ/contiguity
# by stripping PQ/contiguity masks of their "oa_" prefix
else:
contiguity_band = (
contiguity_band.replace("oa_", "")
if contiguity_band.replace("oa_", "") in measurements
else contiguity_band
)
pq_band = pq_band.replace("oa_", "") if pq_band.replace("oa_", "") in measurements else pq_band
# Use custom fuse function to ensure contiguity is combined correctly
# when grouping data by solar day. Without this, contiguity data from
# neighbouring images is pasted semi-randomly over each other,
# producing artefacts in the output.
kwargs["fuse_func"] = {contiguity_band: _contiguity_fuser}
# If `measurements` are specified but do not include PQ or
# contiguity variables, add these to `measurements`
if pq_band not in measurements:
measurements.append(pq_band)
if mask_contiguity and contiguity_band not in measurements:
measurements.append(contiguity_band)
# Get list of data and mask bands so that we can later exclude
# mask bands from being masked themselves
data_bands = [band for band in measurements if band not in (pq_band, contiguity_band)]
mask_bands = [band for band in measurements if band not in data_bands]
#################
# Find datasets #
#################
# Pull out query params only to pass to dc.find_datasets
query = _dc_query_only(**kwargs)
# If predicate is specified, use this function to filter the list
# of datasets prior to load
if predicate is not None:
if verbose:
print(
"The 'predicate' parameter will be deprecated in future "
"versions of this function as this functionality has now "
"been added to Datacube itself. Please use "
"`dataset_predicate=...` instead."
)
query["dataset_predicate"] = predicate
# Extract list of datasets for each product using query params
dataset_list = []
if verbose:
print("Finding datasets")
for product in products:
# Obtain list of datasets for product
if verbose:
print(
f" {product} (ignoring SLC-off observations)"
if not ls7_slc_off and product == "ga_ls7e_ard_3"
else f" {product}"
)
datasets = dc.find_datasets(product=product, **query)
# Remove Landsat 7 SLC-off observations if ls7_slc_off=False
if not ls7_slc_off and product == "ga_ls7e_ard_3":
datasets = [i for i in datasets if normalise_dt(i.time.begin) < datetime.datetime(2003, 5, 31)]
# Add any returned datasets to list
dataset_list.extend(datasets)
# Raise exception if no datasets are returned
if len(dataset_list) == 0:
raise ValueError(
"No data available for query: ensure that "
"the products specified have data for the "
"time and location requested"
)
#############
# Load data #
#############
# Note we always load using dask here so that we can lazy load data
# before filtering by `min_gooddata`
ds = dc.load(
datasets=dataset_list,
measurements=measurements,
dask_chunks={} if dask_chunks is None else dask_chunks,
**kwargs,
)
####################
# Filter good data #
####################
# Calculate pixel quality mask
pq_mask = odc.algo.fmask_to_bool(ds[pq_band], categories=pq_categories)
# The good data percentage calculation has to load all pixel quality
# data, which can be slow. If the user has chosen no filtering
# by using the default `min_gooddata = 0`, we can skip this step
# completely to save processing time
if min_gooddata > 0.0:
# Compute good data for each observation as % of total pixels
if verbose:
print(f"Counting good quality pixels for each time step using {cloud_mask}")
data_perc = pq_mask.sum(axis=[1, 2], dtype="int32") / (pq_mask.shape[1] * pq_mask.shape[2])
keep = (data_perc >= min_gooddata).persist()
# Filter by `min_gooddata` to drop low quality observations
total_obs = len(ds.time)
ds = ds.sel(time=keep)
pq_mask = pq_mask.sel(time=keep)
if verbose:
print(
f"Filtering to {len(ds.time)} out of {total_obs} "
f"time steps with at least {min_gooddata:.1%} "
f"good quality pixels"
)
# Morphological filtering on cloud masks
if (mask_filters is not None) and mask_pixel_quality:
if verbose:
print(f"Applying morphological filters to pixel quality mask: {mask_filters}")
pq_mask = ~mask_cleanup(~pq_mask, mask_filters=mask_filters)
warnings.warn(
"As of `dea_tools` v0.3.0, pixel quality masks are "
"inverted before being passed to `mask_filters` (i.e. so "
"that good quality/clear pixels are False and poor quality "
"pixels/clouds are True). This means that 'dilation' will "
"now expand cloudy pixels, rather than shrink them as in "
"previous versions."
)
###############
# Apply masks #
###############
# Create a combined mask to hold both pixel quality and contiguity.
# This is more efficient than creating multiple dask tasks for
# similar masking operations.
mask = None
# Add pixel quality mask to combined mask
if mask_pixel_quality:
if verbose:
print(f"Applying {cloud_mask} pixel quality/cloud mask")
mask = pq_mask
# Add contiguity mask to combined mask
if mask_contiguity:
if verbose:
print(f"Applying contiguity mask ({contiguity_band})")
cont_mask = ds[contiguity_band] == 1
# If mask already contains pixel quality data (i.e.
# mask_pixel_quality=True), multiply with cont_mask to perform a
# logical 'and' operation (keeping only pixels good in both)
mask = cont_mask if mask is None else mask * cont_mask
# Split into data/masks bands, as conversion to float and masking
# should only be applied to data bands
ds_data = ds[data_bands]
ds_masks = ds[mask_bands]
# Mask data if either of the above masks were generated
if mask is not None:
ds_data = odc.algo.keep_good_only(ds_data, where=mask)
# Automatically set dtype to either native or float32 depending
# on whether masking was requested
if dtype == "auto":
dtype = "native" if mask is None else "float32"
# Set nodata values using odc.algo tools to reduce peak memory
# use when converting data dtype
if dtype != "native":
ds_data = odc.algo.to_float(ds_data, dtype=dtype)
# Put data and mask bands back together
attrs = ds.attrs
ds = xr.merge([ds_data, ds_masks])
ds.attrs.update(attrs)
###############
# Return data #
###############
# Drop bands not originally requested by user
if requested_measurements:
ds = ds[requested_measurements]
# If user supplied `dask_chunks`, return data as a dask array
# without actually loading it into memory
if dask_chunks is not None:
if verbose:
print(f"Returning {len(ds.time)} time steps as a dask array")
return ds
if verbose:
print(f"Loading {len(ds.time)} time steps")
return ds.compute()
def mostcommon_crs(dc, product, query):
"""
Takes a given query and returns the most common CRS for observations
returned for that spatial extent. This can be useful when your study
area lies on the boundary of two UTM zones, forcing you to decide
which CRS to use for your `output_crs` in `dc.load`.
Parameters
----------
dc : datacube Datacube object
The Datacube to connect to, i.e. `dc = datacube.Datacube()`.
This allows you to also use development datacubes if required.
product : str
A product name (or list of product names) to load CRSs from.
query : dict
A datacube query including x, y and time range to assess for the
most common CRS
Returns
-------
epsg_string : str
An EPSG string giving the most common CRS from all datasets
returned by the query above
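Example
-------
A usage sketch (extents and dates are illustrative); the returned
EPSG string can be passed directly to `dc.load`:

>>> query = {"x": (148.0, 148.2), "y": (-35.0, -35.2), "time": "2020-01"}
>>> crs = mostcommon_crs(dc, product="ga_ls8c_ard_3", query=query)  # doctest: +SKIP
>>> ds = dc.load(product="ga_ls8c_ard_3", output_crs=crs, resolution=(-30, 30), **query)  # doctest: +SKIP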
"""
# Find list of datasets matching query for either product or
# list of products
if isinstance(product, list):
matching_datasets = []
for i in product:
matching_datasets.extend(dc.find_datasets(product=i, **query))
else:
matching_datasets = dc.find_datasets(product=product, **query)
# Extract all CRSs
crs_list = [str(i.crs) for i in matching_datasets]
# If CRSs are returned
if len(crs_list) > 0:
# Identify most common CRS
crs_counts = Counter(crs_list)
crs_mostcommon = crs_counts.most_common(1)[0][0]
# Warn user if multiple CRSs are encountered
if len(crs_counts.keys()) > 1:
warnings.warn(
f"Multiple UTM zones {list(crs_counts.keys())} "
f"were returned for this query. Defaulting to "
f"the most common zone: {crs_mostcommon}",
UserWarning,
)
return crs_mostcommon
raise ValueError(
f"No CRS was returned as no data was found for "
f"the supplied product ({product}) and query. "
f"Please ensure that data is available for "
f"{product} for the spatial extents and time "
f"period specified in the query (e.g. by using "
f"the Data Cube Explorer for this datacube "
f"instance)."
)
def download_unzip(url, output_dir=None, remove_zip=True):
"""
Downloads and unzips a .zip file from an external URL to a local
directory.
Parameters
----------
url : str
A string giving a URL path to the zip file you wish to download
and unzip
output_dir : str, optional
An optional string giving the directory to unzip files into.
Defaults to None, which will unzip files in the current working
directory
remove_zip : bool, optional
An optional boolean indicating whether to remove the downloaded
.zip file after files are unzipped. Defaults to True, which will
delete the .zip file.
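Example
-------
A usage sketch (the URL below is a hypothetical placeholder):

>>> download_unzip(
...     url="https://example.com/data.zip",
...     output_dir="unzipped_data",
... )  # doctest: +SKIP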
"""
# Get basename for zip file
zip_name = os.path.basename(url)
# Raise exception if the file is not of type .zip
if not zip_name.endswith(".zip"):
raise ValueError(
f"The URL provided does not point to a .zip "
f"file (e.g. {zip_name}). Please specify a "
f"URL path to a valid .zip file"
)
# Download zip file
print(f"Downloading {zip_name}")
r = requests.get(url)
# Fail loudly on HTTP errors rather than writing an error page to disk
r.raise_for_status()
with open(zip_name, "wb") as f:
f.write(r.content)
# Extract into output_dir
print(f"Unzipping output files to: {output_dir if output_dir else os.getcwd()}")
with zipfile.ZipFile(zip_name, "r") as zip_ref:
zip_ref.extractall(output_dir)
# Optionally clean up by removing the downloaded zip file
if remove_zip:
os.remove(zip_name)
def wofs_fuser(dest, src):
"""
Fuse two WOfS water measurements represented as ``ndarray``s.
Note: this is a copy of the function located here:
https://github.com/GeoscienceAustralia/digitalearthau/blob/develop/digitalearthau/utils.py
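Example (a usage sketch; 'ga_ls_wo_3' and `query` stand in for any
WOfS product and spatiotemporal query):
>>> ds = dc.load(product="ga_ls_wo_3", group_by="solar_day", fuse_func=wofs_fuser, **query)  # doctest: +SKIP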
"""
empty = (dest & 1).astype(bool)
both = ~empty & ~((src & 1).astype(bool))
dest[empty] = src[empty]
dest[both] |= src[both]
def dilate(array, dilation=10, invert=True):
"""
Dilate a binary array by a specified number of pixels using a
disk-like radial dilation.
By default, invalid (e.g. False or 0) values are dilated. This is
suitable for applications such as cloud masking (e.g. creating a
buffer around cloudy or shadowed pixels). This functionality can
be reversed by specifying `invert=False`.
Parameters
----------
array : array
The binary array to dilate.
dilation : int, optional
An optional integer specifying the number of pixels to dilate
by. Defaults to 10, which will dilate `array` by 10 pixels.
invert : bool, optional
An optional boolean specifying whether to invert the binary
array prior to dilation. The default is True, which dilates the
invalid values in the array (e.g. False or 0 values).
Returns
-------
An array of the same shape as `array`, with the selected pixels
(by default, invalid pixels; see `invert`) dilated by the number
of pixels specified by `dilation`.
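Example
-------
Dilating a single invalid (False) pixel in a 3D (time, y, x) array:

>>> arr = np.ones((1, 5, 5), dtype=bool)
>>> arr[0, 2, 2] = False
>>> dilated = dilate(arr, dilation=1)
>>> int(dilated.sum())  # the invalid pixel grows into a 3x3 block
16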
"""
y, x = np.ogrid[
-dilation : (dilation + 1),
-dilation : (dilation + 1),
]
# disk-like radial dilation
kernel = (x * x) + (y * y) <= (dilation + 0.5) ** 2
# If invert=True, invert True values to False etc
if invert:
array = ~array
return ~binary_dilation(array.astype(bool), structure=kernel.reshape((1,) + kernel.shape))
def paths_to_datetimeindex(paths, string_slice=(0, 10)):
"""
Helper function to generate a Pandas datetimeindex object
from dates contained in a file path string.
Parameters
----------
paths : list of strings
A list of file path strings that will be used to extract times
string_slice : tuple
An optional tuple giving the start and stop position that
contains the time information in the provided paths. These are
applied to the basename (i.e. file name) in each path, not the
path itself. Defaults to (0, 10).
Returns
-------
datetime : pandas.DatetimeIndex
A pandas.DatetimeIndex object containing 'datetime64[ns]' values derived
from the file paths provided by `paths`.
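Example
-------
File names below are hypothetical:

>>> paths = ["data/2020-01-01_scene.tif", "data/2020-02-01_scene.tif"]
>>> paths_to_datetimeindex(paths)
DatetimeIndex(['2020-01-01', '2020-02-01'], dtype='datetime64[ns]', freq=None)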
"""
date_strings = [os.path.basename(i)[slice(*string_slice)] for i in paths]
return pd.to_datetime(date_strings)
def _select_along_axis(values, idx, axis):
other_ind = np.ix_(*[np.arange(s) for s in idx.shape])
sl = other_ind[:axis] + (idx,) + other_ind[axis:]
return values[sl]
def first(array: xr.DataArray, dim: str, index_name: str = None) -> xr.DataArray:
"""
Finds the first occurring non-null value along the given dimension.
Parameters
----------
array : xr.DataArray
The array to search.
dim : str
The name of the dimension to reduce by finding the first
non-null value.
index_name : str, optional
If given, the name of a coordinate to be added containing the
index of where on the dimension the first value was found.
Returns
-------
reduced : xr.DataArray
An array of the first non-null values.
The `dim` dimension will be removed, and replaced with a coord
of the same name, containing the value of that dimension where
the first value was found.
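Example
-------
Reducing a small illustrative array along 'time':

>>> da = xr.DataArray(
...     [[np.nan, 2.0], [1.0, 3.0]],
...     dims=["time", "x"],
...     coords={"time": [0, 1]},
... )
>>> first(da, dim="time")  # doctest: +SKIP

This returns values [1.0, 2.0] with per-pixel 'time' coordinates
[1, 0].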
"""
axis = array.get_axis_num(dim)
idx_first = np.argmax(~pd.isnull(array), axis=axis)
reduced = array.reduce(_select_along_axis, idx=idx_first, axis=axis)
reduced[dim] = array[dim].isel({dim: xr.DataArray(idx_first, dims=reduced.dims)})
if index_name is not None:
reduced[index_name] = xr.DataArray(idx_first, dims=reduced.dims)
return reduced
def last(array: xr.DataArray, dim: str, index_name: str = None) -> xr.DataArray:
"""
Finds the last occurring non-null value along the given dimension.
Parameters
----------
array : xr.DataArray
The array to search.
dim : str
The name of the dimension to reduce by finding the last non-null
value.
index_name : str, optional
If given, the name of a coordinate to be added containing the
index of where on the dimension the last value was found.
Returns
-------
reduced : xr.DataArray
An array of the last non-null values.
The `dim` dimension will be removed, and replaced with a coord
of the same name, containing the value of that dimension where
the last value was found.
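Example
-------
A small illustrative array, mirroring the example in `first`:

>>> da = xr.DataArray(
...     [[np.nan, 2.0], [1.0, np.nan]],
...     dims=["time", "x"],
...     coords={"time": [0, 1]},
... )
>>> last(da, dim="time")  # doctest: +SKIP

This keeps the most recent non-null value for each 'x' pixel:
values [1.0, 2.0], with per-pixel 'time' coordinates [1, 0].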
"""
axis = array.get_axis_num(dim)
rev = (slice(None),) * axis + (slice(None, None, -1),)
idx_last = -1 - np.argmax(~pd.isnull(array)[rev], axis=axis)
reduced = array.reduce(_select_along_axis, idx=idx_last, axis=axis)
reduced[dim] = array[dim].isel({dim: xr.DataArray(idx_last, dims=reduced.dims)})
if index_name is not None:
reduced[index_name] = xr.DataArray(idx_last, dims=reduced.dims)
return reduced
def nearest(array: xr.DataArray, dim: str, target, index_name: str = None) -> xr.DataArray:
"""
Finds the nearest values to a target label along the given
dimension, for all other dimensions.
E.g. For a DataArray with dimensions ('time', 'x', 'y'):
``nearest_array = nearest(array, 'time', '2017-03-12')``
will return an array with the dimensions ('x', 'y'), with non-null
values found closest for each (x, y) pixel to that location along
the time dimension.
The returned array will include the 'time' coordinate for each x,y
pixel, giving the time at which the nearest value was found.
Parameters
----------
array : xr.DataArray
The array to search.
dim : str
The name of the dimension to look for the target label.
target : same type as array[dim]
The value to look up along the given dimension.
index_name : str, optional
If given, the name of a coordinate to be added containing the
index of where on the dimension the nearest value was found.
Returns
-------
nearest_array : xr.DataArray
An array of the nearest non-null values to the target label.
The `dim` dimension will be removed, and replaced with a coord
of the same name, containing the value of that dimension closest
to the given target label.
"""
before_target = slice(None, target)
after_target = slice(target, None)
da_before = array.sel({dim: before_target})
da_after = array.sel({dim: after_target})
da_before = last(da_before, dim, index_name) if da_before[dim].shape[0] else None
da_after = first(da_after, dim, index_name) if da_after[dim].shape[0] else None
if da_before is None and da_after is not None:
return da_after
if da_after is None and da_before is not None:
return da_before
target = array[dim].dtype.type(target)
is_before_closer = abs(target - da_before[dim]) < abs(target - da_after[dim])
nearest_array = xr.where(is_before_closer, da_before, da_after, keep_attrs=True)
nearest_array[dim] = xr.where(is_before_closer, da_before[dim], da_after[dim], keep_attrs=True)
if index_name is not None:
nearest_array[index_name] = xr.where(
is_before_closer,
da_before[index_name],
da_after[index_name],
keep_attrs=True,
)
return nearest_array
def parallel_apply(ds, dim, func, use_threads=False, *args, **kwargs):
"""
Applies a custom function in parallel along the dimension of an
xarray.Dataset or xarray.DataArray.
The function can be any function that can be applied to an
individual xarray.Dataset or xarray.DataArray (e.g. data for a
single timestep). The function should also return data in
xarray.Dataset or xarray.DataArray format.
This function is useful as a simple method for parallelising code
that cannot easily be parallelised using Dask.
Parameters
----------
ds : xarray.Dataset or xarray.DataArray
xarray data with a dimension `dim` to apply the custom function
along.
dim : string
The dimension along which the custom function will be applied.
func : function
The function that will be applied in parallel to each array
along dimension `dim`. The first argument passed to this
function should be the array along `dim`.
use_threads : bool, optional
Whether to use threads instead of processes for parallelisation.
Defaults to False, which uses multiprocessing.
In brief, the difference between threads and processes is that threads
share memory, while processes have separate memory.
*args :
Any number of arguments that will be passed to `func`.
**kwargs :
Any number of keyword arguments that will be passed to `func`.
Returns
-------
xarray.Dataset
A concatenated dataset containing an output for each array
along the input `dim` dimension.
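Example
-------
A usage sketch (`ds` is assumed to be a loaded dataset with a
'time' dimension; `scale_data` is a hypothetical user function):

>>> def scale_data(ds_slice, scale):
...     return ds_slice * scale
>>> out = parallel_apply(ds, "time", scale_data, True, 2)  # doctest: +SKIP

Note that with `use_threads=False` (the default), `func` must be
picklable (e.g. defined at the top level of a module).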
"""
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from itertools import repeat
from tqdm import tqdm
# Use threads or processes
Executor = ThreadPoolExecutor if use_threads else ProcessPoolExecutor
with Executor() as executor:
# Update func to add kwargs
func = partial(func, **kwargs)
# Apply func in parallel
groups = [group for (i, group) in ds.groupby(dim)]
to_iterate = (groups, *(repeat(i, len(groups)) for i in args))
out_list = list(tqdm(executor.map(func, *to_iterate), total=len(groups)))
# Combine to match the original dataset
return xr.concat(out_list, dim=ds[dim])
def _apply_weights(da, band_weights):
"""
Apply weights from a dictionary to the bands of a
multispectral xarray.DataArray. Raises a ValueError if any
bands in `da` are not present in the `band_weights` dictionary.