25 changes (13 additions & 12 deletions) in dask_cloudprovider/openstack/instances.py
@@ -110,31 +110,32 @@ async def get_external_ip(self, conn):
     async def create_and_assign_floating_ip(self, conn):
         """Create and assign a floating IP to the instance."""
         try:
+            loop = asyncio.get_event_loop()
Member:

We shouldn't be grabbing the loop here like this. Why are we changing from the standard call_async mechanism that Dask uses internally?

Contributor Author:

Well, you are right. Let me give you a recap on why I implemented this in the first place:

I wanted to use OpenStackScheduler/OpenStackWorker on their own (without spinning up a full OpenStackCluster), but I realized that if I use cluster.call_async without a cluster, Python quite naturally complains about “different event loop.” 😄

I completely understand that this workaround (grabbing the loop and running the blocking call directly in an executor) is a bit of a hack; please let me know if you'd prefer a different approach.

Member:

We don't really support using ProcessInterface subclasses outside of a Cluster. What is your use case for doing this?

Contributor Author (@armagankaratosun, Jun 6, 2025):

I have a use case where I have to provision a Dask cluster across two different OpenStack infrastructures (and they will communicate over the floating IPs). The trick here is that I only want to provision one scheduler, on one of these clouds, and the workers should enroll themselves with it.
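
For illustration only (not part of this PR): once the scheduler on the first cloud has its floating IP, a worker on the second cloud only needs that address to enroll itself. A minimal sketch with plain distributed, where the address is a placeholder for whatever create_and_assign_floating_ip returns and 8786 is the default scheduler port:

```python
# Illustration only: the address below stands in for the scheduler's
# floating IP on the first cloud; 8786 is the default scheduler port.
import asyncio

from distributed import Worker


async def main():
    async with Worker("tcp://203.0.113.10:8786") as worker:
        await worker.finished()  # serve until the scheduler shuts us down


asyncio.run(main())
```

The same enrollment works from the command line with `dask worker tcp://<floating-ip>:8786` on recent versions of distributed.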

Member:

I see. Instead of changing this logic for everyone, I've opened #456, which means you could subclass OpenStackInstance in your own code and just override call_async with your own logic. That way you have the flexibility to do what you need, but without changing this functionality for all users.
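
A minimal sketch of what such a subclass could look like, assuming #456 exposes call_async as an overridable hook on the instance (the subclass name is illustrative):

```python
# Sketch only: assumes #456 makes call_async an overridable method on the
# instance rather than something reached only via self.cluster.
import asyncio
from functools import partial

from dask_cloudprovider.openstack.instances import OpenStackInstance


class StandaloneOpenStackInstance(OpenStackInstance):
    """Variant that does not depend on a parent cluster's event loop."""

    async def call_async(self, f, *args, **kwargs):
        # Run the blocking OpenStack SDK call on the running loop's default
        # thread pool instead of delegating to self.cluster.call_async.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(f, *args, **kwargs))
```

get_running_loop() is used here so the executor is tied to whichever loop the coroutine is actually running on, rather than whatever get_event_loop() happens to return.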

             # Create a floating IP
-            floating_ip = await self.cluster.call_async(
-                conn.network.create_ip,
-                floating_network_id=self.config["external_network_id"],
+            floating_ip = await loop.run_in_executor(
+                None,
+                lambda: conn.network.create_ip(
+                    floating_network_id=self.config["external_network_id"]
+                )
             )

             # Find the first port of the instance
-            ports = await self.cluster.call_async(
-                conn.network.ports,
-                device_id=self.instance.id
+            ports = await loop.run_in_executor(
+                None,
+                lambda: list(conn.network.ports(device_id=self.instance.id))
             )
-            ports = list(ports)
             if not ports:
                 raise RuntimeError(f"No network ports found for instance {self.instance.id}")

             # Assign the floating IP to the instance's port
-            await self.cluster.call_async(
-                conn.network.update_ip,
-                floating_ip,
-                port_id=ports[0].id
+            await loop.run_in_executor(
+                None,
+                lambda: conn.network.update_ip(floating_ip, port_id=ports[0].id)
             )

             return floating_ip.floating_ip_address
         except Exception as e:
-            self.cluster._log(f"Failed to create or assign floating IP: {str(e)}")
+            self.cluster._log(f"Failed to create or assign floating IP: {e}")
             return None

     async def destroy_vm(self):