diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e5bd745d..bd9dd696 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -16,15 +16,6 @@ jobs: uses: actions/checkout@v4 - name: Run ruff and mypy checks run: tox -e ruff,mypy - py38: - runs-on: ubuntu-20.04 - steps: - - name: Install dependencies - run: sudo DEBIAN_FRONTEND=noninteractive apt-get -qy install tox - - name: Git checkout - uses: actions/checkout@v4 - - name: Run tox - run: tox -e py38 py310: runs-on: ubuntu-22.04 steps: diff --git a/.github/workflows/main_check.yaml b/.github/workflows/main_check.yaml index c5679fd4..106490e3 100644 --- a/.github/workflows/main_check.yaml +++ b/.github/workflows/main_check.yaml @@ -4,6 +4,7 @@ on: push: branches: - main + - 'v*' jobs: post-merge-tests: diff --git a/.github/workflows/version_check.yaml b/.github/workflows/version_check.yaml index 43c67c8a..ca0b747e 100644 --- a/.github/workflows/version_check.yaml +++ b/.github/workflows/version_check.yaml @@ -1,6 +1,8 @@ name: Check Semantic Versioning on: - - pull_request + pull_request: + branches: + - main jobs: version-check: diff --git a/VERSION b/VERSION index 431264f2..d04af6af 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1!10.9.1 +1!11.0.0 diff --git a/examples/az.py b/examples/az.py index 7903ee87..5e781211 100644 --- a/examples/az.py +++ b/examples/az.py @@ -5,7 +5,7 @@ import logging import pycloudlib -from pycloudlib.cloud import ImageType +from pycloudlib.types import ImageType cloud_config = """#cloud-config runcmd: @@ -109,6 +109,29 @@ def demo_pro_fips(): print(instance.execute("sudo ua status --wait")) +def demo_pro_fips_updates(): + """Show example of launchig a Ubuntu PRO FIPS image through Azure.""" + with pycloudlib.Azure(tag="azure") as client: + image_id = client.daily_image(release="jammy", image_type=ImageType.PRO_FIPS_UPDATES) + + pub_key, priv_key = client.create_key_pair(key_name="test_pro_fips") + pub_path, priv_path = save_keys( + 
key_name="test_pro_fips", + pub_key=pub_key, + priv_key=priv_key, + ) + client.use_key(pub_path, priv_path) + + print("Launching Focal Pro FIPS Updates instance.") + with client.launch( + image_id=image_id, + instance_type="Standard_DS2_v2", # default is Standard_DS1_v2 + ) as instance: + instance.wait() + print(instance.ip) + print(instance.execute("sudo ua status --wait")) + + if __name__ == "__main__": # Avoid polluting the log with azure info logging.getLogger("adal-python").setLevel(logging.WARNING) @@ -118,3 +141,4 @@ def demo_pro_fips(): demo() demo_pro() demo_pro_fips() + demo_pro_fips_updates() diff --git a/examples/ec2.py b/examples/ec2.py index 659e8f2a..b5b822c6 100755 --- a/examples/ec2.py +++ b/examples/ec2.py @@ -6,7 +6,7 @@ import os import pycloudlib -from pycloudlib.cloud import ImageType +from pycloudlib.types import ImageType def hot_add(ec2, daily): @@ -92,22 +92,13 @@ def launch_basic(ec2, daily): print(instance.availability_zone) -def launch_pro(ec2, daily): - """Show basic functionality on PRO instances.""" - print("Launching Pro instance...") - with ec2.launch(daily) as instance: - instance.wait() - print(instance.execute("sudo ua status --wait")) - print("Deleting Pro instance...") - - -def launch_pro_fips(ec2, daily): - """Show basic functionality on PRO instances.""" - print("Launching Pro FIPS instance...") - with ec2.launch(daily) as instance: +def launch_pro(ec2, name, image): + """Show basic functionality on Pro instances.""" + print("Launching {} instance...".format(name)) + with ec2.launch(image) as instance: instance.wait() - print(instance.execute("sudo ua status --wait")) - print("Deleting Pro FIPS instance...") + print(instance.execute("sudo pro status --wait")) + print("Deleting {} instance...".format(name)) def handle_ssh_key(ec2, key_name): @@ -140,13 +131,17 @@ def demo(): key_name = "test-ec2" handle_ssh_key(ec2, key_name) - daily = ec2.daily_image(release="bionic") - daily_pro = ec2.daily_image(release="bionic", 
image_type=ImageType.PRO) - daily_pro_fips = ec2.daily_image(release="bionic", image_type=ImageType.PRO_FIPS) + daily = ec2.daily_image(release="focal") + daily_pro = ec2.daily_image(release="focal", image_type=ImageType.PRO) + daily_pro_fips = ec2.daily_image(release="focal", image_type=ImageType.PRO_FIPS) + daily_pro_fips_updates = ec2.daily_image( + release="focal", image_type=ImageType.PRO_FIPS_UPDATES + ) launch_basic(ec2, daily) - launch_pro(ec2, daily_pro) - launch_pro_fips(ec2, daily_pro_fips) + launch_pro(ec2, "PRO", daily_pro) + launch_pro(ec2, "PRO FIPS", daily_pro_fips) + launch_pro(ec2, "PRO FIPS UPDATES", daily_pro_fips_updates) custom_vpc(ec2, daily) snapshot(ec2, daily) launch_multiple(ec2, daily) diff --git a/examples/gce.py b/examples/gce.py index 3eca925b..a3c0093c 100755 --- a/examples/gce.py +++ b/examples/gce.py @@ -6,7 +6,7 @@ import os import pycloudlib -from pycloudlib.cloud import ImageType +from pycloudlib.types import ImageType def manage_ssh_key(gce): @@ -39,20 +39,12 @@ def generic(gce): print(inst.execute("lsb_release -a")) -def pro(gce): +def pro(gce, series, image_type): """Show example of running a GCE PRO machine.""" - daily = gce.daily_image("bionic", image_type=ImageType.PRO) + daily = gce.daily_image(series, image_type) with gce.launch(daily) as inst: inst.wait() - print(inst.execute("sudo ua status --wait")) - - -def pro_fips(gce): - """Show example of running a GCE PRO FIPS machine.""" - daily = gce.daily_image("bionic", image_type=ImageType.PRO_FIPS) - with gce.launch(daily) as inst: - inst.wait() - print(inst.execute("sudo ua status --wait")) + print(inst.execute("sudo pro status --wait")) def demo(): @@ -62,8 +54,9 @@ def demo(): manage_ssh_key(gce) generic(gce) - pro(gce) - pro_fips(gce) + pro(gce, "focal", ImageType.PRO) + pro(gce, "focal", ImageType.PRO_FIPS) + pro(gce, "jammy", ImageType.PRO_FIPS_UPDATES) if __name__ == "__main__": diff --git a/examples/lxd.py b/examples/lxd.py index 8b48a314..14e090f7 100755 --- 
a/examples/lxd.py +++ b/examples/lxd.py @@ -6,7 +6,7 @@ import textwrap import pycloudlib -from pycloudlib.cloud import ImageType +from pycloudlib.types import ImageType RELEASE = "noble" diff --git a/examples/openstack_example.py b/examples/openstack_example.py new file mode 100644 index 00000000..fe8657ef --- /dev/null +++ b/examples/openstack_example.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +# This file is part of pycloudlib. See LICENSE file for license information. +"""Basic examples of various lifecycles with a Openstack instance.""" + +import logging +import os +import sys + +import pycloudlib + +REQUIRED_ENV_VARS = ("OS_AUTH_URL", "OS_PASSWORD", "OS_USERNAME") + + +def basic_lifecycle(image_id: str): + """Demonstrate basic set of lifecycle operations with OpenStack.""" + with pycloudlib.Openstack("pycloudlib-test") as os_cloud: + with os_cloud.launch(image_id=image_id) as inst: + inst.wait() + + result = inst.execute("uptime") + print(result) + inst.console_log() + inst.delete(wait=False) + + +def demo(image_id: str): + """Show examples of using the Openstack module.""" + basic_lifecycle(image_id) + + +def assert_openstack_config(): + """Assert any required OpenStack env variables and args needed for demo.""" + if len(sys.argv) != 2: + sys.stderr.write( + f"Usage: {sys.argv[0]} \n" + "Must provide an image id from openstack image list\n\n" + ) + sys.exit(1) + for env_name in REQUIRED_ENV_VARS: + assert os.environ.get( + env_name + ), f"Missing required Openstack environment variable: {env_name}" + + +if __name__ == "__main__": + assert_openstack_config() + logging.basicConfig(level=logging.DEBUG) + image_id = sys.argv[1] + demo(image_id=sys.argv[1]) diff --git a/examples/oracle/oracle-example-cluster-test.py b/examples/oracle/oracle-example-cluster-test.py index 53105b84..605bb73b 100644 --- a/examples/oracle/oracle-example-cluster-test.py +++ b/examples/oracle/oracle-example-cluster-test.py @@ -3,6 +3,7 @@ """Basic examples of various lifecycle with an 
OCI instance.""" import logging +import re import threading import time from datetime import datetime @@ -51,7 +52,7 @@ def cluster() -> Generator[List[OciInstance], None, None]: class TestOracleClusterBasic: """Test basic functionalities of Oracle Cluster.""" - def test_basic_ping_on_private_ips(self, cluster: List[OciInstance]): # pylint: disable=W0621 + def test_basic_ping_on_private_ips(self, cluster: List[OciInstance]): """ Test that cluster instances can ping each other on private IPs. @@ -71,7 +72,7 @@ def test_basic_ping_on_private_ips(self, cluster: List[OciInstance]): # pylint: logger.info("Successfully pinged %s from %s", private_ip, instance.private_ip) -def setup_mofed_iptables_rules(instance: OciInstance): +def setup_mofed_iptables_rules(instance: OciInstance) -> OciInstance: """ Set up IPTABLES rules for RDMA usage. @@ -126,7 +127,73 @@ def ensure_image_is_rdma_ready(instance: OciInstance): r = instance.execute("ibstatus") if not r.stdout or not r.ok: logger.info("Infiniband status: %s", r.stdout + "\n" + r.stderr) - pytest.skip("The image beiing used is not RDMA ready") + pytest.skip("The image being used is not RDMA ready") + + +def ensure_second_vnics_ready(test_cluster: List[OciInstance]): + """ + Check if all cluster instances have a secondary VNIC and attach and configure one if not. + + If the instance already has a secondary VNIC, it will skip the attachment process. + + Otherwise, it will do the following to set up the secondary VNIC: + - Attach a secondary VNIC to the instance + - Configure the secondary VNIC using information from the IMDS + - Set up the iptables rules on the appropriate NIC for RDMA usage + + Args: + test_cluster (List[OciInstance]): The cluster (list of instances) to check and configure. 
+ """ + for instance in test_cluster: + if instance.secondary_vnic_private_ip: + logger.info( + "Instance %s already has a secondary VNIC, not attaching one.", instance.name + ) + continue + logger.info("Creating a secondary VNIC on instance %s", instance.name) + # create a secondary VNIC on the 2nd vnic on the private subnet for RDMA usage + instance.add_network_interface( + nic_index=1, + subnet_name="private subnet-mofed-vcn", # use the private subnet for mofed testing + ) + instance.configure_secondary_vnic() + setup_mofed_iptables_rules(instance) + + +def get_private_nic_pci_address(instance: OciInstance) -> str: + """ + Get the PCI address of the second NIC on the instance (mlx5_1) which is used for RDMA. + + The `mst status -v` command returns an output in the following format, where the + PCI address can be extracted from the column at index 2: + + ``` + $ sudo mst status -v | grep mlx5_1 + ConnectX6DX(rev:0) NA 4b:00.1 mlx5_1 net-ens300f1np1 + ``` + + Args: + instance (OciInstance): The instance to get the PCI address from. + + Returns: + str: The PCI address of the second NIC. + + Raises: + ValueError: If a valid PCI address cannot be parsed from the mst output. + """ + r = instance.execute("sudo mst status -v | grep mlx5_1") + if not r.ok or not r.stdout: + raise ValueError("Failed to retrieve PCI address: mst status command failed") + + try: + pciaddr = r.stdout.split()[2] + except IndexError: + raise ValueError("Failed to retrieve the second column of the mst status output") + + if not re.match(r".*[0-9a-fA-F]{2}:[0-9a-fA-F]{2}\.[0-9a-fA-F]$", pciaddr): + raise ValueError(f"Invalid PCI address format: {pciaddr}") + + return pciaddr class TestOracleClusterRdma: @@ -135,7 +202,7 @@ class TestOracleClusterRdma: @pytest.fixture(scope="class") def mofed_cluster( self, - cluster: List[OciInstance], # pylint: disable=W0621 + cluster: List[OciInstance], ) -> Generator[List[OciInstance], None, None]: """ Configure cluster for RDMA testing. 
@@ -144,20 +211,7 @@ def mofed_cluster( List[OciInstance]: RDMA-ready cluster instances. """ ensure_image_is_rdma_ready(cluster[0]) - for instance in cluster: - if instance.secondary_vnic_private_ip: - logger.info( - "Instance %s already has a secondary VNIC, not attaching one.", instance.name - ) - continue - logger.info("Creating a secondary VNIC on instance %s", instance.name) - # create a secondary VNIC on the 2nd vnic on the private subnet for RDMA usage - instance.add_network_interface( - nic_index=1, - subnet_name="private subnet-mofed-vcn", # use the private subnet for mofed testing - ) - instance.configure_secondary_vnic() - setup_mofed_iptables_rules(instance) + ensure_second_vnics_ready(cluster) yield cluster @@ -295,3 +349,146 @@ def start_server(): ) logger.info("ucx_perftest output: %s", r.stdout) assert r.ok, "Failed to run ucx_perftest" + + +class TestOracleClusterOfedTools: + """ + Test Nvidia tools included in OFED userspace package. + + Validate that CLI tools included in OFED are installed and executable. + Only verify query commands to avoid affecting the physical NIC firmware. + """ + + def test_mst_status(self, cluster: List[OciInstance]): + """ + Run mst status to confirm it is installed. + + Args: + cluster (List[OciInstance]): cluster instances + """ + dut_instance = cluster[0] + + r = dut_instance.execute("sudo mst status") + logger.info("mst status output: %s", r.stdout) + assert r.ok, "Failed to run mst status" + assert "MST modules" in r.stdout + assert "PCI Devices" in r.stdout + + def test_mlxconfig(self, cluster: List[OciInstance]): + """ + Run mlxconfig to confirm it is installed. 
+ + Args: + cluster (List[OciInstance]): cluster instances + """ + dut_instance = cluster[0] + pci_addr = get_private_nic_pci_address(dut_instance) + + r = dut_instance.execute(f"sudo mlxconfig -d {pci_addr} q") + logger.info("mlxconfig query output: %s", r.stdout) + assert r.ok, "Failed to run mlxconfig query" + assert "ConnectX" in r.stdout + + def test_mlxfwmanager(self, cluster: List[OciInstance]): + """ + Run mlxfwmanager to confirm it is installed. + + Args: + cluster (List[OciInstance]): cluster instances + """ + dut_instance = cluster[0] + pci_addr = get_private_nic_pci_address(dut_instance) + + r = dut_instance.execute(f"sudo mlxfwmanager -d {pci_addr} --query") + logger.info("mlxfwmanager query output: %s", r.stdout) + assert r.ok, "Failed to run mlxfwmanager query" + assert "ConnectX" in r.stdout + assert "Device Type:" in r.stdout + assert "Part Number:" in r.stdout + + def test_flint(self, cluster: List[OciInstance]): + """ + Run flint to confirm it is installed. + + Args: + cluster (List[OciInstance]): cluster instances + """ + dut_instance = cluster[0] + pci_addr = get_private_nic_pci_address(dut_instance) + + r = dut_instance.execute(f"sudo flint -d {pci_addr} q") + logger.info("flint query output: %s", r.stdout) + assert r.ok, "Failed to run flint query" + assert "Image type:" in r.stdout + assert "FW Version:" in r.stdout + assert "Product Version:" in r.stdout + + def test_mlxfwreset(self, cluster: List[OciInstance]): + """ + Run mlxfwreset to confirm it is installed. 
+ + Args: + cluster (List[OciInstance]): cluster instances + """ + dut_instance = cluster[0] + pci_addr = get_private_nic_pci_address(dut_instance) + + r = dut_instance.execute(f"sudo mlxfwreset -d {pci_addr} q") + logger.info("mlxfwreset query output: %s", r.stdout) + assert r.ok, "Failed to run mlxfwreset query" + assert "3: Driver restart and PCI reset" in r.stdout + assert "0: Tool is the owner" in r.stdout + + +class TestOracleClusterPerformance: + """Test traffic performance between Oracle Cluster instances.""" + + @pytest.fixture(scope="class") + def private_vnic_cluster( + self, + cluster: List[OciInstance], + ) -> Generator[List[OciInstance], None, None]: + """ + Cluster with private VNIC pair. + + Yields: + List[OciInstance]: Instances of cluster with private VNIC. + """ + ensure_second_vnics_ready(cluster) + + yield cluster + + def test_iperf3(self, private_vnic_cluster: List[OciInstance]): + """ + Test iperf3 between two instances. + + This tests the following: + - iperf3 successfully runs between two instances + - iperf3 throughput is greater than the minimum threshold (45.0) + + Args: + private_vnic_cluster (List[OciInstance]): Cluster using private VNICs + """ + min_throughput = 28.0 + server_instance = private_vnic_cluster[0] + client_instance = private_vnic_cluster[1] + + def start_server(): + """Start the iperf3 server on the "server_instance".""" + server_instance.execute("iperf3 -s -1") + + server_thread = threading.Thread(target=start_server) + server_thread.start() + + # Wait for iperf3 server to start before starting the client + time.sleep(5) + r = client_instance.execute( + f"iperf3 -c {server_instance.secondary_vnic_private_ip} -P 40 -Z | grep SUM" + ) + iperf3_output = r.stdout + logger.info("iperf3 output: %s", iperf3_output) + assert r.ok, "Failed to run iperf3" + + throughput = iperf3_output.splitlines()[-1].split()[5] + print("iperf3 measured throughput: %s" % throughput) + assert float(throughput) > min_throughput diff --git 
a/pycloudlib/azure/cloud.py b/pycloudlib/azure/cloud.py index fb75655d..3129ecb8 100644 --- a/pycloudlib/azure/cloud.py +++ b/pycloudlib/azure/cloud.py @@ -15,7 +15,7 @@ from pycloudlib.azure import security_types, util from pycloudlib.azure.instance import AzureInstance, VMInstanceStatus -from pycloudlib.cloud import BaseCloud, ImageType +from pycloudlib.cloud import BaseCloud from pycloudlib.config import ConfigFile from pycloudlib.errors import ( InstanceNotFoundError, @@ -23,6 +23,7 @@ PycloudlibError, PycloudlibTimeoutError, ) +from pycloudlib.types import ImageType from pycloudlib.util import get_timestamped_tag, update_nested UBUNTU_DAILY_IMAGES = { @@ -37,6 +38,7 @@ "noble": "Canonical:ubuntu-24_04-lts-daily:server:latest", "oracular": "Canonical:ubuntu-24_10-daily:server:latest", "plucky": "Canonical:ubuntu-25_04-daily:server:latest", + "questing": "Canonical:ubuntu-25_10-daily:server:latest", } UBUNTU_MINIMAL_DAILY_IMAGES = { @@ -46,6 +48,7 @@ "noble": "Canonical:ubuntu-24_04-lts-daily:minimal:latest", "oracular": "Canonical:ubuntu-24_10-daily:minimal:latest", "plucky": "Canonical:ubuntu-25_04-daily:minimal:latest", + "questing": "Canonical:ubuntu-25_04-daily:minimal:latest", } UBUNTU_DAILY_PRO_IMAGES = { @@ -62,6 +65,10 @@ "focal": "Canonical:0001-com-ubuntu-pro-focal-fips:pro-fips-20_04:latest", } +UBUNTU_DAILY_PRO_FIPS_UPDATES_IMAGES = { + "jammy": "Canonical:0001-com-ubuntu-pro-jammy-fips:pro-fips-22_04-gen1:latest", +} + UBUNTU_RELEASE_IMAGES = { "xenial": "Canonical:UbuntuServer:16.04-LTS:latest", "bionic": "Canonical:UbuntuServer:18.04-LTS:latest", @@ -636,7 +643,7 @@ def delete_image(self, image_id, **kwargs): if delete_poller.status() == "Succeeded": if image_id in self.registered_images: del self.registered_images[image_id] - self._log.debug("Image %s was deleted", image_id) + self._record_image_deletion(image_id) else: self._log.debug( "Error deleting %s. 
Status: %d", @@ -683,6 +690,8 @@ def _get_images_dict(self, image_type: ImageType): return UBUNTU_DAILY_PRO_IMAGES if image_type == ImageType.PRO_FIPS: return UBUNTU_DAILY_PRO_FIPS_IMAGES + if image_type == ImageType.PRO_FIPS_UPDATES: + return UBUNTU_DAILY_PRO_FIPS_UPDATES_IMAGES if image_type == ImageType.MINIMAL: return UBUNTU_MINIMAL_DAILY_IMAGES @@ -1101,12 +1110,13 @@ class compatibility. raise InstanceNotFoundError(resource_id=instance_id) - def snapshot(self, instance, clean=True, delete_provisioned_user=True, **kwargs): + def snapshot(self, instance, *, clean=True, keep=False, delete_provisioned_user=True, **kwargs): """Snapshot an instance and generate an image from it. Args: instance: Instance to snapshot clean: Run instance clean method before taking snapshot + keep: keep the snapshot after the cloud instance is cleaned up delete_provisioned_user: Deletes the last provisioned user kwargs: Other named arguments specific to this implementation @@ -1138,7 +1148,11 @@ def snapshot(self, instance, clean=True, delete_provisioned_user=True, **kwargs) image_id = image.id image_name = image.name - self.created_images.append(image_id) + self._store_snapshot_info( + snapshot_id=image_id, + snapshot_name=image_name, + keep_snapshot=keep, + ) self.registered_images[image_id] = { "name": image_name, diff --git a/pycloudlib/azure/instance.py b/pycloudlib/azure/instance.py index 513b06ea..fe2cc41a 100644 --- a/pycloudlib/azure/instance.py +++ b/pycloudlib/azure/instance.py @@ -301,6 +301,7 @@ def remove_network_interface(self, ip_address: str): vm_nics_ids = [nic.id for nic in self._instance["vm"].network_profile.network_interfaces] all_nics: List[NetworkInterface] = list(self._network_client.network_interfaces.list_all()) vm_nics = [nic for nic in all_nics if nic.id in vm_nics_ids] + primary_nic: Optional[NetworkInterface] = None primary_nic = [nic for nic in vm_nics if nic.primary][0] nic_params = [] nic_to_remove: Optional[NetworkInterface] = None diff --git 
a/pycloudlib/cloud.py b/pycloudlib/cloud.py index f75cd2f1..0d2ea8c5 100644 --- a/pycloudlib/cloud.py +++ b/pycloudlib/cloud.py @@ -1,7 +1,6 @@ # This file is part of pycloudlib. See LICENSE file for license information. """Base class for all other clouds to provide consistent set of functions.""" -import enum import getpass import io import logging @@ -20,6 +19,7 @@ ) from pycloudlib.instance import BaseInstance from pycloudlib.key import KeyPair +from pycloudlib.types import ImageInfo from pycloudlib.util import ( get_timestamped_tag, log_exception_list, @@ -28,16 +28,6 @@ _RequiredValues = Optional[Sequence[Optional[Any]]] -@enum.unique -class ImageType(enum.Enum): - """Allowed image types when launching cloud images.""" - - GENERIC = "generic" - MINIMAL = "minimal" - PRO = "Pro" - PRO_FIPS = "Pro FIPS" - - class BaseCloud(ABC): """Base Cloud Class.""" @@ -58,7 +48,8 @@ def __init__( config_file: path to pycloudlib configuration file """ self.created_instances: List[BaseInstance] = [] - self.created_images: List[str] = [] + self.created_images: List[ImageInfo] = [] + self.preserved_images: List[ImageInfo] = [] # each dict will hold an id and name self._log = logging.getLogger("{}.{}".format(__name__, self.__class__.__name__)) self.config = self._check_and_get_config(config_file, required_values) @@ -179,7 +170,6 @@ def launch( image_id: string, image ID to use for the instance instance_type: string, type of instance to create user_data: used by cloud-init to run custom scripts/configuration - username: username to use when connecting via SSH **kwargs: dictionary of other arguments to pass to launch Returns: @@ -189,12 +179,13 @@ def launch( raise NotImplementedError @abstractmethod - def snapshot(self, instance, clean=True, **kwargs): + def snapshot(self, instance, *, clean=True, keep=False, **kwargs): """Snapshot an instance and generate an image from it. 
Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot + keep: keep the snapshot after the cloud instance is cleaned up Returns: An image id @@ -216,11 +207,18 @@ def clean(self) -> List[Exception]: instance.delete() except Exception as e: exceptions.append(e) - for image_id in self.created_images: + for image_info in self.created_images: try: - self.delete_image(image_id) + self.delete_image(image_id=image_info.image_id) except Exception as e: exceptions.append(e) + for image_info in self.preserved_images: + # noop - just log that we're not cleaning up these images + self._log.info( + "Preserved image %s [id:%s] is NOT being cleaned up.", + image_info.image_name, + image_info.image_id, + ) return exceptions def list_keys(self): @@ -371,3 +369,72 @@ def _get_ssh_keys( private_key_path=private_key_path, name=name, ) + + def _store_snapshot_info( + self, + snapshot_id: str, + snapshot_name: str, + keep_snapshot: bool, + ) -> ImageInfo: + """ + Save the snapshot information for later cleanup depending on the keep_snapshot argument. + + This method saves the snapshot information to either `created_images` or `preserved_images` + based on the value of `keep_snapshot`. These lists are used by the `BaseCloud`'s `clean()` + method to manage snapshots during cleanup. The snapshot information is also logged in a + consistent format so that individual clouds do NOT need to worry about logging. + + Args: + snapshot_id (str): ID of the snapshot (used later to delete the snapshot). + snapshot_name (str): Name of the snapshot (for user reference). + keep_snapshot (bool): Whether to keep the snapshot after the cloud instance is cleaned up. + + Returns: + ImageInfo: An ImageInfo object containing the snapshot information. 
+ """ + image_info = ImageInfo( + image_id=snapshot_id, + image_name=snapshot_name, + ) + if not keep_snapshot: + self.created_images.append(image_info) + self._log.info( + "Created temporary snapshot %s", + image_info, + ) + else: + self.preserved_images.append(image_info) + self._log.info( + "Created permanent snapshot %s", + image_info, + ) + return image_info + + def _record_image_deletion(self, image_id: str): + """ + Record the deletion of an image. + + This method should be called after an image is successfully deleted. + It will remove the image from the list of created_images or preserved_images + so that the cloud does not attempt to re-clean it up later. It will also log + the deletion of the image. + + :param image_id: ID of the image that was deleted + """ + if match := [i for i in self.created_images if i.image_id == image_id]: + deleted_image = match[0] + self.created_images.remove(deleted_image) + self._log.debug( + "Snapshot %s has been deleted. Will no longer need to be cleaned up later.", + deleted_image, + ) + elif match := [i for i in self.preserved_images if i.image_id == image_id]: + deleted_image = match[0] + self.preserved_images.remove(deleted_image) + self._log.debug( + "Snapshot %s has been deleted. 
This snapshot was taken with keep=True, " + "but since it has been manually deleted, it will not be preserved.", + deleted_image, + ) + else: + self._log.debug("Deleted image %s", image_id) diff --git a/pycloudlib/ec2/cloud.py b/pycloudlib/ec2/cloud.py index d479bcd3..55b3e4b2 100644 --- a/pycloudlib/ec2/cloud.py +++ b/pycloudlib/ec2/cloud.py @@ -6,12 +6,13 @@ import botocore -from pycloudlib.cloud import BaseCloud, ImageType +from pycloudlib.cloud import BaseCloud from pycloudlib.config import ConfigFile from pycloudlib.ec2.instance import EC2Instance from pycloudlib.ec2.util import _get_session, _tag_resource from pycloudlib.ec2.vpc import VPC from pycloudlib.errors import CloudSetupError, ImageNotFoundError, PycloudlibError +from pycloudlib.types import ImageType from pycloudlib.util import LTS_RELEASES, UBUNTU_RELEASE_VERSION_MAP # Images before mantic don't have gp3 disk type @@ -161,7 +162,10 @@ def _get_name_for_image_type(self, release: str, image_type: ImageType, daily: b return f"ubuntu-pro-server/images/{disk_type}/ubuntu-{release}-{release_ver}-*" if image_type == ImageType.PRO_FIPS: - return f"ubuntu-pro-fips*/images/{disk_type}/ubuntu-{release}-{release_ver}-*" + return f"ubuntu-pro-fips-server/images/{disk_type}/ubuntu-{release}-{release_ver}-*" + + if image_type == ImageType.PRO_FIPS_UPDATES: + return f"ubuntu-pro-fips-updates-server/images/{disk_type}/ubuntu-{release}-{release_ver}-*" raise ValueError("Invalid image_type") @@ -294,6 +298,8 @@ def delete_image(self, image_id, **kwargs): self._log.debug("removing custom snapshot %s", snapshot_id) self.client.delete_snapshot(SnapshotId=snapshot_id) + self._record_image_deletion(image_id) + def delete_key(self, name): """Delete an uploaded key. 
@@ -416,12 +422,13 @@ def list_keys(self): keypair_names.append(keypair["KeyName"]) return keypair_names - def snapshot(self, instance, clean=True): + def snapshot(self, instance, *, clean=True, keep=False, **kwargs): """Snapshot an instance and generate an image from it. Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot + keep: keep the snapshot after the cloud instance is cleaned up Returns: An image id @@ -440,7 +447,12 @@ def snapshot(self, instance, clean=True): ) image_ami_edited = response["ImageId"] image = self.resource.Image(image_ami_edited) - self.created_images.append(image.id) + + self._store_snapshot_info( + snapshot_id=image.id, + snapshot_name=image.name, + keep_snapshot=keep, + ) self._wait_for_snapshot(image) _tag_resource(image, self.tag) diff --git a/pycloudlib/gce/cloud.py b/pycloudlib/gce/cloud.py index c5f4feff..9dd5d450 100644 --- a/pycloudlib/gce/cloud.py +++ b/pycloudlib/gce/cloud.py @@ -16,7 +16,7 @@ from google.api_core.extended_operation import ExtendedOperation from google.cloud import compute_v1 -from pycloudlib.cloud import BaseCloud, ImageType +from pycloudlib.cloud import BaseCloud from pycloudlib.config import ConfigFile from pycloudlib.errors import ( CloudSetupError, @@ -25,6 +25,7 @@ ) from pycloudlib.gce.instance import GceInstance from pycloudlib.gce.util import get_credentials, raise_on_error +from pycloudlib.types import ImageType from pycloudlib.util import UBUNTU_RELEASE_VERSION_MAP, subp logging.getLogger("google.cloud").setLevel(logging.WARNING) @@ -163,6 +164,11 @@ def _get_name_filter(self, release: str, image_type: ImageType): UBUNTU_RELEASE_VERSION_MAP[release].replace(".", ""), release ) + if image_type == ImageType.PRO_FIPS_UPDATES: + return "ubuntu-pro-fips-updates-{}-{}-*".format( + UBUNTU_RELEASE_VERSION_MAP[release].replace(".", ""), release + ) + raise ValueError("Invalid image_type: {}".format(image_type.value)) def _query_image_list(self, release: str, project: 
str, name_filter: str, arch: str): @@ -314,6 +320,7 @@ def delete_image(self, image_id, **kwargs): raise_on_error(operation) except GoogleAPICallError as e: raise_on_error(e) + self._record_image_deletion(image_id) def get_instance( self, @@ -427,12 +434,13 @@ def launch( self.created_instances.append(instance) return instance - def snapshot(self, instance: GceInstance, clean=True, **kwargs): + def snapshot(self, instance: GceInstance, *, clean=True, keep=False, **kwargs): """Snapshot an instance and generate an image from it. Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot + keep: keep the snapshot after the cloud instance is cleaned up Returns: An image id @@ -470,7 +478,11 @@ def snapshot(self, instance: GceInstance, clean=True, **kwargs): self._wait_for_operation(operation) image_id = "projects/{}/global/images/{}".format(self.project, snapshot_name) - self.created_images.append(image_id) + self._store_snapshot_info( + snapshot_name=snapshot_name, + snapshot_id=image_id, + keep_snapshot=keep, + ) return image_id def _wait_for_operation(self, operation, operation_type="global", sleep_seconds=300): diff --git a/pycloudlib/ibm/cloud.py b/pycloudlib/ibm/cloud.py index a75c135b..6cbd65ff 100644 --- a/pycloudlib/ibm/cloud.py +++ b/pycloudlib/ibm/cloud.py @@ -13,7 +13,7 @@ from pycloudlib.cloud import BaseCloud from pycloudlib.config import ConfigFile -from pycloudlib.errors import InvalidTagNameError +from pycloudlib.errors import InvalidTagNameError, ResourceNotFoundError, ResourceType from pycloudlib.ibm._util import get_first as _get_first from pycloudlib.ibm._util import iter_resources as _iter_resources from pycloudlib.ibm._util import wait_until as _wait_until @@ -75,7 +75,11 @@ def __init__( authenticator = IAMAuthenticator(api_key) self.instance_counter = itertools.count(1) - self._client = VpcV1(authenticator=authenticator) + # Note this pins API version to ibm-vpc 0.28.0 also in setup.cfg. 
+ # If updating API version: + # 1. Check latest https://github.com/IBM/vpc-python-sdk/releases/ + # 2. Update setup.cfg ibm-vpc < conditional + self._client = VpcV1(authenticator=authenticator, version="2025-04-22") self._client.set_service_url(f"https://{self.region}.iaas.cloud.ibm.com/v1") self._resource_manager_service = ResourceManagerV2(authenticator=authenticator) @@ -130,7 +134,9 @@ def delete_image(self, image_id: str, **kwargs): self._client.delete_image(image_id).get_result() except ApiException as e: if "does not exist" not in str(e): - raise + raise ResourceNotFoundError(ResourceType.IMAGE, image_id) from e + else: + self._record_image_deletion(image_id) def released_image(self, release, *, arch: str = "amd64", **kwargs): """ID of the latest released image for a particular release. @@ -312,12 +318,13 @@ def launch( return instance - def snapshot(self, instance: IBMInstance, clean: bool = True, **kwargs) -> str: + def snapshot(self, instance: IBMInstance, *, clean=True, keep=False, **kwargs) -> str: """Snapshot an instance and generate an image from it. Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot + keep: keep the snapshot after the cloud instance is cleaned up Returns: An image id @@ -347,7 +354,11 @@ def snapshot(self, instance: IBMInstance, clean: bool = True, **kwargs) -> str: f"Snapshot not available after {timeout_seconds} seconds. Check IBM VPC console." 
), ) - self.created_images.append(snapshot_id) + self._store_snapshot_info( + snapshot_name=str(image_prototype["name"]), + snapshot_id=snapshot_id, + keep_snapshot=keep, + ) return snapshot_id def list_keys(self) -> List[str]: diff --git a/pycloudlib/ibm/instance.py b/pycloudlib/ibm/instance.py index 07f91ef8..6190ca55 100644 --- a/pycloudlib/ibm/instance.py +++ b/pycloudlib/ibm/instance.py @@ -692,9 +692,11 @@ def find_existing( **kwargs, ) -> "IBMInstance": """Find an instance by ID.""" - instance = _IBMInstanceType.VSI.get_instance(client, instance_id) + response = _IBMInstanceType.VSI.get_instance(client, instance_id) + instance = response.result if not instance: - instance = _IBMInstanceType.BARE_METAL_SERVER.get_instance(client, instance_id) + response = _IBMInstanceType.BARE_METAL_SERVER.get_instance(client, instance_id) + instance = response.result if not instance: raise IBMException(f"Instance not found: {instance_id}") @@ -917,12 +919,12 @@ def _check_instance_failed_status(self) -> None: f"{self._instance['status_reasons'][0]['message']}" ) - def _wait_for_instance_start(self, **kwargs): + def _wait_for_instance_start(self, start_timeout=900, **kwargs): """Wait for the cloud instance to be up.""" self._log.info("Waiting for instance to finish provisioning.") self._wait_for_status( _Status.RUNNING, - sleep_seconds=900, + sleep_seconds=start_timeout, side_effect_fn=self._check_instance_failed_status, ) diff --git a/pycloudlib/ibm_classic/cloud.py b/pycloudlib/ibm_classic/cloud.py index b9d89b5d..82bb6f84 100644 --- a/pycloudlib/ibm_classic/cloud.py +++ b/pycloudlib/ibm_classic/cloud.py @@ -81,6 +81,7 @@ def delete_image(self, image_id: str, **kwargs): ) from e except SoftLayer.SoftLayerAPIError as e: raise IBMClassicException(f"Error deleting image {image_id}") from e + self._record_image_deletion(image_id) def released_image(self, release, *, disk_size: str = "25G", **kwargs): """ID (globalIdentifier) of the latest released image for a particular 
release. @@ -267,7 +268,9 @@ def launch( def snapshot( self, instance, + *, clean=True, + keep=False, note: Optional[str] = None, **kwargs, ): @@ -276,6 +279,7 @@ def snapshot( Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot + keep: keep the snapshot after the cloud instance is cleaned up note: optional note to add to the snapshot Returns: @@ -290,10 +294,10 @@ def snapshot( name=f"{self.tag}-snapshot", notes=note, ) - self._log.info( - "Successfully created snapshot '%s' with ID: %s", - snapshot_result["name"], - snapshot_result["id"], + self._store_snapshot_info( + snapshot_name=snapshot_result["name"], + snapshot_id=snapshot_result["id"], + keep_snapshot=keep, ) return snapshot_result["id"] diff --git a/pycloudlib/lxd/_images.py b/pycloudlib/lxd/_images.py index 9ccd84fa..ba84f59d 100644 --- a/pycloudlib/lxd/_images.py +++ b/pycloudlib/lxd/_images.py @@ -6,7 +6,7 @@ import logging from typing import Any, List, Optional, Sequence, Tuple -from pycloudlib.cloud import ImageType +from pycloudlib.types import ImageType from pycloudlib.util import subp _REMOTE_DAILY = "ubuntu-daily" diff --git a/pycloudlib/lxd/cloud.py b/pycloudlib/lxd/cloud.py index 9efba4b7..b4de7c59 100644 --- a/pycloudlib/lxd/cloud.py +++ b/pycloudlib/lxd/cloud.py @@ -8,11 +8,12 @@ import yaml -from pycloudlib.cloud import BaseCloud, ImageType +from pycloudlib.cloud import BaseCloud from pycloudlib.constants import LOCAL_UBUNTU_ARCH from pycloudlib.lxd import _images from pycloudlib.lxd.defaults import base_vm_profiles from pycloudlib.lxd.instance import LXDInstance, LXDVirtualMachineInstance +from pycloudlib.types import ImageType from pycloudlib.util import subp @@ -393,11 +394,10 @@ def delete_image(self, image_id, **kwargs): image_id: string, LXD image fingerprint """ self._log.debug("Deleting image: '%s'", image_id) - subp(["lxc", "image", "delete", image_id]) - self._log.debug("Deleted %s", image_id) + self._record_image_deletion(image_id) - def 
snapshot(self, instance, clean=True, name=None): + def snapshot(self, instance: LXDInstance, *, clean=True, keep=False, name=None): # type: ignore """Take a snapshot of the passed in instance for use as image. :param instance: The instance to create an image from @@ -412,7 +412,11 @@ def snapshot(self, instance, clean=True, name=None): instance.clean() snapshot_name = instance.snapshot(name) - self.created_snapshots.append(snapshot_name) + self._store_snapshot_info( + snapshot_name=snapshot_name, + snapshot_id=snapshot_name, + keep_snapshot=keep, + ) return snapshot_name # pylint: disable=broad-except @@ -424,13 +428,6 @@ def clean(self) -> List[Exception]: """ exceptions = super().clean() - for snapshot in self.created_snapshots: - try: - subp(["lxc", "image", "delete", snapshot]) - except RuntimeError as e: - if "Image not found" not in str(e): - exceptions.append(e) - for profile in self.created_profiles: try: subp(["lxc", "profile", "delete", profile]) diff --git a/pycloudlib/oci/cloud.py b/pycloudlib/oci/cloud.py index 8b24fa48..8e5fbaa7 100644 --- a/pycloudlib/oci/cloud.py +++ b/pycloudlib/oci/cloud.py @@ -6,7 +6,7 @@ import json import os import re -from typing import List, Optional, cast +from typing import Dict, List, Optional, cast import oci @@ -20,11 +20,13 @@ ) from pycloudlib.oci.instance import OciInstance from pycloudlib.oci.utils import ( + generate_create_vnic_details, get_subnet_id, get_subnet_id_by_name, parse_oci_config_from_env_vars, wait_till_ready, ) +from pycloudlib.types import NetworkingConfig from pycloudlib.util import UBUNTU_RELEASE_VERSION_MAP, subp @@ -133,6 +135,7 @@ def delete_image(self, image_id, **kwargs): image_id: string, id of the image to delete """ self.compute_client.delete_image(image_id, **kwargs) + self._record_image_deletion(image_id) def released_image(self, release, operating_system="Canonical Ubuntu"): """Get the released image. 
@@ -251,6 +254,7 @@ def get_instance(self, instance_id, *, username: Optional[str] = None, **kwargs) availability_domain=self.availability_domain, oci_config=self.oci_config, username=username, + vcn_name=self.vcn_name, ) def launch( @@ -262,7 +266,10 @@ def launch( retry_strategy=None, username: Optional[str] = None, cluster_id: Optional[str] = None, + subnet_id: Optional[str] = None, subnet_name: Optional[str] = None, + metadata: Dict = {}, + primary_network_config: Optional[NetworkingConfig] = None, **kwargs, ) -> OciInstance: """Launch an instance. @@ -273,12 +280,20 @@ def launch( https://docs.cloud.oracle.com/en-us/iaas/Content/Compute/References/computeshapes.htm user_data: used by Cloud-Init to run custom scripts or provide custom Cloud-Init configuration + subnet_id: string, OCID of subnet to use for instance. + Takes precedence over subnet_name if both are provided. subnet_name: string, name of subnet to use for instance. + Only used if subnet_id is not provided. + metadata: Dict, key-value pairs provided to the launch + details for the instance. retry_strategy: a retry strategy from oci.retry module to apply for this operation username: username to use when connecting via SSH vcn_name: Name of the VCN to use. If not provided, the first VCN found will be used + subnet_name: string, name of subnet to use for instance. + primary_network_config: NetworkingConfig object to use for configuring the primary + network interface **kwargs: dictionary of other arguments to pass as LaunchInstanceDetails @@ -289,20 +304,24 @@ def launch( if not image_id: raise ValueError(f"{self._type} launch requires image_id param. 
Found: {image_id}") - if subnet_name: - subnet_id = get_subnet_id_by_name(self.network_client, self.compartment_id, subnet_name) - else: - subnet_id = get_subnet_id( - self.network_client, - self.compartment_id, - self.availability_domain, - vcn_name=self.vcn_name, - ) - metadata = { + if not subnet_id: + if subnet_name: + subnet_id = get_subnet_id_by_name(self.network_client, self.compartment_id, subnet_name) + else: + subnet_id = get_subnet_id( + self.network_client, + self.compartment_id, + self.availability_domain, + vcn_name=self.vcn_name, + networking_config=primary_network_config, + ) + default_metadata = { "ssh_authorized_keys": self.key_pair.public_key_content, } if user_data: - metadata["user_data"] = base64.b64encode(user_data.encode("utf8")).decode("ascii") + default_metadata["user_data"] = base64.b64encode(user_data.encode("utf8")).decode( + "ascii" + ) instance_details = oci.core.models.LaunchInstanceDetails( # noqa: E501 display_name=self.tag, @@ -312,8 +331,12 @@ def launch( shape=instance_type, subnet_id=subnet_id, image_id=image_id, - metadata=metadata, + metadata={**default_metadata, **metadata}, compute_cluster_id=cluster_id, + create_vnic_details=generate_create_vnic_details( + subnet_id=subnet_id, + networking_config=primary_network_config, + ), **kwargs, ) @@ -328,15 +351,43 @@ def launch( self.created_instances.append(instance) return instance - def snapshot(self, instance, clean=True, name=None): + def find_compatible_subnet(self, networking_config: NetworkingConfig) -> str: + """ + Automatically select a subnet that is compatible with the given networking_config. + + In this case, compatible means that the subnet can support the necessary networking type + (ipv4 only, ipv6 only, or dual stack) and the private or public requirement. + This method will select the first subnet that matches the criteria. 
+ + Args: + networking_config: NetworkingConfig object to use for finding a subnet + + Returns: + id of the subnet selected + + Raises: + `PycloudlibError` if unable to determine `subnet_id` for the given `networking_config` + """ + subnet_id = get_subnet_id( + network_client=self.network_client, + compartment_id=self.compartment_id, + availability_domain=self.availability_domain, + vcn_name=self.vcn_name, + networking_config=networking_config, + ) + return subnet_id + + def snapshot(self, instance, *, clean=True, keep=False, name=None): """Snapshot an instance and generate an image from it. Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot - name: (Optional) Name of created image + keep: Keep the image after the cloud instance is cleaned up + name: Name of created image + Returns: - An image object + The image id of the snapshot """ if clean: instance.clean() @@ -355,7 +406,11 @@ def snapshot(self, instance, clean=True, name=None): desired_state="AVAILABLE", ) - self.created_images.append(image_data.id) + self._store_snapshot_info( + snapshot_name=image_data.display_name, + snapshot_id=image_data.id, + keep_snapshot=keep, + ) return image_data.id diff --git a/pycloudlib/oci/instance.py b/pycloudlib/oci/instance.py index c401da0b..baaff64d 100644 --- a/pycloudlib/oci/instance.py +++ b/pycloudlib/oci/instance.py @@ -10,7 +10,13 @@ from pycloudlib.errors import PycloudlibError from pycloudlib.instance import BaseInstance -from pycloudlib.oci.utils import get_subnet_id, get_subnet_id_by_name, wait_till_ready +from pycloudlib.oci.utils import ( + generate_create_vnic_details, + get_subnet_id, + get_subnet_id_by_name, + wait_till_ready, +) +from pycloudlib.types import NetworkingConfig class OciInstance(BaseInstance): @@ -27,6 +33,7 @@ def __init__( oci_config=None, *, username: Optional[str] = None, + vcn_name: Optional[str] = None, ): """Set up the instance. 
@@ -46,6 +53,7 @@ def __init__( self.availability_domain = availability_domain self._fault_domain = None self._ip = None + self._vcn_name: Optional[str] = vcn_name if oci_config is None: oci_config = oci.config.from_file("~/.oci/config") # noqa: E501 @@ -145,7 +153,8 @@ def secondary_vnic_private_ip(self) -> Optional[str]: for vnic_attachment in vnic_attachments ] secondary_vnic_attachment = [vnic for vnic in vnics if not vnic.is_primary][0] - return secondary_vnic_attachment.private_ip + self._log.debug("secondary vnic attachment data:\n%s", secondary_vnic_attachment) + return secondary_vnic_attachment.private_ip or secondary_vnic_attachment.ipv6_addresses[0] @property def instance_data(self): @@ -258,7 +267,7 @@ def get_secondary_vnic_ip(self) -> str: def add_network_interface( self, nic_index: int = 0, - use_private_subnet: bool = False, + networking_config: Optional[NetworkingConfig] = None, subnet_name: Optional[str] = None, **kwargs: Any, ) -> str: @@ -270,13 +279,19 @@ def add_network_interface( Args: nic_index: The index of the NIC to add - subnet_name: Name of the subnet to add the NIC to. If not provided, - will use `use_private_subnet` to select first available subnet. - use_private_subnet: If True, will select the first available private - subnet. If False, will select the first available public subnet. - This is only used if `subnet_name` is not provided. + networking_config: Networking configuration to use when selecting subnet. This specifies + the networking type (ipv4, ipv6, or dualstack) and whether to use a public or + private subnet. If not provided, will default to selecting the first public subnet + found. + subnet_name: Name of the subnet to add the NIC to. If provided, this subnet will + blindly be selected and networking_config will be ignored. + + Returns: + str: The private IP address of the added network interface. 
""" if subnet_name: + if networking_config: + self._log.debug("Ignoring networking_config when subnet_name is provided.") subnet_id = get_subnet_id_by_name( self.network_client, self.compartment_id, @@ -287,10 +302,11 @@ def add_network_interface( self.network_client, self.compartment_id, self.availability_domain, - private=use_private_subnet, + networking_config=networking_config, + vcn_name=self._vcn_name, ) - create_vnic_details = oci.core.models.CreateVnicDetails( # noqa: E501 - subnet_id=subnet_id, + create_vnic_details = generate_create_vnic_details( + subnet_id=subnet_id, networking_config=networking_config ) attach_vnic_details = oci.core.models.AttachVnicDetails( # noqa: E501 create_vnic_details=create_vnic_details, @@ -304,13 +320,29 @@ def add_network_interface( desired_state=vnic_attachment_data.LIFECYCLE_STATE_ATTACHED, ) vnic_data = self.network_client.get_vnic(vnic_attachment_data.vnic_id).data + self._log.debug( + "Newly attached vnic data:\n%s", + vnic_data, + ) + try: + new_ip = vnic_data.private_ip or vnic_data.ipv6_addresses[0] + except IndexError: + err_msg = ( + "Unexpected error occurred when trying to retrieve local IP address of the " + "newly attached NIC. No private IP or IPv6 address found." + ) + self._log.error( + err_msg + "Full vnic data for debugging purposes:\n%s", + vnic_data, + ) + raise PycloudlibError(err_msg) self._log.info( - "Added network interface with private IP %s to instance %s on nic #%s", - vnic_data.private_ip, + "Added network interface with IP %s to instance %s on nic #%s", + new_ip, self.instance_id, nic_index, ) - return vnic_data.private_ip + return new_ip def remove_network_interface(self, ip_address: str): """Remove network interface based on IP address. @@ -355,14 +387,20 @@ def configure_secondary_vnic(self) -> str: or if the IP address was not successfully assigned to the interface. PycloudlibError: If failed to fetch secondary VNIC data from the Oracle Cloud metadata service. 
""" - if not self.secondary_vnic_private_ip: + secondary_ip = self.secondary_vnic_private_ip + if not secondary_ip: raise ValueError("Cannot configure secondary VNIC without a secondary VNIC attached") + if ":" in secondary_ip: + imds_url = "http://[fd00:c1::a9fe:a9fe]/opc/v1/vnics" + else: + imds_url = "http://169.254.169.254/opc/v1/vnics" + secondary_vnic_imds_data: Optional[Dict[str, str]] = None # it can take a bit for the secondary VNIC to show up in the IMDS # so we need to retry fetching the data for roughly a minute for _ in range(60): # Fetch JSON data from the Oracle Cloud metadata service - imds_req = self.execute("curl -s http://169.254.169.254/opc/v1/vnics").stdout + imds_req = self.execute(f"curl -s {imds_url}").stdout vnics_data = json.loads(imds_req) if len(vnics_data) > 1: self._log.debug("Successfully fetched secondary VNIC data from IMDS") diff --git a/pycloudlib/oci/utils.py b/pycloudlib/oci/utils.py index 55340d89..d0446bd6 100644 --- a/pycloudlib/oci/utils.py +++ b/pycloudlib/oci/utils.py @@ -4,18 +4,24 @@ import logging import os import time -from typing import TYPE_CHECKING, Dict, Optional +from typing import Any, Dict, Optional +import oci import toml from oci.retry import DEFAULT_RETRY_STRATEGY # pylint: disable=E0611,E0401 from pycloudlib.errors import PycloudlibError, PycloudlibTimeoutError +from pycloudlib.types import NetworkingConfig, NetworkingType -if TYPE_CHECKING: - import oci +log = logging.getLogger(__name__) +OCI_SDK_NULL = "" +ORACLE_IMDS_NULL = "\u003cnull\u003e" -log = logging.getLogger(__name__) + +def _oci_sdk_string_is_truthy(value: Optional[str]) -> bool: + """Check if value returned by OCI SDK is truthy.""" + return value not in (OCI_SDK_NULL, ORACLE_IMDS_NULL, None, "") def wait_till_ready( @@ -71,7 +77,7 @@ def get_subnet_id_by_name( Returns: id of the subnet selected Raises: - `Exception` if unable to determine `subnet_id` for + `PycloudlibError` if unable to determine `subnet_id` for `availability_domain` """ 
subnets = network_client.list_subnets( @@ -84,14 +90,131 @@ def get_subnet_id_by_name( return subnets[0].id +def _get_subnet_features( + subnet: oci.core.models.Subnet, +) -> Dict[str, Any]: + """ + Get the core features of a subnet that can be used to determine compatibility. + + These features can be used to easily determine if the subnet is compatible with certain + restrictions. + + Args: + subnet: The subnet model to get the features of. + + Returns: + A dictionary containing the following keys: + - availability_domain: The availability domain of the subnet. + - private: Whether the subnet is private. + - networking_type: The networking type of the subnet. + + """ + availability_domain = subnet.availability_domain + private = subnet.prohibit_internet_ingress + has_ipv4_cidr_block = _oci_sdk_string_is_truthy(subnet.cidr_block) + has_ipv6_cidr_block = _oci_sdk_string_is_truthy(subnet.ipv6_cidr_block) + networking_type = None + if has_ipv4_cidr_block and not has_ipv6_cidr_block: + networking_type = NetworkingType.IPV4 + elif not has_ipv4_cidr_block and has_ipv6_cidr_block: + networking_type = NetworkingType.IPV6 + elif has_ipv4_cidr_block and has_ipv6_cidr_block: + networking_type = NetworkingType.DUAL_STACK + else: + log.warning( + "Unable to determine networking type for subnet %s [id: %s]", + subnet.display_name, + subnet.id, + ) + return { + "availability_domain": availability_domain, + "private": private, + "networking_type": networking_type, + } + + +def _subnet_is_compatible( + subnet: oci.core.models.Subnet, + availability_domain: str, + networking_config: NetworkingConfig, +) -> bool: + """ + Check if the subnet is compatible with the given restrictions. 
+ + For each restriction, the following must be true: + availability_domain: + - if the subnet is tied to an availability domain, it must match the given availability domain + - if the subnet is not tied to an availability domain, then it is automatically compatible + + From the networking_config, we have the following restrictions: + + private: + - the subnet must match the given privacy setting + + networking_type: + - if None or AUTO, then the subnet is compatible + - if IPV4, then the subnet must have a cidr_block and not have an ipv6_cidr_block + - if IPV6, then the subnet must not have a cidr_block and have an ipv6_cidr_block + - if DUAL_STACK, then the subnet must have both a cidr_block and an ipv6_cidr_block + + Args: + subnet: The subnet information to check. + availability_domain: The availability domain to check against. + networking_config: The networking configuration to validate against. + + Returns: + True if the subnet is compatible, False otherwise. + """ + networking_type = networking_config.networking_type + private = networking_config.private + + # to do this, lets get the subnet features + features = _get_subnet_features(subnet) + log.debug("Subnet features: %s", features) + + if networking_type == NetworkingType.IPV4: + networking_type_compatible = ( + features["networking_type"] == NetworkingType.IPV4 + or features["networking_type"] == NetworkingType.DUAL_STACK + ) + elif networking_type == NetworkingType.IPV6: + networking_type_compatible = features["networking_type"] == NetworkingType.IPV6 + elif networking_type == NetworkingType.DUAL_STACK: + networking_type_compatible = features["networking_type"] == NetworkingType.DUAL_STACK + else: # networking type is AUTO or None + networking_type_compatible = True + + private_compatible = private == features["private"] + availability_domain_compatible = ( + features["availability_domain"] is None + or features["availability_domain"] == availability_domain + ) + compatible = ( + 
networking_type_compatible and private_compatible and availability_domain_compatible
+    )
+    if not compatible:
+        log.debug(
+            "Subnet %s is NOT compatible. Restrictions met?:\n"
+            "availability_domain: %s\n"
+            "private: %s\n"
+            "networking_type: %s",
+            subnet.display_name,
+            availability_domain_compatible,
+            private_compatible,
+            networking_type_compatible,
+        )
+
+    return compatible
+
+
 def get_subnet_id(
     network_client: "oci.core.VirtualNetworkClient",
     compartment_id: str,
     availability_domain: str,
     vcn_name: Optional[str] = None,
-    private: bool = False,
     *,
     retry_strategy=DEFAULT_RETRY_STRATEGY,
+    networking_config: Optional[NetworkingConfig] = None,
 ) -> str:
     """Get a subnet id linked to `availability_domain`.
 
@@ -105,12 +228,23 @@ def get_subnet_id(
         vcn_name: Exact name of the VCN to use. If not provided, the newest
             VCN in the given compartment will be used.
         retry_strategy: A retry strategy to apply to the API calls
+        networking_config: The networking configuration to use. This provides the `private` and
+            `networking_type` restrictions to use when selecting the subnet.
 
     Returns:
         id of the subnet selected
 
     Raises:
-        `Exception` if unable to determine `subnet_id` for
-        `availability_domain`
+        `PycloudlibError` if unable to determine `subnet_id` for `availability_domain`,
+        or if no relevant VCNs are found in the compartment.
     """
+    if not networking_config:
+        networking_config = NetworkingConfig()
+        log.warning(
+            "No networking config provided. 
Using default networking config of " + "networking_type: %s, private: %s", + networking_config.networking_type, + networking_config.private, + ) + if vcn_name is not None: # if vcn_name specified, use that vcn vcns = network_client.list_vcns( compartment_id, @@ -129,38 +263,19 @@ def get_subnet_id( chosen_vcn_name = vcns[0].display_name subnets = network_client.list_subnets( - compartment_id, vcn_id=vcn_id, retry_strategy=retry_strategy + compartment_id=compartment_id, + vcn_id=vcn_id, + retry_strategy=retry_strategy, ).data subnet_id = None for subnet in subnets: - if subnet.prohibit_internet_ingress and not private: # skip subnet if it's private - log.debug( - "Ignoring private subnet: %s [id: %s]", - subnet.display_name, - subnet.id, - ) - continue - if not subnet.prohibit_internet_ingress and private: # skip subnet if it's public - log.debug( - "Ignoring public subnet: %s [id: %s]", - subnet.display_name, - subnet.id, - ) - continue - if subnet.availability_domain and subnet.availability_domain != availability_domain: - log.debug( - "Ignoring subnet in different availability domain: %s [id: %s]", - subnet.display_name, - subnet.id, - ) - continue - if not private and not subnet.prohibit_internet_ingress: - log.info("Using public subnet: %s [id: %s]", subnet.display_name, subnet.id) - subnet_id = subnet.id - break - if private and subnet.prohibit_internet_ingress: - log.info("Using private subnet: %s [id: %s]", subnet.display_name, subnet.id) + if _subnet_is_compatible( + subnet=subnet, + availability_domain=availability_domain, + networking_config=networking_config, + ): subnet_id = subnet.id + log.info("Found compatible subnet %s [id: %s]", subnet.display_name, subnet.id) break if not subnet_id: raise PycloudlibError(f"Unable to find suitable subnet in VCN {chosen_vcn_name}") @@ -237,3 +352,38 @@ def parse_oci_config_from_env_vars() -> Optional[Dict[str, str]]: log.info("Replacing existing key_file path in OCI config") oci_config["key_file"] = key_file_path 
return oci_config + + +def generate_create_vnic_details( + subnet_id: str, + networking_config: Optional[NetworkingConfig] = None, +) -> oci.core.models.CreateVnicDetails: + """ + Create a VNIC details object based on the primary network configuration. + + Args: + subnet_id: The subnet id to use for the VNIC. + networking_config: The network configuration to use for the VNIC. + + Returns: + vnic_details: The VNIC details object. + """ + # default to IPv4 with public IP + # this will be used if networking_config is not provided or if set to AUTO + vnic_details = oci.core.models.CreateVnicDetails( + subnet_id=subnet_id, + assign_ipv6_ip=False, # add IPv6 address + assign_public_ip=True, # assign public IPv4 address + ) + if networking_config: + if networking_config.networking_type == NetworkingType.IPV6: + vnic_details.assign_public_ip = False + vnic_details.assign_ipv6_ip = True + elif networking_config.networking_type == NetworkingType.DUAL_STACK: + vnic_details.assign_public_ip = not networking_config.private + vnic_details.assign_ipv6_ip = True + elif networking_config.networking_type == NetworkingType.IPV4: + vnic_details.assign_public_ip = not networking_config.private + vnic_details.assign_ipv6_ip = False + log.debug("Generated VNIC details: %s", vnic_details) + return vnic_details diff --git a/pycloudlib/openstack/cloud.py b/pycloudlib/openstack/cloud.py index ca009a1e..f9861048 100644 --- a/pycloudlib/openstack/cloud.py +++ b/pycloudlib/openstack/cloud.py @@ -56,6 +56,7 @@ def delete_image(self, image_id, **kwargs): image_id: string, id of the image to delete """ self.conn.delete_image(image_id, wait=True) + self._record_image_deletion(image_id) def released_image(self, release, **kwargs): """Not supported for openstack.""" @@ -171,12 +172,13 @@ def launch( self.created_instances.append(instance) return instance - def snapshot(self, instance, clean=True, **kwargs): + def snapshot(self, instance, *, clean=True, keep=False, **kwargs): """Snapshot an instance 
and generate an image from it. Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot + keep: keep the snapshot after the cloud instance is cleaned up Returns: An image id @@ -188,7 +190,11 @@ def snapshot(self, instance, clean=True, **kwargs): image = self.conn.create_image_snapshot( "{}-snapshot".format(self.tag), instance.server.id, wait=True ) - self.created_images.append(image.id) + self._store_snapshot_info( + snapshot_name=image.name, + snapshot_id=image.id, + keep_snapshot=keep, + ) return image.id def use_key(self, public_key_path, private_key_path=None, name=None): diff --git a/pycloudlib/openstack/instance.py b/pycloudlib/openstack/instance.py index daeee811..46f567bb 100644 --- a/pycloudlib/openstack/instance.py +++ b/pycloudlib/openstack/instance.py @@ -66,13 +66,20 @@ def _get_existing_floating_ip(self): def _create_and_attach_floating_ip(self): floating_ip = self.conn.create_floating_ip(wait=True) - tries = 30 - for _ in range(tries): + for _ in range(30): try: - self.conn.compute.add_floating_ip_to_server( - self.server, floating_ip.floating_ip_address - ) + ports = [p for p in self.conn.network.ports(device_id=self.server.id)] + if not ports: + self._log.debug(f"Server {self.name} ports not yet available; sleeping") + time.sleep(1) + continue + # Assign IP to first port on the server + self.conn.network.update_ip(floating_ip, port_id=ports[0].id) break + except ResourceNotFound as e: + if "Floating IP" in str(e): + time.sleep(1) + continue except BadRequestException as e: if "Instance network is not ready yet" in str(e): time.sleep(1) diff --git a/pycloudlib/qemu/cloud.py b/pycloudlib/qemu/cloud.py index 76939ff2..8743a168 100644 --- a/pycloudlib/qemu/cloud.py +++ b/pycloudlib/qemu/cloud.py @@ -107,6 +107,7 @@ def delete_image(self, image_id, **kwargs): image_file = Path(image_id) if image_file.exists(): image_file.unlink() + self._record_image_deletion(image_id) else: self._log.debug("Cannot delete image %s as 
it does not exist", image_file) @@ -542,12 +543,13 @@ def launch( return instance - def snapshot(self, instance: QemuInstance, clean=True, **kwargs) -> str: + def snapshot(self, instance: QemuInstance, *, clean=True, keep=False, **kwargs) -> str: """Snapshot an instance and generate an image from it. Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot + keep: keep the snapshot after the cloud instance is cleaned up Returns: An image id @@ -596,7 +598,11 @@ def snapshot(self, instance: QemuInstance, clean=True, **kwargs) -> str: snapshot_path, instance.instance_path, ) - self.created_images.append(str(snapshot_path)) + self._store_snapshot_info( + snapshot_name=snapshot_path.stem, + snapshot_id=str(snapshot_path), + keep_snapshot=keep, + ) return str(snapshot_path) diff --git a/pycloudlib/types.py b/pycloudlib/types.py new file mode 100644 index 00000000..ec003e56 --- /dev/null +++ b/pycloudlib/types.py @@ -0,0 +1,103 @@ +# This file is part of pycloudlib. See LICENSE file for license information. +"""This module contains types and enums used by pycloudlib.""" + +import enum +from dataclasses import dataclass + + +@enum.unique +class ImageType(enum.Enum): + """Allowed image types when launching cloud images.""" + + GENERIC = "generic" + MINIMAL = "minimal" + PRO = "Pro" + PRO_FIPS = "Pro FIPS" + PRO_FIPS_UPDATES = "Pro FIPS Updates" + + +@enum.unique +class NetworkingType(enum.Enum): + """Allowed networking configurations for instances.""" + + IPV4 = "ipv4" + IPV6 = "ipv6" + DUAL_STACK = "dual-stack" + AUTO = "auto" + + def __str__(self): + """Return the string representation of NetworkingType enum.""" + return self.value + + +@dataclass +class NetworkingConfig: + """ + Dataclass for specifying or representing networking configuration. + + By default, networking_type is set to AUTO and private is set to False to allow for a publicly + accessible instance. 
+ + Descriptions of possible configurations: + - If private is set to True, the instance will be accessible only within the cloud network. + - If networking_type is set to AUTO, the cloud provider will automatically choose the + networking configuration (default/current behavior). + - If networking_type is set to IPV4, the instance will only be assigned IPv4 addresses + (if private is False, the instance will have a public IPv4 address). + - If networking_type is set to IPV6, the instance will only be assigned IPv6 addresses + (if private is False, the instance will have a public IPv6 address). + - If networking_type is set to DUAL_STACK, the instance will be assigned both IPv4 and IPv6 + addresses (if private is False, the instance will have both public IPv4 and IPv6 addresses). + """ + + networking_type: NetworkingType = NetworkingType.AUTO + private: bool = False + + def __post_init__(self): + """Post initialization checks for NetworkingConfig.""" + if not isinstance(self.networking_type, NetworkingType): + raise ValueError("Invalid networking type provided") + if not isinstance(self.private, bool): + raise ValueError("Invalid private value provided (must be a boolean)") + + def to_dict(self) -> dict: + """Convert the NetworkingConfig to a dictionary representation.""" + return { + "networking_type": self.networking_type.value, + "private": self.private, + } + + +@dataclass +class ImageInfo: + """Dataclass that represents an image on any given cloud.""" + + image_id: str + image_name: str + + def __str__(self): + """Return a human readable string representation of the image.""" + return f"{self.image_name} [id: {self.image_id}]" + + def __repr__(self): + """Return a string representation of the image.""" + return f"ImageInfo(id={self.image_id}, name={self.image_name})" + + def __eq__(self, other): + """ + Check if two ImageInfo objects represent the same image. + + Only the id is used for comparison since this should be the unique identifier for an image. 
+ """ + # Allow for comparing an ImageInfo object with just an ID string + if isinstance(other, str): + return self.image_id == other + # Do not allow for comparing with other types + if not isinstance(other, ImageInfo): + return False + # Check if the image IDs are the same when comparing two ImageInfo objects + return self.image_id == other.image_id + + def __dict__(self): + """Return a dictionary representation of the image.""" + return {"image_id": self.image_id, "image_name": self.image_name} diff --git a/pycloudlib/util.py b/pycloudlib/util.py index 7eef7261..de723bbd 100644 --- a/pycloudlib/util.py +++ b/pycloudlib/util.py @@ -20,6 +20,7 @@ from pycloudlib.result import Result UBUNTU_RELEASE_VERSION_MAP = { + "questing": "25.10", "plucky": "25.04", "oracular": "24.10", "noble": "24.04", diff --git a/pycloudlib/vmware/cloud.py b/pycloudlib/vmware/cloud.py index 4eb71a14..f5e3ad3c 100644 --- a/pycloudlib/vmware/cloud.py +++ b/pycloudlib/vmware/cloud.py @@ -100,6 +100,8 @@ def delete_image(self, image_id, **kwargs): except subprocess.CalledProcessError as e: if "not found" not in str(e): raise + else: + self._record_image_deletion(image_id) def daily_image(self, release: str, **kwargs): """Return released_image for VMWare. @@ -220,12 +222,13 @@ def launch( instance.start() return instance - def snapshot(self, instance, clean=True, **kwargs): + def snapshot(self, instance, *, clean=True, keep=False, **kwargs): """Snapshot an instance and generate an image from it. 
Args: instance: Instance to snapshot clean: run instance clean method before taking snapshot + keep: keep the snapshot after the cloud instance is cleaned up Returns: An image id @@ -246,6 +249,10 @@ def snapshot(self, instance, clean=True, **kwargs): check=True, ) - self.created_images.append(image_name) + self._store_snapshot_info( + snapshot_name=image_name, + snapshot_id=image_name, + keep_snapshot=keep, + ) return image_name diff --git a/setup.cfg b/setup.cfg index 58bc6ebc..e6239477 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,7 +34,7 @@ install_requires = googleapis-common-protos >= 1.63.1 ibm-cloud-sdk-core >= 3.14.0 ibm-platform-services - ibm-vpc >= 0.10 + ibm-vpc >= 0.10, < 0.29.0 knack >= 0.7.1 oci >= 2.17.0 openstacksdk >= 1.1.0, < 1.5.0 diff --git a/tests/integration_tests/ec2/__init__.py b/tests/integration_tests/ec2/__init__.py new file mode 100644 index 00000000..ea323f90 --- /dev/null +++ b/tests/integration_tests/ec2/__init__.py @@ -0,0 +1 @@ +"""EC2 integration tests.""" diff --git a/tests/integration_tests/ec2/test_images.py b/tests/integration_tests/ec2/test_images.py new file mode 100644 index 00000000..16720aac --- /dev/null +++ b/tests/integration_tests/ec2/test_images.py @@ -0,0 +1,51 @@ +"""EC2 integration tests testing image related functionality.""" + +import logging + +import pytest + +from pycloudlib.cloud import ImageType +from pycloudlib.ec2.cloud import EC2 + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def ec2_cloud(): + """ + Fixture to create an EC2 instance for testing. + + Yields: + EC2: An instance of the EC2 cloud class. + """ + with EC2(tag="integration-test-images") as ec2: + yield ec2 + + +def test_finding_all_image_types_focal(ec2_cloud: EC2): + """ + Tests that all image types are available for the focal suite and that they are all unique. 
+ + As per issue #481, focal has both `fips` and `fips-updates` image types and previous to + introducing the `PRO_FIPS_UPDATES` image type, the `PRO_FIPS` image type could return a + `PRO_FIPS_UPDATES` image if it was newer. This test asserts that PR #483 prevents this from + happening. + + Test assertions: + - All image types are available for the focal suite (exception is raised if not). + - No daily images returned per image type are the same (same image ID). + """ + suite = "focal" + images: dict[ImageType, str] = {} + # iterate through all ImageType enum values + for image_type in ImageType: + images[image_type] = ec2_cloud.daily_image(release=suite, image_type=image_type) + logger.info( + "Found %s image for %s: %s", + image_type, + suite, + images[image_type], + ) + + # make sure that none of the images are the same + assert len(set(images.values())) == len(images), f"Not all images are unique: {images}" diff --git a/tests/integration_tests/gce/test_images.py b/tests/integration_tests/gce/test_images.py new file mode 100644 index 00000000..3391b740 --- /dev/null +++ b/tests/integration_tests/gce/test_images.py @@ -0,0 +1,73 @@ +"""GCE integration tests testing image related functionality.""" + +import logging + +import pytest + +from pycloudlib.cloud import ImageType +from pycloudlib.errors import ImageNotFoundError +from pycloudlib.gce.cloud import GCE + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def gce_cloud(): + """ + Fixture to create a GCE instance for testing. + + Yields: + GCE: An instance of the GCE cloud class. 
+ """ + with GCE(tag="integration-test-images") as gce: + yield gce + +@pytest.mark.parametrize( + "release, unavailable_image_types", + ( + pytest.param( + "focal", + [ImageType.PRO_FIPS_UPDATES], + id="focal", + ), + pytest.param( + "jammy", + [ImageType.PRO_FIPS], + id="jammy", + ), + ), +) +def test_finding_all_image_types_focal( + gce_cloud: GCE, + release: str, + unavailable_image_types: list[ImageType], +): + """ + Tests that all image types are available for the focal suite and that they are all unique. + + Test assertions: + - Certain image types are unavailable for the given release (exception is raised if not). + - No daily images returned per image type are the same (same image ID). + """ + images: dict[ImageType, str] = {} + # iterate through all ImageType enum values + for image_type in ImageType: + if image_type in unavailable_image_types: + with pytest.raises(ImageNotFoundError) as exc_info: + gce_cloud.daily_image(release=release, image_type=image_type) + logger.info( + "Confirmed that %s image for %s is unavailable.", + image_type, + release, + ) + else: + images[image_type] = gce_cloud.daily_image(release=release, image_type=image_type) + logger.info( + "Found %s image for %s: %s", + image_type, + release, + images[image_type], + ) + + # make sure that none of the images are the same + assert len(set(images.values())) == len(images), f"Not all images are unique: {images}" diff --git a/tests/integration_tests/oracle/test_launch.py b/tests/integration_tests/oracle/test_launch.py new file mode 100644 index 00000000..0b46a665 --- /dev/null +++ b/tests/integration_tests/oracle/test_launch.py @@ -0,0 +1,161 @@ +""" +Integration test that exercise functionality specific to Oracle's launch function. + +The basic lifecycle stuff is already tested in `tests/integration_tests/test_public_api.py`, but +these tests go beyond the standard tests that exercise the base cloud agnostic functionality. 
+""" + +import json +import logging + +import pytest + +from pycloudlib.oci.cloud import OCI +from pycloudlib.types import NetworkingConfig, NetworkingType + +logger = logging.getLogger(__name__) + + +# create fixture that provides the oracle cloud object +@pytest.fixture(scope="module") +def oracle_cloud(): + """Provide an OCI cloud instance for tests with automatic cleanup. + + Returns: + An OCI cloud instance configured for testing. + """ + # make sure region, AD, and compartment_id are set in your pycloudlib.toml config file + # use context manager - instances will be deleted automatically after the test + with OCI( + tag="oracle-integrations-test-launch", + ) as oracle_cloud: + yield oracle_cloud + + +class TestOracleLaunch: + """ + Test Oracle Cloud Infrastructure instance launch functionality. + + This class contains tests specific to the OCI launch method, + including various network configurations. + """ + + @pytest.mark.parametrize( + ("instance_type",), + [ + pytest.param( + "VM.Standard2.1", + id="VM", + ), + pytest.param( + "BM.Optimized3.36", + id="BM", + ), + ], + ) + @pytest.mark.parametrize( + ( + "primary_private", + "primary_networking_type", + "secondary_private", + "secondary_networking_type", + ), + [ + # both public ipv4 + pytest.param( + False, + NetworkingType.IPV4, + True, + NetworkingType.IPV4, + id="both_public_ipv4", + ), + # both public ipv6 + pytest.param( + False, + NetworkingType.IPV6, + True, + NetworkingType.IPV6, + id="both_public_ipv6", + ), + # primary public dual stack, secondary private ipv4 + pytest.param( + False, + NetworkingType.DUAL_STACK, + True, + NetworkingType.DUAL_STACK, + id="public_dual_stack_private_dual_stack", + ), + ], + ) + def test_launch_with_networking_configs( + self, + oracle_cloud: OCI, + primary_private: bool, + primary_networking_type: NetworkingType, + secondary_private: bool, + secondary_networking_type: NetworkingType, + instance_type: str, + ): + """Test OCI instance launch with various networking 
configurations. + + This test verifies that instances can be launched with different + combinations of networking configurations (IPv4, IPv6, dual-stack) + for both primary and secondary network interfaces. + + Args: + oracle_cloud (OCI): The OCI cloud fixture. + primary_private (bool): Whether primary NIC should be private. + primary_networking_type (NetworkingType): Network type for primary NIC. + secondary_private (bool): Whether secondary NIC should be private. + secondary_networking_type (NetworkingType): Network type for secondary NIC. + instance_type (str): OCI instance type to launch. + + Test Steps: + 1. Launch an instance with the specified primary networking configuration and + instance type + 2. Add a secondary network interface with the specified secondary networking + configuration + 3. Restart the instance to apply the changes (As of 20250226 cloud-init does not support + hotplugging nics on Oracle) + 4. Verify that the instance has the expected number of VNICs in IMDS + """ + primary_networking_config = NetworkingConfig( + private=primary_private, + networking_type=primary_networking_type, + ) + + logger.info("Launching instance...") + instance = oracle_cloud.launch( + image_id="ocid1.image.oc1.iad.aaaaaaaasukfowgzghuwrljl4ohlpv3uadhm5sn5dderkhhyymelebrzoima", + primary_network_config=primary_networking_config, + instance_type=instance_type, + ) + logger.info("Instance launched. 
Waiting for instance to be ready...") + instance.wait() + logger.info("Instance is ready!") + assert instance.execute("true").ok + + if primary_networking_config.networking_type == NetworkingType.IPV6: + imds_vnics_url = "curl http://[fd00:c1::a9fe:a9fe]/opc/v1/vnics" + else: + imds_vnics_url = "curl http://169.254.169.254/opc/v1/vnics" + + secondary_networking_config = NetworkingConfig( + private=secondary_private, + networking_type=secondary_networking_type, + ) + instance.add_network_interface( + nic_index=(1 if instance_type == "BM.Optimized3.36" else 0), + networking_config=secondary_networking_config, + ) + + # run cloud-init clean and restart instance now that secondary NIC has been added + logger.info("Running cloud-init clean and restarting instance...") + instance.execute("cloud-init clean", use_sudo=True) + instance.restart(wait=True) + + logger.info("Getting VNIC data from IMDS at '%s'...", imds_vnics_url) + imds_response_2 = instance.execute(f"curl -s {imds_vnics_url}").stdout + vnic_data_2 = json.loads(imds_response_2) + logger.info("VNIC data from IMDS after adding secondary NIC: %s", imds_response_2) + assert len(vnic_data_2) == 2, "Expected IMDS to return 2 VNICs after adding secondary NIC" diff --git a/tests/integration_tests/oracle/test_utils.py b/tests/integration_tests/oracle/test_utils.py new file mode 100644 index 00000000..9b1b47f4 --- /dev/null +++ b/tests/integration_tests/oracle/test_utils.py @@ -0,0 +1,98 @@ +"""Integration tests for Oracle's utility functions.""" + +import logging + +import pytest + +from pycloudlib.oci.cloud import OCI +from pycloudlib.types import NetworkingConfig, NetworkingType + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def oci_cloud(): + """Fixture to create an OCI cloud instance.""" + with OCI( + tag="oracle-integrations-test-utils", + vcn_name="ipv6-vcn", + region="us-ashburn-1", + compartment_id="ocid1.compartment.oc1..aaaaaaaayyvhlkxdjkhzu56is7qenv35h4jfh26oconxsro4qr2qx6ezgbpq", + 
availability_domain="qIZq:US-ASHBURN-AD-2", + ) as oracle_cloud: + yield oracle_cloud + + +# These are pre-existing subnets that I have created in my Oracle Cloud account. +# this is not immediately reproducible by others, but all they need to do is create 3 subnets +# that match the below configurations and update the following variables with the new subnet ids. +# This is the only way I could feel confident that my subnet selection logic is working with the +# new networking configuration options as expected. +IPV6_PUBLIC_SUBNET_ID = ( + "ocid1.subnet.oc1.iad.FILL_THIS_IN" +) +DUAL_STACK_PUBLIC_SUBNET_ID = ( + "ocid1.subnet.oc1.iad.FILL_THIS_IN" +) +DUAL_STACK_PRIVATE_SUBNET_ID = ( + "ocid1.subnet.oc1.iad.FILL_THIS_IN" +) + + +@pytest.mark.parametrize( + ["networking_type", "private", "expected_subnet_id"], + [ + pytest.param( + NetworkingType.IPV6, + False, + IPV6_PUBLIC_SUBNET_ID, + id="ipv6_public", + ), + pytest.param( + NetworkingType.DUAL_STACK, + True, + DUAL_STACK_PRIVATE_SUBNET_ID, + id="dual_stack_private", + ), + pytest.param( + NetworkingType.DUAL_STACK, + False, + DUAL_STACK_PUBLIC_SUBNET_ID, + id="dual_stack_public", + ), + pytest.param( + NetworkingType.IPV4, + False, + DUAL_STACK_PUBLIC_SUBNET_ID, + id="ipv4_public", + ), + pytest.param( + NetworkingType.IPV4, + True, + DUAL_STACK_PRIVATE_SUBNET_ID, + id="ipv4_private", + ), + ], +) +def test_oci_subnet_finding(oci_cloud: OCI, networking_type, private, expected_subnet_id): + """ + Test finding a subnet in OCI. + + We are validating that the correct subnet is found based on the type of networking and whether + the instance should be publicly accessible or not. 
+ """ + network_config: NetworkingConfig = NetworkingConfig( + networking_type=networking_type, + private=private, + ) + subnet_id = oci_cloud.find_compatible_subnet( + networking_config=network_config, + ) + + logger.info( + f"Found subnet ID: {subnet_id} for networking type: {networking_type} " + f"and privacy: {private}" + ) + assert subnet_id == expected_subnet_id, ( + f"Expected subnet ID: {expected_subnet_id} but got: {subnet_id}", + ) diff --git a/tests/integration_tests/test_public_api.py b/tests/integration_tests/test_public_api.py index da05df0f..b734f449 100644 --- a/tests/integration_tests/test_public_api.py +++ b/tests/integration_tests/test_public_api.py @@ -8,7 +8,8 @@ import pytest import pycloudlib -from pycloudlib.cloud import BaseCloud, ImageType +from pycloudlib.cloud import BaseCloud +from pycloudlib.types import ImageType from pycloudlib.instance import BaseInstance from pycloudlib.util import LTS_RELEASES, UBUNTU_RELEASE_VERSION_MAP diff --git a/tests/unit_tests/ec2/test_cloud.py b/tests/unit_tests/ec2/test_cloud.py index 0e6dc30c..937a6d6a 100644 --- a/tests/unit_tests/ec2/test_cloud.py +++ b/tests/unit_tests/ec2/test_cloud.py @@ -3,7 +3,7 @@ import mock import pytest -from pycloudlib.cloud import ImageType +from pycloudlib.types import ImageType from pycloudlib.ec2.cloud import EC2 # mock module path @@ -70,9 +70,16 @@ class TestEC2: "noble", ImageType.PRO_FIPS, False, - "ubuntu-pro-fips*/images/hvm-ssd-gp3/ubuntu-noble-24.04-*", + "ubuntu-pro-fips-server/images/hvm-ssd-gp3/ubuntu-noble-24.04-*", id="pro-fips-lts", ), + pytest.param( + "jammy", + ImageType.PRO_FIPS_UPDATES, + False, + "ubuntu-pro-fips-updates-server/images/hvm-ssd/ubuntu-jammy-22.04-*", + id="pro-fips-updates-lts", + ), # Test GENERIC with non-LTS release and daily = False pytest.param( "oracular", @@ -140,6 +147,7 @@ def test_get_owner_for_all_image_types(self): ImageType.MINIMAL: "099720109477", ImageType.PRO: "099720109477", ImageType.PRO_FIPS: "aws-marketplace", + 
ImageType.PRO_FIPS_UPDATES: "aws-marketplace", } ec2 = FakeEC2() diff --git a/tests/unit_tests/gce/test_cloud.py b/tests/unit_tests/gce/test_cloud.py index ae9e6264..ae5ccc41 100644 --- a/tests/unit_tests/gce/test_cloud.py +++ b/tests/unit_tests/gce/test_cloud.py @@ -3,7 +3,7 @@ import mock import pytest -from pycloudlib.cloud import ImageType +from pycloudlib.types import ImageType from pycloudlib.gce.cloud import GCE from pycloudlib.result import Result @@ -305,6 +305,11 @@ def test_daily_image_returns_latest_from_query( # noqa: D102 ImageType.PRO_FIPS, "ubuntu-pro-fips-2004-focal-*", ), + pytest.param( + "jammy", + ImageType.PRO_FIPS_UPDATES, + "ubuntu-pro-fips-updates-2204-jammy-*", + ), ], ) def test_get_name_filter(self, release, image_type, expected_name_filter, gce): diff --git a/tests/unit_tests/ibm/test_instance.py b/tests/unit_tests/ibm/test_instance.py index ff68791f..ea573a07 100644 --- a/tests/unit_tests/ibm/test_instance.py +++ b/tests/unit_tests/ibm/test_instance.py @@ -3,7 +3,7 @@ import pytest from unittest import mock -from pycloudlib.ibm.instance import IBMInstance, _IBMInstanceType +from pycloudlib.ibm.instance import IBMInstance, _IBMInstanceType, _Status SAMPLE_RAW_INSTANCE = { "id": "ibm1", @@ -39,6 +39,7 @@ def test_type_from_raw_instance(self, client, raw_instance, inst_id, zone_id, in assert zone_id == inst.zone assert inst_type == inst._ibm_instance_type + @mock.patch("time.sleep") # bypass the time.sleep call in _attach_floating_ip() @mock.patch(M_PATH + "VpcV1", autospec=True) def test_attach_floating_ip(self, m_sleep, client, caplog): @@ -94,3 +95,55 @@ def test_attach_floating_ip(self, m_sleep, client, caplog): assert inst._floating_ip["id"] == fi_id assert inst._floating_ip["name"] == fi_name + + + @pytest.mark.parametrize( + "kwargs, expected_timeout", + [ + pytest.param({}, 900, id="default_timeout"), + pytest.param({"start_timeout": 123}, 123, id="custom_timeout"), + ], + ) + @mock.patch(M_PATH + "VpcV1", autospec=True) + def 
test_wait_for_instance_start(self, m_client, kwargs, expected_timeout, caplog): + """Test _wait_for_instance_start calls _wait_for_status correctly.""" + inst = IBMInstance.from_raw_instance( + key_pair=None, + client=m_client, + instance=SAMPLE_RAW_INSTANCE, + ) + + with mock.patch.object(inst, "_wait_for_status") as m_wait_for_status: + inst._wait_for_instance_start(**kwargs) + assert "Waiting for instance to finish provisioning." in caplog.text + + m_wait_for_status.assert_called_once_with( + _Status.RUNNING, + sleep_seconds=expected_timeout, + side_effect_fn=inst._check_instance_failed_status, + ) + + + @mock.patch(M_PATH + "VpcV1", autospec=True) + def test_wait_passes_kwargs(self, m_client): + """Test that wait() correctly passes kwargs to internal methods.""" + inst = IBMInstance.from_raw_instance( + key_pair=None, + client=m_client, + instance=SAMPLE_RAW_INSTANCE, + ) + custom_timeout = 123 + + with mock.patch.multiple( + inst, + _wait_for_instance_start=mock.DEFAULT, + _wait_for_execute=mock.DEFAULT, + _wait_for_cloudinit=mock.DEFAULT, + ) as mocks: + inst.wait(start_timeout=custom_timeout) + + mocks["_wait_for_instance_start"].assert_called_once_with( + start_timeout=custom_timeout + ) + mocks["_wait_for_execute"].assert_called_once() + mocks["_wait_for_cloudinit"].assert_called_once() diff --git a/tests/unit_tests/lxd/test_cloud.py b/tests/unit_tests/lxd/test_cloud.py index c6b4c214..336eda5a 100644 --- a/tests/unit_tests/lxd/test_cloud.py +++ b/tests/unit_tests/lxd/test_cloud.py @@ -6,7 +6,7 @@ import pytest -from pycloudlib.cloud import ImageType +from pycloudlib.types import ImageType from pycloudlib.lxd.cloud import LXDContainer, LXDVirtualMachine M_PATH = "pycloudlib.lxd.cloud." 
diff --git a/tests/unit_tests/lxd/test_images.py b/tests/unit_tests/lxd/test_images.py index e59cbdd4..287b7317 100644 --- a/tests/unit_tests/lxd/test_images.py +++ b/tests/unit_tests/lxd/test_images.py @@ -5,7 +5,7 @@ import pytest -from pycloudlib.cloud import ImageType +from pycloudlib.types import ImageType from pycloudlib.lxd import _images M_PATH = "pycloudlib.lxd._images." diff --git a/tests/unit_tests/oci/test_cloud.py b/tests/unit_tests/oci/test_cloud.py index b32a7d2a..d73f7920 100644 --- a/tests/unit_tests/oci/test_cloud.py +++ b/tests/unit_tests/oci/test_cloud.py @@ -290,6 +290,64 @@ def test_launch_instance(self, mock_wait_till_ready, oci_cloud, oci_mock): mock_network_client.list_subnets.assert_called_once() assert oci_cloud.get_instance.call_count == 2 + # Ensure when a subnet_id is directly passed to launch + # no functions to obtain a subnet-id are called. + with mock.patch("pycloudlib.oci.cloud.get_subnet_id") as m_subnet_id, \ + mock.patch("pycloudlib.oci.cloud.get_subnet_id_by_name") as m_subnet_name: + instance = oci_cloud.launch( + "test-image-id", instance_type="VM.Standard2.1", subnet_id="subnet-id" + ) + m_subnet_name.assert_not_called() + m_subnet_id.assert_not_called() + + # The first arg is the LaunchInstanceDetails object + args, _ = oci_cloud.compute_client.launch_instance.call_args + launch_instance_details = args[0] + assert launch_instance_details.subnet_id == "subnet-id" + assert oci_cloud.get_instance.call_count == 3 + + + @mock.patch("pycloudlib.oci.cloud.wait_till_ready") + def test_launch_custom_metadata(self, mock_wait_till_ready, oci_cloud): + """Test launch method with valid inputs.""" + # mock the key pair + oci_cloud.key_pair = mock.Mock(public_key_config="ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC") + oci_cloud.compute_client.launch_instance.return_value = mock.Mock( + data=mock.Mock(id="instance-id") + ) + oci_cloud.get_instance = mock.Mock(return_value=mock.Mock()) + + # Ensure metdata gets combined with defaults + 
metadata = {"metadata_key": "metadata_value"} + default_metadata = {"ssh_authorized_keys": oci_cloud.key_pair.public_key_content} + expected_metadata = {**default_metadata, **metadata} + instance = oci_cloud.launch( + "test-image-id", instance_type="VM.Standard2.1", subnet_id="subnet-id", + metadata=metadata, + ) + + # The first arg is the LaunchInstanceDetails object + args, _ = oci_cloud.compute_client.launch_instance.call_args + launch_instance_details = args[0] + assert launch_instance_details.metadata == expected_metadata + assert instance is not None + oci_cloud.get_instance.assert_called_once() + + # Ensure default_metadata values can be overridden + metadata = {"ssh_authorized_keys": "override", "metadata_key": "metadata_value"} + expected_metadata = {**default_metadata, **metadata} + instance = oci_cloud.launch( + "test-image-id", instance_type="VM.Standard2.1", subnet_id="subnet-id", + metadata=metadata, + ) + + # The first arg is the LaunchInstanceDetails object + args, _ = oci_cloud.compute_client.launch_instance.call_args + launch_instance_details = args[0] + assert launch_instance_details.metadata == expected_metadata + assert oci_cloud.get_instance.call_count == 2 + + def test_launch_instance_invalid_image(self, oci_cloud): """Test launch method raises ValueError when no image_id is provided.""" with pytest.raises(ValueError, match="launch requires image_id param"): diff --git a/tests/unit_tests/oci/test_instance.py b/tests/unit_tests/oci/test_instance.py index e9046903..c16d74a4 100644 --- a/tests/unit_tests/oci/test_instance.py +++ b/tests/unit_tests/oci/test_instance.py @@ -197,7 +197,7 @@ def mock_vnic_pagination(self): def test_add_network_interface(self, setup_vnic_mocks): """Test add_network_interface() method.""" - oci_instance = setup_vnic_mocks + oci_instance: OciInstance = setup_vnic_mocks # Call add_network_interface and check result result = oci_instance.add_network_interface() diff --git a/tests/unit_tests/oci/test_utils.py 
b/tests/unit_tests/oci/test_utils.py index 1ed2a6b3..32eee02f 100644 --- a/tests/unit_tests/oci/test_utils.py +++ b/tests/unit_tests/oci/test_utils.py @@ -5,7 +5,9 @@ get_subnet_id_by_name, parse_oci_config_from_env_vars, _load_and_preprocess_oci_toml_file, + generate_create_vnic_details, ) +from pycloudlib.types import NetworkingConfig, NetworkingType from pycloudlib.errors import PycloudlibError from oci.retry import DEFAULT_RETRY_STRATEGY # pylint: disable=E0611,E0401 import os @@ -34,9 +36,9 @@ def test_get_subnet_id_fails_with_vcn_name_not_found(self, setup_environment): network_client.list_vcns.return_value.data = [] with pytest.raises(PycloudlibError, match="Unable to determine vcn name"): get_subnet_id( - network_client, - compartment_id, - availability_domain, + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, vcn_name="vcn_name", ) @@ -46,9 +48,9 @@ def test_get_subnet_id_fails_with_multiple_vcns_found(self, setup_environment): network_client.list_vcns.return_value.data = [MagicMock(), MagicMock()] with pytest.raises(PycloudlibError, match="Found multiple vcns with name"): get_subnet_id( - network_client, - compartment_id, - availability_domain, + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, vcn_name="vcn_name", ) @@ -57,7 +59,11 @@ def test_get_subnet_id_fails_with_no_vcns_found(self, setup_environment): network_client, compartment_id, availability_domain = setup_environment network_client.list_vcns.return_value.data = [] with pytest.raises(PycloudlibError, match="No VCNs found in compartment"): - get_subnet_id(network_client, compartment_id, availability_domain) + get_subnet_id( + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, + ) def test_get_subnet_id_fails_with_no_suitable_subnet_found(self, setup_environment): """Test get_subnet_id fails when no suitable subnet is 
found.""" @@ -68,7 +74,11 @@ def test_get_subnet_id_fails_with_no_suitable_subnet_found(self, setup_environme network_client.list_vcns.return_value.data = [vcn] network_client.list_subnets.return_value.data = [] with pytest.raises(PycloudlibError, match="Unable to find suitable subnet in VCN"): - get_subnet_id(network_client, compartment_id, availability_domain) + get_subnet_id( + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, + ) def test_get_subnet_id_fails_with_private_subnet(self, setup_environment): """Test that passing private=False ignores private subnets.""" @@ -82,10 +92,10 @@ def test_get_subnet_id_fails_with_private_subnet(self, setup_environment): network_client.list_subnets.return_value.data = [subnet] with pytest.raises(PycloudlibError, match="Unable to find suitable subnet"): get_subnet_id( - network_client, - compartment_id, - availability_domain, - private=False, + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, + networking_config=NetworkingConfig(private=False), ) def test_get_subnet_id_fails_with_different_availability_domain(self, setup_environment): @@ -100,7 +110,11 @@ def test_get_subnet_id_fails_with_different_availability_domain(self, setup_envi subnet.availability_domain = "different_availability_domain" network_client.list_subnets.return_value.data = [subnet] with pytest.raises(PycloudlibError, match="Unable to find suitable subnet in VCN"): - get_subnet_id(network_client, compartment_id, availability_domain) + get_subnet_id( + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, + ) def test_get_subnet_id_suceeds_without_vcn_name(self, setup_environment): """Test get_subnet_id suceeds without specifying a VCN name.""" @@ -117,11 +131,17 @@ def test_get_subnet_id_suceeds_without_vcn_name(self, setup_environment): subnet.availability_domain = None subnet.id = 
"subnet_id" network_client.list_subnets.return_value.data = [subnet] - result = get_subnet_id(network_client, compartment_id, availability_domain) + result = get_subnet_id( + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, + ) assert result == "subnet_id" # Ensure that list_subnets is called with the first VCN's ID, not the second network_client.list_subnets.assert_called_with( - compartment_id, vcn_id="vcn1_id", retry_strategy=DEFAULT_RETRY_STRATEGY + compartment_id=compartment_id, + vcn_id="vcn1_id", + retry_strategy=DEFAULT_RETRY_STRATEGY, ) def test_get_subnet_id_suceeds_with_vcn_name(self, setup_environment): @@ -137,15 +157,17 @@ def test_get_subnet_id_suceeds_with_vcn_name(self, setup_environment): subnet.id = "subnet_id" network_client.list_subnets.return_value.data = [subnet] result = get_subnet_id( - network_client, - compartment_id, - availability_domain, + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, vcn_name="vcn_name", ) assert result == "subnet_id" # Ensure that list_subnets is called with the specified VCN's ID network_client.list_subnets.assert_called_with( - compartment_id, vcn_id="vcn_id", retry_strategy=DEFAULT_RETRY_STRATEGY + compartment_id=compartment_id, + vcn_id="vcn_id", + retry_strategy=DEFAULT_RETRY_STRATEGY, ) def test_get_subnet_id_succeeds_with_private_subnet(self, setup_environment): @@ -164,7 +186,12 @@ def test_get_subnet_id_succeeds_with_private_subnet(self, setup_environment): public_subnet.availability_domain = None public_subnet.id = "public_subnet_id" network_client.list_subnets.return_value.data = [public_subnet, private_subnet] - result = get_subnet_id(network_client, compartment_id, availability_domain, private=True) + result = get_subnet_id( + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, + networking_config=NetworkingConfig(private=True), + ) 
assert result == "private_subnet_id" @@ -402,3 +429,165 @@ def test_get_subnet_id_by_name_succeeds(self): network_client.list_subnets.return_value.data = [subnet_mock] result = get_subnet_id_by_name(network_client, "compartment_id", "single_subnet") assert result == "subnet_id" + + +class TestGetSubnetIdParameterized: + @pytest.fixture + def setup_environment(self): + """Set up the test environment.""" + network_client = MagicMock() + compartment_id = "compartment_id" + availability_domain = "availability_domain" + vcn = MagicMock() + vcn.id = "vcn_id" + vcn.display_name = "vcn_name" + network_client.list_vcns.return_value.data = [vcn] + return network_client, compartment_id, availability_domain, vcn + + @pytest.mark.parametrize( + "networking_type, private, expected_subnet_id", + [ + (NetworkingType.IPV4, False, "ipv4_public_subnet_id"), + (NetworkingType.IPV4, True, "ipv4_private_subnet_id"), + (NetworkingType.IPV6, False, "ipv6_public_subnet_id"), + (NetworkingType.IPV6, True, "ipv6_private_subnet_id"), + (NetworkingType.DUAL_STACK, False, "dual_stack_public_subnet_id"), + (NetworkingType.DUAL_STACK, True, "dual_stack_private_subnet_id"), + ], + ) + def test_get_subnet_id_parameterized( + self, + setup_environment, + networking_type, + private, + expected_subnet_id, + ): + """Test get_subnet_id with different networking types and private flag.""" + ( + network_client, + compartment_id, + availability_domain, + vcn, + ) = setup_environment + + # Create subnet mocks based on the parameters + ipv4_public_subnet = MagicMock() + ipv4_public_subnet.availability_domain = None + ipv4_public_subnet.prohibit_internet_ingress = False + ipv4_public_subnet.cidr_block = "10.0.0.0/24" + ipv4_public_subnet.ipv6_cidr_block = None + ipv4_public_subnet.id = "ipv4_public_subnet_id" + + ipv4_private_subnet = MagicMock() + ipv4_private_subnet.availability_domain = None + ipv4_private_subnet.prohibit_internet_ingress = True + ipv4_private_subnet.cidr_block = "10.0.1.0/24" + 
ipv4_private_subnet.ipv6_cidr_block = None + ipv4_private_subnet.id = "ipv4_private_subnet_id" + + ipv6_public_subnet = MagicMock() + ipv6_public_subnet.availability_domain = None + ipv6_public_subnet.prohibit_internet_ingress = False + ipv6_public_subnet.cidr_block = None + ipv6_public_subnet.ipv6_cidr_block = "2603:c020:400d:5d7e::/64" + ipv6_public_subnet.id = "ipv6_public_subnet_id" + + ipv6_private_subnet = MagicMock() + ipv6_private_subnet.availability_domain = None + ipv6_private_subnet.prohibit_internet_ingress = True + ipv6_private_subnet.cidr_block = None + ipv6_private_subnet.ipv6_cidr_block = "2603:c020:400d:5d7f::/64" + ipv6_private_subnet.id = "ipv6_private_subnet_id" + + dual_stack_public_subnet = MagicMock() + dual_stack_public_subnet.availability_domain = None + dual_stack_public_subnet.prohibit_internet_ingress = False + dual_stack_public_subnet.cidr_block = "10.0.2.0/24" + dual_stack_public_subnet.ipv6_cidr_block = "2603:c020:400d:5d80::/64" + dual_stack_public_subnet.id = "dual_stack_public_subnet_id" + + dual_stack_private_subnet = MagicMock() + dual_stack_private_subnet.availability_domain = None + dual_stack_private_subnet.prohibit_internet_ingress = True + dual_stack_private_subnet.cidr_block = "10.0.3.0/24" + dual_stack_private_subnet.ipv6_cidr_block = "2603:c020:400d:5d81::/64" + dual_stack_private_subnet.id = "dual_stack_private_subnet_id" + + network_client.list_subnets.return_value.data = [ + ipv4_public_subnet, + ipv4_private_subnet, + ipv6_public_subnet, + ipv6_private_subnet, + dual_stack_public_subnet, + dual_stack_private_subnet, + ] + + networking_config = NetworkingConfig( + networking_type=networking_type, + private=private, + ) + + result = get_subnet_id( + network_client=network_client, + compartment_id=compartment_id, + availability_domain=availability_domain, + networking_config=networking_config, + ) + + assert result == expected_subnet_id + + +class TestGenerateCreateVnicDetails: + subnet_id = "subnet_id" + + def 
test_generate_create_vnic_details_default(self): + """Test generate_create_vnic_details with default parameters.""" + + vnic_details = generate_create_vnic_details(self.subnet_id) + assert vnic_details.subnet_id == self.subnet_id + assert vnic_details.assign_ipv6_ip is False + assert vnic_details.assign_public_ip is True + + def test_generate_create_vnic_details_ipv4_public(self): + """Test generate_create_vnic_details with IPv4 public configuration.""" + networking_config = NetworkingConfig(networking_type=NetworkingType.IPV4, private=False) + vnic_details = generate_create_vnic_details(self.subnet_id, networking_config) + assert vnic_details.subnet_id == self.subnet_id + assert vnic_details.assign_ipv6_ip is False + assert vnic_details.assign_public_ip is True + + def test_generate_create_vnic_details_ipv4_private(self): + """Test generate_create_vnic_details with IPv4 private configuration.""" + networking_config = NetworkingConfig(networking_type=NetworkingType.IPV4, private=True) + vnic_details = generate_create_vnic_details(self.subnet_id, networking_config) + assert vnic_details.subnet_id == self.subnet_id + assert vnic_details.assign_ipv6_ip is False + assert vnic_details.assign_public_ip is False + + def test_generate_create_vnic_details_ipv6(self): + """Test generate_create_vnic_details with IPv6 configuration.""" + networking_config = NetworkingConfig(networking_type=NetworkingType.IPV6) + vnic_details = generate_create_vnic_details(self.subnet_id, networking_config) + assert vnic_details.subnet_id == self.subnet_id + assert vnic_details.assign_ipv6_ip is True + assert vnic_details.assign_public_ip is False + + def test_generate_create_vnic_details_dual_stack_public(self): + """Test generate_create_vnic_details with dual stack public configuration.""" + networking_config = NetworkingConfig( + networking_type=NetworkingType.DUAL_STACK, private=False + ) + vnic_details = generate_create_vnic_details(self.subnet_id, networking_config) + assert 
vnic_details.subnet_id == self.subnet_id + assert vnic_details.assign_ipv6_ip is True + assert vnic_details.assign_public_ip is True + + def test_generate_create_vnic_details_dual_stack_private(self): + """Test generate_create_vnic_details with dual stack private configuration.""" + networking_config = NetworkingConfig( + networking_type=NetworkingType.DUAL_STACK, private=True + ) + vnic_details = generate_create_vnic_details(self.subnet_id, networking_config) + assert vnic_details.subnet_id == self.subnet_id + assert vnic_details.assign_ipv6_ip is True + assert vnic_details.assign_public_ip is False diff --git a/tests/unit_tests/openstack/test_instance.py b/tests/unit_tests/openstack/test_instance.py index 78b3ca7d..a26016b8 100644 --- a/tests/unit_tests/openstack/test_instance.py +++ b/tests/unit_tests/openstack/test_instance.py @@ -1,5 +1,7 @@ """Openstack instance tests.""" +import pytest + from unittest import mock from pycloudlib.openstack.instance import OpenstackInstance @@ -37,38 +39,45 @@ ] -@mock.patch("pycloudlib.openstack.instance.OpenstackInstance._create_and_attach_floating_ip") class TestAttachFloatingIp: """Ensure we create/use floating IPs accordingly.""" - def test_existing_floating_ip(self, m_create): - """Test that if a server has an existing floating IP, we use it.""" - m_connection = mock.Mock() - m_server = m_connection.compute.get_server.return_value + @pytest.fixture(autouse=True) + def setup_connection(self): + self.conn = mock.Mock() + m_server = self.conn.compute.get_server.return_value m_server.addresses = SERVER_ADDRESSES - m_connection.network.ips.return_value = NETWORK_IPS + m_create_floating_ip = self.conn.create_floating_ip.return_value + m_create_floating_ip.floating_ip_address = "10.42.42.42" + self.conn.network.ports.return_value = [ + mock.Mock(id="port1"), mock.Mock(id="port2") + ] + + def test_existing_floating_ip(self): + """Test that if a server has an existing floating IP, we use it.""" + 
self.conn.network.ips.return_value = NETWORK_IPS instance = OpenstackInstance( key_pair=None, instance_id=None, network_id=None, - connection=m_connection, + connection=self.conn, ) assert "10.0.0.3" == instance.floating_ip["floating_ip_address"] - assert 0 == m_create.call_count + assert 0 == self.conn.create_floating_ip.call_count - def test_no_matching_floating_ip(self, m_create): + def test_no_matching_floating_ip(self): """Test that if a server doesn't have a floating IP, we create it.""" - m_connection = mock.Mock() - m_server = m_connection.compute.get_server.return_value = mock.Mock() - m_server.addresses = SERVER_ADDRESSES - m_connection.network.ips.return_value = [] + self.conn.network.ips.return_value = [] instance = OpenstackInstance( key_pair=None, instance_id=None, network_id=None, - connection=m_connection, + connection=self.conn, + ) + assert instance.floating_ip is self.conn.create_floating_ip.return_value + assert 1 == self.conn.create_floating_ip.call_count + self.conn.network.update_ip.assert_called_once_with( + self.conn.create_floating_ip.return_value, port_id='port1' ) - assert instance.floating_ip is m_create.return_value - assert 1 == m_create.call_count diff --git a/tests/unit_tests/test_cloud.py b/tests/unit_tests/test_cloud.py index 158e6703..a9cf44c3 100644 --- a/tests/unit_tests/test_cloud.py +++ b/tests/unit_tests/test_cloud.py @@ -40,7 +40,7 @@ def get_instance(self, instance_id): def launch(self, image_id, instance_type=None, user_data=None, **kwargs): """Skeletal launch.""" - def snapshot(self, instance, clean=True, **kwargs): + def snapshot(self, instance, *, clean=True, keep=False, **kwargs): """Skeletal snapshot.""" def list_keys(self): @@ -237,3 +237,85 @@ def test_validate_tag(self, tag: str, rules_failed: List[str]): assert tag in str(exc_info.value) for rule in rules_failed: assert rule in str(exc_info.value) + + +class TestSnapshotHelpers: + """ + Tests covering both the _store_snapshot_info and _record_image_deletion 
methods of BaseCloud. + """ + + @pytest.fixture + def cloud(self): + """Fixture to create a CloudSubclass instance for testing.""" + return CloudSubclass(tag="tag", timestamp_suffix=False, config_file=StringIO(CONFIG)) + + def test_store_snapshot_info_temporary(self, cloud, caplog): + """Test storing snapshot info as temporary.""" + snapshot_id = "snap-123" + snapshot_name = "snapshot-temp" + keep_snapshot = False + + caplog.set_level(logging.DEBUG) + image_info = cloud._store_snapshot_info(snapshot_id, snapshot_name, keep_snapshot) + + assert image_info.image_id == snapshot_id + assert image_info.image_name == snapshot_name + assert image_info in cloud.created_images + assert image_info not in cloud.preserved_images + assert f"Created temporary snapshot {image_info}" in caplog.text + + def test_store_snapshot_info_permanent(self, cloud, caplog): + """Test storing snapshot info as permanent.""" + snapshot_id = "snap-456" + snapshot_name = "snapshot-perm" + keep_snapshot = True + + caplog.set_level(logging.DEBUG) + image_info = cloud._store_snapshot_info(snapshot_id, snapshot_name, keep_snapshot) + + assert image_info.image_id == snapshot_id + assert image_info.image_name == snapshot_name + assert image_info not in cloud.created_images + assert image_info in cloud.preserved_images + assert f"Created permanent snapshot {image_info}" in caplog.text + + def test_record_image_deletion_created_image(self, cloud, caplog): + """Test recording deletion of a created image.""" + snapshot_id = "snap-789" + snapshot_name = "snapshot-created" + keep_snapshot = False + + image_info = cloud._store_snapshot_info(snapshot_id, snapshot_name, keep_snapshot) + caplog.set_level(logging.DEBUG) + cloud._record_image_deletion(snapshot_id) + + assert image_info not in cloud.created_images + assert image_info not in cloud.preserved_images + assert ( + f"Snapshot {image_info} has been deleted. Will no longer need to be cleaned up later." 
+ in caplog.text + ) + + def test_record_image_deletion_preserved_image(self, cloud, caplog): + """Test recording deletion of a preserved image.""" + snapshot_id = "snap-101" + snapshot_name = "snapshot-preserved" + keep_snapshot = True + + image_info = cloud._store_snapshot_info(snapshot_id, snapshot_name, keep_snapshot) + caplog.set_level(logging.DEBUG) + cloud._record_image_deletion(snapshot_id) + + assert image_info not in cloud.created_images + assert image_info not in cloud.preserved_images + assert ( + f"Snapshot {image_info} has been deleted. This snapshot was taken with keep=True, " + "but since it has been manually deleted, it will not be preserved." + ) in caplog.text + + def test_record_image_deletion_nonexistent_image(self, cloud, caplog): + """Test recording deletion of a non-existent image.""" + snapshot_id = "snap-999" + caplog.set_level(logging.DEBUG) + cloud._record_image_deletion(snapshot_id) + assert f"Deleted image {snapshot_id}" in caplog.text diff --git a/tests/unit_tests/test_types.py b/tests/unit_tests/test_types.py new file mode 100644 index 00000000..e1b6df87 --- /dev/null +++ b/tests/unit_tests/test_types.py @@ -0,0 +1,19 @@ +"""Test types module.""" + +from pycloudlib.types import NetworkingConfig +import pytest +import re + + +def test_networking_config_post_init_raises_exceptions(): + """Test NetworkingConfig post init checks.""" + with pytest.raises( + ValueError, + match="Invalid networking type provided", + ): + NetworkingConfig(networking_type="invalid") + with pytest.raises( + ValueError, + match=re.escape("Invalid private value provided (must be a boolean)"), + ): + NetworkingConfig(private="not_a_boolean")