Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions Tools/dea_tools/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import dask.distributed
from aiohttp import ClientConnectionError
from odc.io.cgroups import get_cpu_quota
from odc.stac import configure_rio as cfg_rio

_HAVE_PROXY = bool(find_spec("jupyter_server_proxy"))

Expand Down Expand Up @@ -86,10 +85,7 @@ def create_local_dask_cluster(

# Count cpus if threads_per_worker not provided
if threads_per_worker is None:
if get_cpu_quota() is not None:
threads_per_worker = round(get_cpu_quota())
else:
threads_per_worker = os.cpu_count()
threads_per_worker = round(get_cpu_quota()) if get_cpu_quota() is not None else os.cpu_count()

# by default split 95% of system memory by the n_workers.
if memory_limit == "spare_mem":
Expand All @@ -103,9 +99,16 @@ def create_local_dask_cluster(
**kwargs,
)

# configure aws access
# Configure AWS and GDAL/rasterio access. Use datacube `configure_s3_access`
# function preferentially if datacube is installed, as this function will
# choose the correct settings automatically. If datacube is not installed,
# use version of function from odc.loader > odc.stac.
if configure_rio:
cfg_rio(cloud_defaults=True, aws={"aws_unsigned": True}, client=client)
try:
from datacube.utils.aws import configure_s3_access
except ImportError:
from odc.stac import configure_s3_access
configure_s3_access(cloud_defaults=True, aws_unsigned=True, client=client)

# Show the dask cluster settings
if display_client:
Expand Down Expand Up @@ -171,9 +174,7 @@ def create_dask_gateway_cluster(profile="r5_L", workers=2):

# limit username to alphanumeric characters
# kubernetes pods won't launch if labels contain anything other than [a-Z, -, _]
options["jupyterhub_user"] = "".join(
c if c.isalnum() else "-" for c in os.getenv("JUPYTERHUB_USER")
)
options["jupyterhub_user"] = "".join(c if c.isalnum() else "-" for c in os.getenv("JUPYTERHUB_USER"))

cluster = gateway.new_cluster(options)
cluster.scale(workers)
Expand Down
Loading