3
3
# Copyright (c) Jupyter Development Team.
4
4
# Distributed under the terms of the Modified BSD License.
5
5
6
+ import asyncio
6
7
import importlib
7
8
from inspect import isawaitable
8
9
from typing import Any , Dict , List , Union
11
12
import dask
12
13
from dask .utils import format_bytes
13
14
from dask .distributed import Adaptive
14
- from tornado .ioloop import IOLoop
15
- from tornado .concurrent import Future
16
15
17
16
# A type for a dask cluster model: a serializable
18
17
# representation of information about the cluster.
@@ -60,15 +59,28 @@ def __init__(self) -> None:
60
59
self ._adaptives : Dict [str , Adaptive ] = dict ()
61
60
self ._cluster_names : Dict [str , str ] = dict ()
62
61
self ._n_clusters = 0
62
+ self ._initialized = None
63
63
64
- self .initialized = Future ()
64
+ async def _async_init (self ):
65
+ """The async part of init
65
66
66
- async def start_clusters ():
67
- for model in dask .config .get ("labextension.initial" ):
68
- await self .start_cluster (configuration = model )
69
- self .initialized .set_result (self )
67
+ Invoked by `await manager`
68
+ """
69
+ for model in dask .config .get ("labextension.initial" ):
70
+ await self .start_cluster (configuration = model )
71
+ return self
70
72
71
- IOLoop .current ().add_callback (start_clusters )
73
+ @property
74
+ def initialized (self ):
75
+ """Don't create initialization task until it's been requested
76
+
77
+ typically via `await manager`
78
+
79
+ Makes it easier to ensure we don't do anything before we are in the event loop.
80
+ """
81
+ if self ._initialized is None :
82
+ self ._initialized = asyncio .create_task (self ._async_init ())
83
+ return self ._initialized
72
84
73
85
async def start_cluster (
74
86
self , cluster_id : str = "" , configuration : dict = {}
@@ -121,7 +133,9 @@ async def close_cluster(self, cluster_id: str) -> Union[ClusterModel, None]:
121
133
"""
122
134
cluster = self ._clusters .get (cluster_id )
123
135
if cluster :
124
- await cluster .close ()
136
+ r = cluster .close ()
137
+ if isawaitable (r ):
138
+ await r
125
139
self ._clusters .pop (cluster_id )
126
140
name = self ._cluster_names .pop (cluster_id )
127
141
adaptive = self ._adaptives .pop (cluster_id , None )
@@ -130,7 +144,7 @@ async def close_cluster(self, cluster_id: str) -> Union[ClusterModel, None]:
130
144
else :
131
145
return None
132
146
133
- def get_cluster (self , cluster_id ) -> Union [ClusterModel , None ]:
147
+ async def get_cluster (self , cluster_id ) -> Union [ClusterModel , None ]:
134
148
"""
135
149
Get a Dask cluster model.
136
150
@@ -151,7 +165,7 @@ def get_cluster(self, cluster_id) -> Union[ClusterModel, None]:
151
165
152
166
return make_cluster_model (cluster_id , name , cluster , adaptive )
153
167
154
- def list_clusters (self ) -> List [ClusterModel ]:
168
+ async def list_clusters (self ) -> List [ClusterModel ]:
155
169
"""
156
170
List the Dask cluster models known to the manager.
157
171
@@ -188,7 +202,7 @@ async def scale_cluster(self, cluster_id: str, n: int) -> Union[ClusterModel, No
188
202
await t
189
203
return make_cluster_model (cluster_id , name , cluster , adaptive = None )
190
204
191
- def adapt_cluster (
205
+ async def adapt_cluster (
192
206
self , cluster_id : str , minimum : int , maximum : int
193
207
) -> Union [ClusterModel , None ]:
194
208
cluster = self ._clusters .get (cluster_id )
@@ -290,8 +304,3 @@ def make_cluster_model(
290
304
model ["adapt" ] = {"minimum" : adaptive .minimum , "maximum" : adaptive .maximum }
291
305
292
306
return model
293
-
294
-
295
- # Create a default cluster manager
296
- # to keep track of clusters.
297
- manager = DaskClusterManager ()
0 commit comments