Skip to content

Commit 48fcf48

Browse files
authored
Reduce size of scheduler_info (#9045)
1 parent 08f01fa commit 48fcf48

File tree

5 files changed

+39
-16
lines changed

5 files changed

+39
-16
lines changed

distributed/client.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,7 +1298,7 @@ def dashboard_link(self):
12981298
try:
12991299
return self.cluster.dashboard_link
13001300
except AttributeError:
1301-
scheduler, info = self._get_scheduler_info()
1301+
scheduler, info = self._get_scheduler_info(n_workers=0)
13021302
if scheduler is None:
13031303
return None
13041304
else:
@@ -1312,20 +1312,20 @@ def dashboard_link(self):
13121312

13131313
return format_dashboard_link(host, port)
13141314

1315-
def _get_scheduler_info(self):
1315+
def _get_scheduler_info(self, n_workers):
13161316
from distributed.scheduler import Scheduler
13171317

13181318
if (
13191319
self.cluster
13201320
and hasattr(self.cluster, "scheduler")
13211321
and isinstance(self.cluster.scheduler, Scheduler)
13221322
):
1323-
info = self.cluster.scheduler.identity()
1323+
info = self.cluster.scheduler.identity(n_workers=n_workers)
13241324
scheduler = self.cluster.scheduler
13251325
elif (
13261326
self._loop_runner.is_started() and self.scheduler and not self.asynchronous
13271327
):
1328-
info = sync(self.loop, self.scheduler.identity)
1328+
info = sync(self.loop, self.scheduler.identity, n_workers=n_workers)
13291329
scheduler = self.scheduler
13301330
else:
13311331
info = self._scheduler_identity
@@ -1368,7 +1368,7 @@ def _repr_html_(self):
13681368
except PackageNotFoundError:
13691369
JUPYTERLAB = False
13701370

1371-
scheduler, info = self._get_scheduler_info()
1371+
scheduler, info = self._get_scheduler_info(n_workers=5)
13721372

13731373
return get_template("client.html.j2").render(
13741374
id=self.id,
@@ -1585,18 +1585,20 @@ async def _ensure_connected(self, timeout=None):
15851585

15861586
logger.debug("Started scheduling coroutines. Synchronized")
15871587

1588-
async def _update_scheduler_info(self):
1588+
async def _update_scheduler_info(self, n_workers=5):
15891589
if self.status not in ("running", "connecting") or self.scheduler is None:
15901590
return
15911591
try:
1592-
self._scheduler_identity = SchedulerInfo(await self.scheduler.identity())
1592+
self._scheduler_identity = SchedulerInfo(
1593+
await self.scheduler.identity(n_workers=n_workers)
1594+
)
15931595
except OSError:
15941596
logger.debug("Not able to query scheduler for identity")
15951597

15961598
async def _wait_for_workers(
15971599
self, n_workers: int, timeout: float | None = None
15981600
) -> None:
1599-
info = await self.scheduler.identity()
1601+
info = await self.scheduler.identity(n_workers=-1)
16001602
self._scheduler_identity = SchedulerInfo(info)
16011603
if timeout:
16021604
deadline = time() + parse_timedelta(timeout)
@@ -1619,7 +1621,7 @@ def running_workers(info):
16191621
% (running_workers(info), n_workers, timeout)
16201622
)
16211623
await asyncio.sleep(0.1)
1622-
info = await self.scheduler.identity()
1624+
info = await self.scheduler.identity(n_workers=-1)
16231625
self._scheduler_identity = SchedulerInfo(info)
16241626

16251627
def wait_for_workers(self, n_workers: int, timeout: float | None = None) -> None:
@@ -4407,11 +4409,14 @@ async def _profile(
44074409
else:
44084410
return state
44094411

4410-
def scheduler_info(self, **kwargs):
4412+
def scheduler_info(self, n_workers: int = 5, **kwargs: Any) -> SchedulerInfo:
44114413
"""Basic information about the workers in the cluster
44124414
44134415
Parameters
44144416
----------
4417+
n_workers : int
4418+
The number of workers for which to fetch information. To fetch all,
4419+
use -1.
44154420
**kwargs : dict
44164421
Optional keyword arguments for the remote function
44174422
@@ -4429,7 +4434,7 @@ def scheduler_info(self, **kwargs):
44294434
'time-delay': 0.0061032772064208984}}}
44304435
"""
44314436
if not self.asynchronous:
4432-
self.sync(self._update_scheduler_info)
4437+
self.sync(self._update_scheduler_info, n_workers=n_workers)
44334438
return self._scheduler_identity
44344439

44354440
def dump_cluster_state(

distributed/scheduler.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1663,6 +1663,8 @@ class SchedulerState:
16631663
idle_task_count: set[WorkerState]
16641664
#: Workers that are fully utilized. May include non-running workers.
16651665
saturated: set[WorkerState]
1666+
#: Current total memory across all workers (sum over memory_limit)
1667+
total_memory: int
16661668
#: Current number of threads across all workers
16671669
total_nthreads: int
16681670
#: History of number of threads
@@ -1787,6 +1789,7 @@ def __init__(
17871789
self.task_groups = {}
17881790
self.task_prefixes = {}
17891791
self.task_metadata = {}
1792+
self.total_memory = 0
17901793
self.total_nthreads = 0
17911794
self.total_nthreads_history = [(time(), 0)]
17921795
self.queued = queued
@@ -4084,16 +4087,22 @@ def _repr_html_(self) -> str:
40844087
tasks=self.tasks,
40854088
)
40864089

4087-
def identity(self) -> dict[str, Any]:
4090+
def identity(self, n_workers: int = -1) -> dict[str, Any]:
40884091
"""Basic information about ourselves and our cluster"""
4092+
if n_workers == -1:
4093+
n_workers = len(self.workers)
40894094
d = {
40904095
"type": type(self).__name__,
40914096
"id": str(self.id),
40924097
"address": self.address,
40934098
"services": {key: v.port for (key, v) in self.services.items()},
40944099
"started": self.time_started,
4100+
"n_workers": len(self.workers),
4101+
"total_threads": self.total_nthreads,
4102+
"total_memory": self.total_memory,
40954103
"workers": {
4096-
worker.address: worker.identity() for worker in self.workers.values()
4104+
worker.address: worker.identity()
4105+
for worker in itertools.islice(self.workers.values(), n_workers)
40974106
},
40984107
}
40994108
return d
@@ -4544,6 +4553,7 @@ async def add_worker(
45444553
dh_addresses.add(address)
45454554
dh["nthreads"] += nthreads
45464555

4556+
self.total_memory += ws.memory_limit
45474557
self.total_nthreads += nthreads
45484558
self.total_nthreads_history.append((time(), self.total_nthreads))
45494559
self.aliases[name] = address
@@ -5455,6 +5465,7 @@ async def remove_worker(
54555465
dh_addresses: set = dh["addresses"]
54565466
dh_addresses.remove(address)
54575467
dh["nthreads"] -= ws.nthreads
5468+
self.total_memory -= ws.memory_limit
54585469
self.total_nthreads -= ws.nthreads
54595470
self.total_nthreads_history.append((time(), self.total_nthreads))
54605471
if not dh_addresses:

distributed/tests/test_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3923,6 +3923,10 @@ def test_scheduler_info(c):
39233923
assert isinstance(info, dict)
39243924
assert len(info["workers"]) == 2
39253925
assert isinstance(info["started"], float)
3926+
info = c.scheduler_info(n_workers=1)
3927+
assert len(info["workers"]) == 1
3928+
info = c.scheduler_info(n_workers=-1)
3929+
assert len(info["workers"]) == 2
39263930

39273931

39283932
def test_write_scheduler_file(c, loop):

distributed/tests/test_scheduler.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4301,6 +4301,9 @@ async def test_Scheduler__to_dict(c, s, a):
43014301
"extensions",
43024302
"services",
43034303
"started",
4304+
"n_workers",
4305+
"total_threads",
4306+
"total_memory",
43044307
"workers",
43054308
"status",
43064309
"thread_id",

distributed/widgets/templates/scheduler_info.html.j2

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,23 @@
1010
<strong>Comm:</strong> {{ address }}
1111
</td>
1212
<td style="text-align: left;">
13-
<strong>Workers:</strong> {{ workers | length }}
13+
<strong>Workers:</strong> {{ n_workers }} {% if n_workers > workers | length %} (shown below: {{ workers | length }}) {% endif %}
1414
</td>
1515
</tr>
1616
<tr>
1717
<td style="text-align: left;">
1818
<strong>Dashboard:</strong> <a href="{{ scheduler | format_dashboard_address }}" target="_blank">{{ scheduler | format_dashboard_address }}</a>
1919
</td>
2020
<td style="text-align: left;">
21-
<strong>Total threads:</strong> {{ workers.values() | map(attribute='nthreads') | sum }}
21+
<strong>Total threads:</strong> {{ total_threads }}
2222
</td>
2323
</tr>
2424
<tr>
2525
<td style="text-align: left;">
2626
<strong>Started:</strong> {{ started | datetime_from_timestamp | format_time_ago }}
2727
</td>
2828
<td style="text-align: left;">
29-
<strong>Total memory:</strong> {{ workers.values() | map(attribute='memory_limit') | sum | format_bytes }}
29+
<strong>Total memory:</strong> {{ total_memory | format_bytes }}
3030
</td>
3131
</tr>
3232
</table>

0 commit comments

Comments
 (0)