|
64 | 64 | parse_timedelta, |
65 | 65 | ) |
66 | 66 |
|
| 67 | +from distributed.core import Status |
67 | 68 | from distributed.dashboard.components import add_periodic_callback |
68 | 69 | from distributed.dashboard.components.shared import ( |
69 | 70 | DashboardComponent, |
@@ -239,20 +240,63 @@ def update(self): |
239 | 240 | self.source.data.update({"left": x[:-1], "right": x[1:], "top": counts}) |
240 | 241 |
|
241 | 242 |
|
242 | | -def _memory_color(current: int, limit: int) -> str: |
243 | | - """Dynamic color used by WorkersMemory and ClusterMemory""" |
244 | | - if limit and current > limit: |
245 | | - return "red" |
246 | | - if limit and current > limit / 2: |
247 | | - return "orange" |
248 | | - return "blue" |
| 243 | +class MemoryColor: |
| 244 | + """Change the color of the memory bars from blue to orange when process memory goes |
| 245 | + above the ``target`` threshold and to red when the worker pauses. |
| 246 | + Workers in ``closing_gracefully`` state will also be orange. |
249 | 247 |
|
| 248 | + If ``target`` is disabled, change to orange on ``spill`` instead. |
| 249 | + If spilling is completely disabled, never turn orange. |
250 | 250 |
|
251 | | -class ClusterMemory(DashboardComponent): |
| 251 | + If pausing is disabled, change to red when passing the ``terminate`` threshold |
| 252 | + instead. If both pause and terminate are disabled, turn red when passing |
| 253 | + ``memory_limit``. |
| 254 | +
|
| 255 | + Note |
| 256 | + ---- |
| 257 | + A worker will start spilling when managed memory alone passes the target threshold. |
| 258 | + However, here we're switching to orange when the process memory goes beyond target, |
| 259 | + which is usually earlier. |
| 260 | + This is deliberate for the sake of simplicity and also because, when the process |
| 261 | + memory passes the spill threshold, it will keep spilling until it falls below the |
| 262 | + target threshold - so it's not completely wrong. Again, we don't want to track |
| 263 | + the hysteresis cycle of the spill system here for the sake of simplicity. |
| 264 | +
|
| 265 | + In short, orange should be treated as "the worker *may* be spilling". |
| 266 | + """ |
| 267 | + |
| 268 | + orange: float |
| 269 | + red: float |
| 270 | + |
| 271 | + def __init__(self): |
| 272 | + target = dask.config.get("distributed.worker.memory.target") |
| 273 | + spill = dask.config.get("distributed.worker.memory.spill") |
| 274 | + terminate = dask.config.get("distributed.worker.memory.terminate") |
| 275 | + # These values can be False. It's also common to configure them to impossibly |
| 276 | + # high values to achieve the same effect. |
| 277 | + self.orange = min(target or math.inf, spill or math.inf) |
| 278 | + self.red = min(terminate or math.inf, 1.0) |
| 279 | + |
| 280 | + def _memory_color(self, current: int, limit: int, status: Status) -> str: |
| 281 | + if status != Status.running: |
| 282 | + return "red" |
| 283 | + if not limit: |
| 284 | + return "blue" |
| 285 | + if current >= limit * self.red: |
| 286 | + return "red" |
| 287 | + if current >= limit * self.orange: |
| 288 | + return "orange" |
| 289 | + return "blue" |
| 290 | + |
| 291 | + |
| 292 | +class ClusterMemory(DashboardComponent, MemoryColor): |
252 | 293 | """Total memory usage on the cluster""" |
253 | 294 |
|
254 | 295 | @log_errors |
255 | 296 | def __init__(self, scheduler, width=600, **kwargs): |
| 297 | + DashboardComponent.__init__(self) |
| 298 | + MemoryColor.__init__(self) |
| 299 | + |
256 | 300 | self.scheduler = scheduler |
257 | 301 | self.source = ColumnDataSource( |
258 | 302 | { |
@@ -327,12 +371,30 @@ def __init__(self, scheduler, width=600, **kwargs): |
327 | 371 | ) |
328 | 372 | self.root.add_tools(hover) |
329 | 373 |
|
| 374 | + def _cluster_memory_color(self) -> str: |
| 375 | + colors = { |
| 376 | + self._memory_color( |
| 377 | + current=ws.memory.process, |
| 378 | + limit=getattr(ws, "memory_limit", 0), |
| 379 | + status=ws.status, |
| 380 | + ) |
| 381 | + for ws in self.scheduler.workers.values() |
| 382 | + } |
| 383 | + |
| 384 | + assert colors.issubset({"red", "orange", "blue"}) |
| 385 | + if "red" in colors: |
| 386 | + return "red" |
| 387 | + elif "orange" in colors: |
| 388 | + return "orange" |
| 389 | + else: |
| 390 | + return "blue" |
| 391 | + |
330 | 392 | @without_property_validation |
331 | 393 | @log_errors |
332 | 394 | def update(self): |
333 | 395 | limit = sum(ws.memory_limit for ws in self.scheduler.workers.values()) |
334 | 396 | meminfo = self.scheduler.memory |
335 | | - color = _memory_color(meminfo.process, limit) |
| 397 | + color = self._cluster_memory_color() |
336 | 398 |
|
337 | 399 | width = [ |
338 | 400 | meminfo.managed_in_memory, |
@@ -363,11 +425,14 @@ def update(self): |
363 | 425 | update(self.source, result) |
364 | 426 |
|
365 | 427 |
|
366 | | -class WorkersMemory(DashboardComponent): |
| 428 | +class WorkersMemory(DashboardComponent, MemoryColor): |
367 | 429 | """Memory usage for single workers""" |
368 | 430 |
|
369 | 431 | @log_errors |
370 | 432 | def __init__(self, scheduler, width=600, **kwargs): |
| 433 | + DashboardComponent.__init__(self) |
| 434 | + MemoryColor.__init__(self) |
| 435 | + |
371 | 436 | self.scheduler = scheduler |
372 | 437 | self.source = ColumnDataSource( |
373 | 438 | { |
@@ -477,7 +542,7 @@ def quadlist(i: Iterable[T]) -> list[T]: |
477 | 542 | meminfo = ws.memory |
478 | 543 | limit = getattr(ws, "memory_limit", 0) |
479 | 544 | max_limit = max(max_limit, limit, meminfo.process + meminfo.managed_spilled) |
480 | | - color_i = _memory_color(meminfo.process, limit) |
| 545 | + color_i = self._memory_color(meminfo.process, limit, ws.status) |
481 | 546 |
|
482 | 547 | width += [ |
483 | 548 | meminfo.managed_in_memory, |
|
0 commit comments