diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 3f619641004..53d51d3df4e 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -95,6 +95,7 @@ def __init__( self.cluster = cluster self.worker_key = worker_key self._workers_to_close_kwargs = kwargs + self._worker_name_mapping = {} if interval is None: interval = dask.config.get("distributed.adaptive.interval") @@ -131,6 +132,26 @@ def requested(self): def observed(self): return self.cluster.observed + @property + def observed_name_mapped(self): + self._assign_hosts_to_names() + return self._worker_name_mapping + + def _assign_hosts_to_names(self) -> None: + unassigned_worker_names = self._unassigned_worker_names() + for worker_address in self.cluster.scheduler_info["workers"].keys(): + if worker_address not in self._worker_name_mapping.values(): + assert unassigned_worker_names + self._worker_name_mapping[ + unassigned_worker_names.pop() + ] = worker_address + for worker_name, worker_address in self._worker_name_mapping.copy().items(): + if worker_address not in self.cluster.scheduler_info["workers"].keys(): + del self._worker_name_mapping[worker_name] + + def _unassigned_worker_names(self) -> set: + return self.requested - self._worker_name_mapping.keys() + async def target(self): """ Determine target number of workers that should exist. diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py index 0543e117b69..e965001c58f 100644 --- a/distributed/deploy/adaptive_core.py +++ b/distributed/deploy/adaptive_core.py @@ -87,6 +87,7 @@ class AdaptiveCore: plan: set[WorkerState] requested: set[WorkerState] observed: set[WorkerState] + observed_name_mapped: dict[str, str] close_counts: defaultdict[WorkerState, int] _adapting: bool log: deque[tuple[float, dict]] @@ -130,6 +131,7 @@ async def _adapt(): self.plan = set() self.requested = set() self.observed = set() + self.observed_name_mapped = {} except Exception: pass @@ -181,7 +183,7 @@ async def recommendations(self, target: int) -> dict: """ plan = self.plan requested = self.requested - observed = self.observed + observed = self.observed_name_mapped if target == len(plan): self.close_counts.clear() @@ -192,14 +194,16 @@ async def recommendations(self, target: int) -> dict: return {"status": "up", "n": target} # target < len(plan) - not_yet_arrived = requested - observed + not_yet_arrived = requested - observed.keys() to_close = set() if not_yet_arrived: to_close.update(toolz.take(len(plan) - target, not_yet_arrived)) if target < len(plan) - len(to_close): L = await self.workers_to_close(target=target) - to_close.update(L) + to_close.update( + [key for key, value in observed.items() for name in L if value == name] + ) firmly_close = set() for w in to_close: