Skip to content

Commit eb03e62

Browse files
committed
add scheduler plugin
1 parent 9e3f745 commit eb03e62

File tree

1 file changed

+32
-0
lines changed

1 file changed

+32
-0
lines changed

python/cugraph/cugraph/testing/mg_utils.py

+32
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
from pprint import pformat
1717
import time
1818
from dask.distributed import wait, default_client
19+
import logging
20+
from distributed.diagnostics.plugin import WorkerPlugin, SchedulerPlugin
21+
from distributed.scheduler import Scheduler
1922
from dask import persist
2023
from dask.distributed import Client
2124
from dask.base import is_dask_collection
@@ -27,6 +30,32 @@
2730
import numpy as np
2831

2932

33+
class GracefullyRetireWorkers(SchedulerPlugin):
    """Scheduler plugin that retires workers after a task error.

    The plugin watches task transitions; once any task finishes in an
    error state it flips ``state`` to ``-1``.  The next time a worker is
    removed from the cluster while in that state, the plugin logs the
    event and asks the scheduler to retire workers (once only).

    This derives from ``SchedulerPlugin`` because ``remove_worker`` is a
    scheduler-side hook: it receives the scheduler instance and is never
    invoked on a plugin that is installed on the workers.

    Parameters
    ----------
    logger
        Object exposing a ``critical(msg, *args)`` method (for example a
        ``logging.Logger`` or the ``logging`` module itself).
    """

    def __init__(self, logger):
        self.logger = logger
        # Number of times worker retirement has been triggered; used as a
        # guard so retirement is requested only on the first removal.
        self.count = 0
        # Not read anywhere in this class.
        self.key = None
        # 1 while no task error has been observed, -1 afterwards.
        self.state = 1

    async def remove_worker(
        self, scheduler, worker: str, *, stimulus_id, **kwargs
    ):
        """Handle a worker leaving the cluster.

        Parameters
        ----------
        scheduler
            The scheduler the worker was removed from.
        worker : str
            Identifier of the worker that left (logged as-is).
        stimulus_id
            Accepted to match the hook signature; unused here.
        **kwargs
            Accepted for forward compatibility; unused here.
        """
        print(
            "a worker is leaving the cluster and state = ",
            self.state,
            " count = ",
            self.count,
            flush=True,
        )
        if self.state != -1:
            # No task error seen so far: nothing to do.
            return

        self.logger.critical(" Worker %s left the cluster", worker)
        if self.count == 0:
            self.logger.critical(" An error occured: retiring all workers")
            # Bump the counter *before* awaiting so the removals caused by
            # the retirement below do not re-enter this branch.
            self.count += 1
            # NOTE(review): called without arguments -- confirm this retires
            # every worker and not only those the scheduler picks to close.
            await scheduler.retire_workers()

    def setup(self, worker):
        """Remember ``worker`` on the plugin instance.

        NOTE(review): this is a worker-plugin style hook and is presumably
        not invoked for a scheduler plugin; it is retained so that any
        direct caller keeps working.
        """
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        """Record that a task reached an error state.

        Parameters
        ----------
        key
            Key of the task that transitioned (unused).
        start
            State the task transitioned from (unused).
        finish
            State the task transitioned to; ``'error'`` and ``'erred'``
            both mark the plugin as having seen a failure.
        """
        if finish in ('error', 'erred'):
            print("transition = ", finish)
            self.state = -1
57+
58+
3059
def start_dask_client(
3160
protocol=None,
3261
rmm_async=False,
@@ -157,6 +186,9 @@ def start_dask_client(
157186
num_workers = len(dask_worker_devices.split(","))
158187

159188
client.wait_for_workers(num_workers)
189+
190+
s_plugin = GracefullyRetireWorkers(logging)
191+
client.register_plugin(s_plugin)
160192
# Add a reference to tempdir_object to the client to prevent it from
161193
# being deleted when this function returns. This will be deleted in
162194
# stop_dask_client()

0 commit comments

Comments
 (0)