Skip to content

Commit 67d693d

Browse files
Remove calls to self.cluster.call_async() (#456)
* Add call_async method to VMInterface * Switch to VMInterface.call_async
1 parent 0e2bdbf commit 67d693d

File tree

6 files changed

+31
-27
lines changed

6 files changed

+31
-27
lines changed

dask_cloudprovider/azure/azurevm.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def __init__(
7979

8080
async def create_vm(self):
8181
if not self.subnet:
82-
[subnet_info, *_] = await self.cluster.call_async(
82+
[subnet_info, *_] = await self.call_async(
8383
self.cluster.network_client.subnets.list,
8484
self.cluster.resource_group,
8585
self.vnet,
@@ -96,7 +96,7 @@ async def create_vm(self):
9696
],
9797
"networkSecurityGroup": {
9898
"id": (
99-
await self.cluster.call_async(
99+
await self.call_async(
100100
self.cluster.network_client.network_security_groups.get,
101101
self.cluster.resource_group,
102102
self.security_group,
@@ -107,7 +107,7 @@ async def create_vm(self):
107107
"tags": self.cluster.get_tags(),
108108
}
109109
if self.public_ingress:
110-
self.public_ip = await self.cluster.call_async(
110+
self.public_ip = await self.call_async(
111111
self.cluster.network_client.public_ip_addresses.begin_create_or_update(
112112
self.cluster.resource_group,
113113
self.nic_name,
@@ -124,7 +124,7 @@ async def create_vm(self):
124124
"id": self.public_ip.id
125125
}
126126
self.cluster._log("Assigned public IP")
127-
self.nic = await self.cluster.call_async(
127+
self.nic = await self.call_async(
128128
self.cluster.network_client.network_interfaces.begin_create_or_update(
129129
self.cluster.resource_group,
130130
self.nic_name,
@@ -198,17 +198,17 @@ async def create_vm(self):
198198
self.cluster._log(
199199
f"with parameters\n{json.dumps(vm_parameters, sort_keys=True, indent=2)}"
200200
)
201-
await self.cluster.call_async(
201+
await self.call_async(
202202
self.cluster.compute_client.virtual_machines.begin_create_or_update(
203203
self.cluster.resource_group, self.name, vm_parameters
204204
).wait
205205
)
206-
self.vm = await self.cluster.call_async(
206+
self.vm = await self.call_async(
207207
self.cluster.compute_client.virtual_machines.get,
208208
self.cluster.resource_group,
209209
self.name,
210210
)
211-
self.nic = await self.cluster.call_async(
211+
self.nic = await self.call_async(
212212
self.cluster.network_client.network_interfaces.get,
213213
self.cluster.resource_group,
214214
self.nic.name,
@@ -222,32 +222,32 @@ async def create_vm(self):
222222
return private_ip_address, None
223223

224224
async def destroy_vm(self):
225-
await self.cluster.call_async(
225+
await self.call_async(
226226
self.cluster.compute_client.virtual_machines.begin_delete(
227227
self.cluster.resource_group, self.name
228228
).wait
229229
)
230230
self.cluster._log(f"Terminated VM {self.name}")
231-
for disk in await self.cluster.call_async(
231+
for disk in await self.call_async(
232232
self.cluster.compute_client.disks.list_by_resource_group,
233233
self.cluster.resource_group,
234234
):
235235
if self.name in disk.name:
236-
await self.cluster.call_async(
236+
await self.call_async(
237237
self.cluster.compute_client.disks.begin_delete(
238238
self.cluster.resource_group,
239239
disk.name,
240240
).wait
241241
)
242242
self.cluster._log(f"Removed disks for VM {self.name}")
243-
await self.cluster.call_async(
243+
await self.call_async(
244244
self.cluster.network_client.network_interfaces.begin_delete(
245245
self.cluster.resource_group, self.nic.name
246246
).wait
247247
)
248248
self.cluster._log("Deleted network interface")
249249
if self.public_ingress:
250-
await self.cluster.call_async(
250+
await self.call_async(
251251
self.cluster.network_client.public_ip_addresses.begin_delete(
252252
self.cluster.resource_group, self.public_ip.name
253253
).wait

dask_cloudprovider/digitalocean/droplet.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,20 @@ async def create_vm(self):
5656
backups=False,
5757
user_data=self.cluster.render_process_cloud_init(self),
5858
)
59-
await self.cluster.call_async(self.droplet.create)
59+
await self.call_async(self.droplet.create)
6060
for action in self.droplet.get_actions():
6161
while action.status != "completed":
6262
action.load()
6363
await asyncio.sleep(0.1)
6464
while self.droplet.ip_address is None:
65-
await self.cluster.call_async(self.droplet.load)
65+
await self.call_async(self.droplet.load)
6666
await asyncio.sleep(0.1)
6767
self.cluster._log(f"Created droplet {self.name}")
6868

6969
return self.droplet.ip_address, None
7070

7171
async def destroy_vm(self):
72-
await self.cluster.call_async(self.droplet.destroy)
72+
await self.call_async(self.droplet.destroy)
7373
self.cluster._log(f"Terminated droplet {self.name}")
7474

7575

dask_cloudprovider/gcp/instances.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ async def create_vm(self):
206206
self.gcp_config = self.create_gcp_config()
207207

208208
try:
209-
inst = await self.cluster.call_async(
209+
inst = await self.call_async(
210210
self.cluster.compute.instances()
211211
.insert(project=self.projectid, zone=self.zone, body=self.gcp_config)
212212
.execute
@@ -232,7 +232,7 @@ async def create_vm(self):
232232

233233
async def get_internal_ip(self):
234234
return (
235-
await self.cluster.call_async(
235+
await self.call_async(
236236
self.cluster.compute.instances()
237237
.list(
238238
project=self.projectid, zone=self.zone, filter=f"name={self.name}"
@@ -243,7 +243,7 @@ async def get_internal_ip(self):
243243

244244
async def get_external_ip(self):
245245
return (
246-
await self.cluster.call_async(
246+
await self.call_async(
247247
self.cluster.compute.instances()
248248
.list(
249249
project=self.projectid, zone=self.zone, filter=f"name={self.name}"
@@ -253,7 +253,7 @@ async def get_external_ip(self):
253253
)["items"][0]["networkInterfaces"][0]["accessConfigs"][0]["natIP"]
254254

255255
async def update_status(self):
256-
d = await self.cluster.call_async(
256+
d = await self.call_async(
257257
self.cluster.compute.instances()
258258
.list(project=self.projectid, zone=self.zone, filter=f"name={self.name}")
259259
.execute
@@ -276,7 +276,7 @@ def expand_source_image(self, source_image):
276276

277277
async def close(self):
278278
self.cluster._log(f"Closing Instance: {self.name}")
279-
await self.cluster.call_async(
279+
await self.call_async(
280280
self.cluster.compute.instances()
281281
.delete(project=self.projectid, zone=self.zone, instance=self.name)
282282
.execute

dask_cloudprovider/generic/vmcluster.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ async def close(self):
6363
await self.destroy_vm()
6464
await super().close()
6565

66+
async def call_async(self, f, *args, **kwargs):
67+
"""Run a blocking function in a thread as a coroutine."""
68+
return await self.call_async(f, *args, **kwargs)
69+
6670

6771
class SchedulerMixin(object):
6872
"""A mixin for Schedulers."""

dask_cloudprovider/hetzner/vserver.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def __init__(
5151
self.docker_image = docker_image
5252

5353
async def create_vm(self):
54-
await self.cluster.call_async(
54+
await self.call_async(
5555
self.client.servers.create,
5656
server_type=self.server_type,
5757
image=self.image,
@@ -62,14 +62,14 @@ async def create_vm(self):
6262
self.server = self.client.servers.get_by_name(self.name)
6363
for action in self.server.get_actions():
6464
while action.status != Action.STATUS_SUCCESS:
65-
await self.cluster.call_async(action.reload)
65+
await self.call_async(action.reload)
6666
await asyncio.sleep(0.1)
6767
self.cluster._log(f"Created Hetzner vServer {self.name}")
6868

6969
return self.server.public_net.ipv4.ip, None
7070

7171
async def destroy_vm(self):
72-
await self.cluster.call_async(self.client.servers.delete, server=self.server)
72+
await self.call_async(self.client.servers.delete, server=self.server)
7373
self.cluster._log(f"Terminated vServer {self.name}")
7474

7575

dask_cloudprovider/openstack/instances.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ async def create_and_assign_floating_ip(self, conn):
111111
"""Create and assign a floating IP to the instance."""
112112
try:
113113
# Create a floating IP
114-
floating_ip = await self.cluster.call_async(
114+
floating_ip = await self.call_async(
115115
conn.network.create_ip,
116116
floating_network_id=self.config["external_network_id"],
117117
)
118118

119119
# Find the first port of the instance
120-
ports = await self.cluster.call_async(
120+
ports = await self.call_async(
121121
conn.network.ports,
122122
device_id=self.instance.id
123123
)
@@ -126,7 +126,7 @@ async def create_and_assign_floating_ip(self, conn):
126126
raise RuntimeError(f"No network ports found for instance {self.instance.id}")
127127

128128
# Assign the floating IP to the instance's port
129-
await self.cluster.call_async(
129+
await self.call_async(
130130
conn.network.update_ip,
131131
floating_ip,
132132
port_id=ports[0].id
@@ -170,7 +170,7 @@ async def destroy_vm(self):
170170
try:
171171
instance = conn.compute.get_server(self.instance.id)
172172
if instance:
173-
await self.cluster.call_async(conn.compute.delete_server, instance.id)
173+
await self.call_async(conn.compute.delete_server, instance.id)
174174
self.cluster._log(f"Terminated instance {self.name}")
175175
else:
176176
self.cluster._log(f"Instance {self.name} not found or already deleted.")

0 commit comments

Comments
 (0)