Skip to content

WIP: Support specifying multiple dashboard addresses. #7189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@

from distributed import Nanny
from distributed._signals import wait_for_signals
from distributed.comm import get_address_host_port
from distributed.comm import get_address_host_port, unparse_host_port
from distributed.deploy.utils import nprocesses_nthreads
from distributed.preloading import validate_preload_argv
from distributed.proctitle import (
enable_proctitle_on_children,
enable_proctitle_on_current,
)
from distributed.utils import import_term, parse_ports
from distributed.utils import clean_dashboard_address, import_term, parse_ports

logger = logging.getLogger("distributed.dask_worker")

Expand Down Expand Up @@ -77,7 +77,11 @@
"--dashboard-address",
type=str,
default=":0",
help="Address on which to listen for diagnostics dashboard",
help="Address on which to listen for diagnostics dashboard. "
"When creating multiple workers with --nworkers, dashboard addresses "
"can be specified by using commas to separate multiple ip addresses. "
"For example, --dashboard-address=3000,3001,3002 will use ports "
"3000, 3001, 3002.",
)
@click.option(
"--dashboard/--no-dashboard",
Expand Down Expand Up @@ -367,7 +371,9 @@ def del_pid_file():

worker_class = import_term(worker_class)

port_kwargs = _apportion_ports(worker_port, nanny_port, n_workers, nanny)
port_kwargs = _apportion_ports(
worker_port, nanny_port, dashboard_address, n_workers, nanny
)
assert len(port_kwargs) == n_workers

if nanny:
Expand Down Expand Up @@ -404,7 +410,6 @@ async def run():
contact_address=contact_address,
host=host,
dashboard=dashboard,
dashboard_address=dashboard_address,
name=name
if n_workers == 1 or name is None or name == ""
else str(name) + "-" + str(i),
Expand Down Expand Up @@ -455,18 +460,22 @@ async def wait_for_signals_and_close():


def _apportion_ports(
worker_port: str | None, nanny_port: str | None, n_workers: int, nanny: bool
worker_port: str | None,
nanny_port: str | None,
dashboard_address: str,
n_workers: int,
nanny: bool,
) -> list[dict[str, Any]]:
"""Spread out evenly --worker-port and/or --nanny-port ranges to the workers and
nannies, avoiding overlap.

Returns
=======
List of kwargs to pass to the Worker or Nanny construtors
List of kwargs to pass to the Worker or Nanny constructors
"""
seen = set()

def parse_unique(s: str | None) -> Iterator[int | None]:
def parse_port_unique(s: str | None) -> Iterator[int | None]:
ports = parse_ports(s)
if ports in ([0], [None]):
for _ in range(n_workers):
Expand All @@ -477,8 +486,17 @@ def parse_unique(s: str | None) -> Iterator[int | None]:
seen.add(port)
yield port

worker_ports_iter = parse_unique(worker_port)
nanny_ports_iter = parse_unique(nanny_port)
def parse_address_unique(s: str) -> Iterator[str]:
addresses = clean_dashboard_address(s)
if len(addresses) == 1 and addresses[0]["port"] in (0, None):
for _ in range(n_workers):
yield unparse_host_port(addresses[0]["address"], addresses[0]["port"])
else:
for item in addresses:
yield unparse_host_port(item["address"], item["port"])

worker_ports_iter = parse_port_unique(worker_port)
nanny_ports_iter = parse_port_unique(nanny_port)

# [(worker ports, nanny ports), ...]
ports: list[tuple[set[int | None], set[int | None]]] = [
Expand Down Expand Up @@ -506,6 +524,7 @@ def parse_unique(s: str | None) -> Iterator[int | None]:
more_nps = False

kwargs = []
dashboard_addresses_iter = parse_address_unique(dashboard_address)
for worker_ports_i, nanny_ports_i in ports:
if not worker_ports_i or not nanny_ports_i:
if nanny:
Expand All @@ -518,6 +537,13 @@ def parse_unique(s: str | None) -> Iterator[int | None]:
f"Not enough ports in range --worker_port {worker_port} "
f"for {n_workers} workers"
)
try:
address = next(dashboard_addresses_iter)
except StopIteration:
raise ValueError(
f"Not enough addresses in --dashboard_address {dashboard_address} "
f"for {n_workers} workers"
)

# None and int can't be sorted together,
# but None and 0 are guaranteed to be alone
Expand All @@ -528,9 +554,9 @@ def parse_unique(s: str | None) -> Iterator[int | None]:
np: Any = sorted(nanny_ports_i)
if len(np) == 1:
np = np[0]
kwargs_i = {"port": np, "worker_port": wp}
kwargs_i = {"port": np, "worker_port": wp, "dashboard_address": address}
else:
kwargs_i = {"port": wp}
kwargs_i = {"port": wp, "dashboard_address": address}

kwargs.append(kwargs_i)

Expand Down
153 changes: 99 additions & 54 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,112 +35,155 @@
[
# Single worker
(
(None, None, 1, False),
[{"port": None}],
(None, None, ":0", 1, False),
[{"port": None, "dashboard_address": ":0"}],
),
(
(None, None, 1, True),
[{"port": None, "worker_port": None}],
(None, None, ":0", 1, True),
[{"port": None, "worker_port": None, "dashboard_address": ":0"}],
),
(("123", None, 1, False), [{"port": 123}]),
(("123", None, ":0", 1, False), [{"port": 123, "dashboard_address": ":0"}]),
(
("123", None, 1, True),
[{"port": None, "worker_port": 123}],
("123", None, ":0", 1, True),
[{"port": None, "worker_port": 123, "dashboard_address": ":0"}],
),
(
(None, "456", 1, True),
[{"port": 456, "worker_port": None}],
(None, "456", ":0", 1, True),
[{"port": 456, "worker_port": None, "dashboard_address": ":0"}],
),
(
("123", "456", 1, True),
[{"port": 456, "worker_port": 123}],
("123", "456", ":0", 1, True),
[{"port": 456, "worker_port": 123, "dashboard_address": ":0"}],
),
(
("123", "456", "789", 1, True),
[{"port": 456, "worker_port": 123, "dashboard_address": ":789"}],
),
(
("123", "456", ":789", 1, True),
[{"port": 456, "worker_port": 123, "dashboard_address": ":789"}],
),
# port=None or 0 and multiple workers
(
(None, None, 2, False),
(None, None, ":0", 2, False),
[
{"port": None},
{"port": None},
{"port": None, "dashboard_address": ":0"},
{"port": None, "dashboard_address": ":0"},
],
),
(
(None, None, 2, True),
(None, None, ":0", 2, True),
[
{"port": None, "worker_port": None},
{"port": None, "worker_port": None},
{"port": None, "worker_port": None, "dashboard_address": ":0"},
{"port": None, "worker_port": None, "dashboard_address": ":0"},
],
),
(
(0, "0", 2, True),
(0, "0", ":0", 2, True),
[
{"port": 0, "worker_port": 0},
{"port": 0, "worker_port": 0},
{"port": 0, "worker_port": 0, "dashboard_address": ":0"},
{"port": 0, "worker_port": 0, "dashboard_address": ":0"},
],
),
(
("0", None, 2, True),
("0", None, ":0", 2, True),
[
{"port": None, "worker_port": 0},
{"port": None, "worker_port": 0},
{"port": None, "worker_port": 0, "dashboard_address": ":0"},
{"port": None, "worker_port": 0, "dashboard_address": ":0"},
],
),
# port ranges
(
("100:103", None, 1, False),
[{"port": [100, 101, 102, 103]}],
("100:103", None, ":0", 1, False),
[{"port": [100, 101, 102, 103], "dashboard_address": ":0"}],
),
(
("100:103", None, ":0", 2, False),
[
{
"port": [100, 102],
"dashboard_address": ":0",
}, # Round robin apportion
{"port": [101, 103], "dashboard_address": ":0"},
],
),
# multiple dashboard addresses
(
(None, None, "123,456", 1, False),
[{"port": None, "dashboard_address": ":123"}],
),
(
("100:103", None, 2, False),
(None, None, "123,456", 2, False),
[
{"port": [100, 102]}, # Round robin apportion
{"port": [101, 103]},
{"port": None, "dashboard_address": ":123"},
{"port": None, "dashboard_address": ":456"},
],
),
# port range is not an exact multiple of n_workers
(
("100:107", None, 3, False),
("100:107", None, ":0", 3, False),
[
{"port": [100, 103, 106]},
{"port": [101, 104, 107]},
{"port": [102, 105]},
{"port": [100, 103, 106], "dashboard_address": ":0"},
{"port": [101, 104, 107], "dashboard_address": ":0"},
{"port": [102, 105], "dashboard_address": ":0"},
],
),
(
("100:103", None, 2, True),
("100:103", None, ":0", 2, True),
[
{"port": None, "worker_port": [100, 102]},
{"port": None, "worker_port": [101, 103]},
{"port": None, "worker_port": [100, 102], "dashboard_address": ":0"},
{"port": None, "worker_port": [101, 103], "dashboard_address": ":0"},
],
),
(
(None, "110:113", 2, True),
(None, "110:113", ":0", 2, True),
[
{"port": [110, 112], "worker_port": None},
{"port": [111, 113], "worker_port": None},
{"port": [110, 112], "worker_port": None, "dashboard_address": ":0"},
{"port": [111, 113], "worker_port": None, "dashboard_address": ":0"},
],
),
# port ranges have different length between nannies and workers
(
("100:103", "110:114", 2, True),
("100:103", "110:114", ":0", 2, True),
[
{
"port": [110, 112, 114],
"worker_port": [100, 102],
"dashboard_address": ":0",
},
{
"port": [111, 113],
"worker_port": [101, 103],
"dashboard_address": ":0",
},
],
),
# dashboard addresses have different length from workers
(
("100:101", None, "123,456,789", 2, False),
[
{"port": [110, 112, 114], "worker_port": [100, 102]},
{"port": [111, 113], "worker_port": [101, 103]},
{"port": 100, "dashboard_address": ":123"},
{"port": 101, "dashboard_address": ":456"},
],
),
# identical port ranges
(
("100:103", "100:103", 2, True),
("100:103", "100:103", ":0", 2, True),
[
{"port": 101, "worker_port": 100},
{"port": 103, "worker_port": 102},
{"port": 101, "worker_port": 100, "dashboard_address": ":0"},
{"port": 103, "worker_port": 102, "dashboard_address": ":0"},
],
),
# overlapping port ranges
(
("100:105", "104:106", 2, True),
("100:105", "104:106", ":0", 2, True),
[
{"port": [104, 106], "worker_port": [100, 102]},
{"port": 105, "worker_port": [101, 103]},
{
"port": [104, 106],
"worker_port": [100, 102],
"dashboard_address": ":0",
},
{"port": 105, "worker_port": [101, 103], "dashboard_address": ":0"},
],
),
],
Expand All @@ -151,19 +194,21 @@ def test_apportion_ports(args, expect):

def test_apportion_ports_bad():
with pytest.raises(ValueError, match="Not enough ports in range"):
_apportion_ports("100:102", None, 4, False)
_apportion_ports("100:102", None, ":0", 4, False)
with pytest.raises(ValueError, match="Not enough ports in range"):
_apportion_ports(None, "100:102", 4, False)
_apportion_ports(None, "100:102", ":0", 4, False)
with pytest.raises(ValueError, match="Not enough ports in range"):
_apportion_ports("100:102", "100:102", 3, True)
_apportion_ports("100:102", "100:102", ":0", 3, True)
with pytest.raises(ValueError, match="Not enough ports in range"):
_apportion_ports("100:102", "102:104", 3, True)
_apportion_ports("100:102", "102:104", ":0", 3, True)
with pytest.raises(ValueError, match="port_stop must be greater than port_start"):
_apportion_ports("102:100", None, 4, False)
_apportion_ports("102:100", None, ":0", 4, False)
with pytest.raises(ValueError, match="invalid literal for int"):
_apportion_ports("foo", None, 1, False)
_apportion_ports("foo", None, ":0", 1, False)
with pytest.raises(ValueError, match="too many values to unpack"):
_apportion_ports("100:101:102", None, 1, False)
_apportion_ports("100:101:102", None, ":0", 1, False)
with pytest.raises(ValueError, match="Not enough addresses in"):
_apportion_ports(None, None, "123,456", 3, False)


@pytest.mark.slow
Expand Down
1 change: 1 addition & 0 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def _get_ip(host, port, family):
ip = sock.getsockname()[0]
return ip
except OSError as e:
print("s", host, port)
warnings.warn(
"Couldn't detect a suitable IP address for "
"reaching %r, defaulting to hostname: %s" % (host, e),
Expand Down