Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pylintrc_utils
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ extension-pkg-whitelist=netifaces
# TO BE REMOVED:
# utils_scheduling.py (will be deleted from codebase)
#
ignore=CVS,accessors.py,address.py,aexpect.py,ah_ipv6.py,ah.py,all_ipv6.py,all.py,arch.py,arp.py,asset.py,audio.py,backup_xml.py,base_installer.py,base.py,block.py,block_volume.py,bootstrap.py,boottool.py,bridge.py,build_helper.py,capability_xml.py,cartesian_config.py,cb.py,ceph.py,channel.py,character.py,check_cpu_flag.py,checkpoint_xml.py,cmd_runner.py,compat.py,console.py,controller.py,core.py,cpu.py,curl.py,data_dir.py,dd.py,defaults.py,dir_pool.py,dir_volume.py,disk.py,domcapability_xml.py,drive.py,duplicate_pages.py,emulator.py,env_process.py,error_context.py,error_event.py,esp_ipv6.py,esp.py,filesystem.py,file_volume.py,filterref.py,funcatexit.py,gcov.py,gluster.py,graphical_console.py,graphics.py,guest_agent.py,hostdev.py,http_server.py,hub.py,icmp.py,icmpv6.py,idmap_xml.py,igmp.py,__init__.py,input.py,installer.py,interface.py,interrupted_thread.py,iommu.py,ip.py,ip_sniffing.py,ipv6.py,iscsi.py,kernelinstall.py,kernel_interface.py,kernel.py,key_event_form.py,ksm_overcommit_guest.py,lease.py,libguestfs.py,librarian.py,libvirt_bios.py,libvirt_ceph_utils.py,libvirt_cgroup.py,libvirt_config.py,libvirt_cpu.py,libvirtd_decorator.py,libvirt_device_utils.py,libvirt_disk.py,libvirt_domjobinfo.py,libvirt_embedded_qemu.py,libvirt_filesystem.py,libvirt_installer.py,libvirt_keywrap.py,libvirt_memory.py,libvirt_misc.py,libvirt_monitor.py,libvirt_nested.py,libvirt_network.py,libvirt_numa.py,libvirt_nwfilter.py,libvirt_pcicontr.py,libvirt.py,libvirt_remote.py,libvirt_secret.py,libvirt_service.py,libvirt_setup.py,libvirt_storage.py,libvirt_unprivileged.py,libvirt_usb.py,libvirt_version.py,libvirt_vfio.py,libvirt_virtio.py,libvirt_vm.py,libvirt_vmxml.py,logger.py,logging_manager.py,lvm.py,lvsb_base.py,lvsb.py,lvsbs.py,lv_utils.py,mac.py,macvtap.py,memballoon.py,memory.py,messenger.py,migration.py,migration_template.py,mock.py,multicast_guest.py,nbd.py,netperf_agent.py,net_volume.py,networking.py,network_xml.py,nfs_pool.py,nfs.py,nfs_volume.py,nodedev_xml.py,node_properties.py,node.py,nvme.py,nvram.py,nwfilter_binding.py,nwfilter_xml.py,openvswitch.py,os_posix.py,ovirt.py,ovs.py,ovs_utils.py,panic.py,parallel.py,pci_utils.py,pmsocat36.py,png_utils.py,pool.py,pool_selector.py,pool_xml.py,port_resource.py,postprocess_iozone.py,ppc.py,ppm_utils.py,propcan.py,proxy.py,pstore.py,qcontainer.py,qdevice_format.py,qdevices.py,qemu_capabilities.py,qemu_installer.py,qemu_io.py,qemu_migration.py,qemu_monitor.py,qemu_qtree.py,qemu_storage.py,qemu_utils.py,qemu_virtio_port.py,qemu_vm.py,rarp.py,redirdev.py,redirfilter.py,remote_build.py,remote_interface.py,remote_master.py,remote.py,remote_runner.py,requirement_checks.py,resource_manager.py,resource.py,RFBDes.py,rng_egd.py,rng.py,scan_autotest_results.py,scheduler.py,sctp_ipv6.py,sctp.py,seclabel.py,secret_xml.py,selector.py,serial_host_send_receive.py,serial.py,service.py,set_win_promisc.py,sev.py,shmem.py,smartcard.py,snapshot_xml.py,sound.py,ssh_key.py,standalone_test.py,step_editor.py,storage.py,storage_ssh.py,stp.py,syslog_server.py,system.py,sys_time.py,tap_network.py,tap_port.py,tap.py,tcp_ipv6.py,tcp.py,telnet.py,test_controller_xml.py,test_interface_xml.py,test_memory_xml.py,test_network_xml.py,test_utils_kernel_module.py,test_utils_misc.py,test_utils_test__init__.py,test_utils_zchannels.py,test_utils_zcrypt.py,test_vm_xml.py,tool.py,tpm.py,udp_ipv6.py,udplite_ipv6.py,udplite.py,udp.py,unattended_install.py,utils_backup.py,utils_cgroup.py,utils_config.py,utils_conn.py,utils_cpi.py,utils_disk.py,utils_env.py,utils_gdb.py,utils_hotplug.py,utils_ids.py,utils_iptables.py,utils_kernel_module.py,utils_koji.py,utils_libguestfs.py,utils_libvirtd.py,utils_linux_modules.py,utils_logfile.py,utils_memory.py,utils_misc.py,utils_nbd.py,utils_netperf.py,utils_net.py,utils_npiv.py,utils_numeric.py,utils_package.py,utils_params.py,utils.py,utils_pyvmomi.py,utils_qemu.py,utils_sasl.py,utils_scheduling.py,utils_selinux.py,utils_spice.py,utils_split_daemons.py,utils_sriov.py,utils_stress.py,utils_switchdev.py,utils_sys.py,utils_time.py,utils_v2v.py,utils_vdpa.py,utils_version.py,utils_virtio_port.py,utils_vsock.py,utils_zchannels.py,utils_zcrypt.py,vdpa_blk.py,verify.py,versionable_class.py,version.py,vhost_user_blk.py,video_maker.py,video.py,virsh.py,virt_admin.py,VirtIoChannel_guest_send_receive.py,virtio_console_guest.py,virtio_win.py,virt_vm.py,vlan.py,vms.py,vm_xml.py,volume.py,vol_xml.py,vsock.py,vt_console.py,vt_iothread.py,wait_for_quit.py,watchdog.py,windows_support.py,wmic.py,_wrappers.py,xcepts.py,xml_utils.py,yumrepo.py
ignore=CVS,accessors.py,address.py,aexpect.py,ah_ipv6.py,ah.py,all_ipv6.py,all.py,arch.py,arp.py,asset.py,audio.py,backup_xml.py,base_installer.py,base.py,block.py,block_volume.py,bootstrap.py,boottool.py,bridge.py,build_helper.py,capability_xml.py,cartesian_config.py,cb.py,ceph.py,channel.py,character.py,check_cpu_flag.py,checkpoint_xml.py,cmd_runner.py,compat.py,console.py,controller.py,core.py,cpu.py,curl.py,data_dir.py,dd.py,defaults.py,dir_pool.py,dir_volume.py,disk.py,domcapability_xml.py,drive.py,duplicate_pages.py,emulator.py,env_process.py,error_context.py,error_event.py,esp_ipv6.py,esp.py,filesystem.py,file_volume.py,filterref.py,funcatexit.py,gcov.py,gluster.py,graphical_console.py,graphics.py,guest_agent.py,hostdev.py,http_server.py,hub.py,icmp.py,icmpv6.py,idmap_xml.py,igmp.py,__init__.py,input.py,installer.py,interface.py,interrupted_thread.py,iommu.py,ip.py,ip_sniffing.py,ipv6.py,iscsi.py,kernelinstall.py,kernel_interface.py,kernel.py,key_event_form.py,ksm_overcommit_guest.py,lease.py,libguestfs.py,librarian.py,libvirt_bios.py,libvirt_ceph_utils.py,libvirt_cgroup.py,libvirt_config.py,libvirt_cpu.py,libvirtd_decorator.py,libvirt_device_utils.py,libvirt_disk.py,libvirt_domjobinfo.py,libvirt_embedded_qemu.py,libvirt_filesystem.py,libvirt_installer.py,libvirt_keywrap.py,libvirt_memory.py,libvirt_misc.py,libvirt_monitor.py,libvirt_nested.py,libvirt_network.py,libvirt_numa.py,libvirt_nwfilter.py,libvirt_pcicontr.py,libvirt.py,libvirt_remote.py,libvirt_secret.py,libvirt_service.py,libvirt_setup.py,libvirt_storage.py,libvirt_unprivileged.py,libvirt_usb.py,libvirt_version.py,libvirt_vfio.py,libvirt_virtio.py,libvirt_vm.py,libvirt_vmxml.py,logger.py,logging_manager.py,lvm.py,lvsb_base.py,lvsb.py,lvsbs.py,lv_utils.py,mac.py,macvtap.py,memballoon.py,memory.py,messenger.py,migration.py,migration_template.py,mock.py,multicast_guest.py,nbd.py,netperf_agent.py,net_volume.py,networking.py,network_xml.py,nfs_pool.py,nfs.py,nfs_volume.py,nodedev_xml.py,node_properties.py,node.py,nvme.py,nvram.py,nwfilter_binding.py,nwfilter_xml.py,openvswitch.py,os_posix.py,ovirt.py,ovs.py,ovs_utils.py,panic.py,parallel.py,pci_utils.py,pmsocat36.py,png_utils.py,pool.py,pool_selector.py,pool_xml.py,port_resource.py,postprocess_iozone.py,ppc.py,ppm_utils.py,propcan.py,proxy.py,pstore.py,qcontainer.py,qdevice_format.py,qdevices.py,qemu_capabilities.py,qemu_installer.py,qemu_io.py,qemu_migration.py,qemu_monitor.py,qemu_qtree.py,qemu_storage.py,qemu_virtio_port.py,qemu_vm.py,rarp.py,redirdev.py,redirfilter.py,remote_build.py,remote_interface.py,remote_master.py,remote.py,remote_runner.py,requirement_checks.py,resource_manager.py,resource.py,RFBDes.py,rng_egd.py,rng.py,scan_autotest_results.py,scheduler.py,sctp_ipv6.py,sctp.py,seclabel.py,secret_xml.py,selector.py,serial_host_send_receive.py,serial.py,service.py,set_win_promisc.py,sev.py,shmem.py,smartcard.py,snapshot_xml.py,sound.py,ssh_key.py,standalone_test.py,step_editor.py,storage.py,storage_ssh.py,stp.py,syslog_server.py,system.py,sys_time.py,tap_network.py,tap_port.py,tap.py,tcp_ipv6.py,tcp.py,telnet.py,test_controller_xml.py,test_interface_xml.py,test_memory_xml.py,test_network_xml.py,test_utils_kernel_module.py,test_utils_misc.py,test_utils_test__init__.py,test_utils_zchannels.py,test_utils_zcrypt.py,test_vm_xml.py,tool.py,tpm.py,udp_ipv6.py,udplite_ipv6.py,udplite.py,udp.py,unattended_install.py,utils_backup.py,utils_cgroup.py,utils_config.py,utils_conn.py,utils_cpi.py,utils_disk.py,utils_env.py,utils_gdb.py,utils_hotplug.py,utils_ids.py,utils_iptables.py,utils_kernel_module.py,utils_koji.py,utils_libguestfs.py,utils_libvirtd.py,utils_linux_modules.py,utils_logfile.py,utils_memory.py,utils_misc.py,utils_nbd.py,utils_netperf.py,utils_net.py,utils_npiv.py,utils_numeric.py,utils_package.py,utils_params.py,utils.py,utils_pyvmomi.py,utils_qemu.py,utils_sasl.py,utils_scheduling.py,utils_selinux.py,utils_spice.py,utils_split_daemons.py,utils_sriov.py,utils_stress.py,utils_switchdev.py,utils_sys.py,utils_time.py,utils_v2v.py,utils_vdpa.py,utils_version.py,utils_virtio_port.py,utils_vsock.py,utils_zchannels.py,utils_zcrypt.py,vdpa_blk.py,verify.py,versionable_class.py,version.py,vhost_user_blk.py,video_maker.py,video.py,virsh.py,virt_admin.py,VirtIoChannel_guest_send_receive.py,virtio_console_guest.py,virtio_win.py,virt_vm.py,vlan.py,vms.py,vm_xml.py,volume.py,vol_xml.py,vsock.py,vt_console.py,vt_iothread.py,wait_for_quit.py,watchdog.py,windows_support.py,wmic.py,_wrappers.py,xcepts.py,xml_utils.py,yumrepo.py

# regex matches against base names, not paths.
ignore-patterns=.git
Expand Down
Empty file.
150 changes: 150 additions & 0 deletions selftests/functional/test_vt_utils/test_qemu_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import os
import subprocess
import unittest

from virttest.vt_utils import qemu_utils as qemu


class QemuFunctionalTest(unittest.TestCase):
"""Functional tests for autils.hypervisor.qemu module"""

def setUp(self):
super().setUp()
self.available_qemu_paths = self._get_available_qemu_paths()

def _get_available_qemu_paths(self):
"""Get the available qemu paths on the system"""
qemu_paths = [
"/usr/bin/qemu-kvm",
"/usr/bin/qemu-system-x86_64",
"/usr/libexec/qemu-kvm",
"/usr/bin/qemu",
]

available_qemu_paths = []

for path in qemu_paths:
if os.path.exists(path):
try:
result = subprocess.run(
[path, "-help"], capture_output=True, timeout=10, check=False
)
if not result.returncode:
available_qemu_paths.append(path)
except (subprocess.TimeoutExpired, FileNotFoundError):
continue
return available_qemu_paths

def test_has_option_with_real_qemu(self):
"""Test has_option function with real QEMU binary"""
if not self.available_qemu_paths:
self.skipTest("QEMU not available on system")

default_qemu_path = "/usr/bin/qemu-kvm"
if default_qemu_path not in self.available_qemu_paths:
self.skipTest(
f"The default qemu path: {default_qemu_path} not available on system"
)

# Test common options that should exist in most QEMU versions
self.assertTrue(qemu.has_option("h"))
self.assertTrue(qemu.has_option("version"))

# Test option that should not exist
self.assertFalse(qemu.has_option("nonexistent-option-12345"))

def test_has_option_with_custom_path(self):
"""Test has_option with custom QEMU path"""
# Find an available QEMU binary
qemu_paths = [
"/usr/bin/qemu-system-x86_64",
"/usr/libexec/qemu-kvm",
"/usr/bin/qemu",
]

available_path = None
for path in qemu_paths:
if os.path.exists(path):
available_path = path
break

if not available_path:
self.skipTest(
f"The custom qemu paths: {qemu_paths} not available on system"
)

if available_path:
result = qemu.has_option("h", available_path)
self.assertTrue(result)

def test_get_support_machine_type_with_real_qemu(self):
"""Test get_support_machine_type function with real QEMU binary"""
if not self.available_qemu_paths:
self.skipTest("QEMU not available on system")

names, types, aliases = qemu.get_support_machine_type(
self.available_qemu_paths[0]
)

# Verify return types
self.assertIsInstance(names, list)
self.assertIsInstance(types, list)
self.assertIsInstance(aliases, list)

# All lists should have the same length
self.assertEqual(len(names), len(types))
self.assertEqual(len(names), len(aliases))

# Should have at least one machine type
self.assertGreater(len(names), 0)

# Each name should be a string
for name in names:
self.assertIsInstance(name, str)
self.assertGreater(len(name), 0)

# Each type should be a string
for machine_type in types:
self.assertIsInstance(machine_type, str)

# Aliases can be None or strings
for alias in aliases:
self.assertTrue(alias is None or isinstance(alias, str))

def test_get_support_machine_type_remove_alias(self):
"""Test get_support_machine_type with remove_alias=True"""
if not self.available_qemu_paths:
self.skipTest("QEMU not available on system")

_, _, aliases = qemu.get_support_machine_type(
self.available_qemu_paths[0], remove_alias=True
)

# With remove_alias=True, all aliases should be None
for alias in aliases:
self.assertIsNone(alias)

def test_get_support_machine_type_no_none_machines(self):
"""Test that 'none' machines are filtered out"""
if not self.available_qemu_paths:
self.skipTest("QEMU not available on system")

names, _, _ = qemu.get_support_machine_type(self.available_qemu_paths[0])

# Should not contain 'none' in machine names
self.assertNotIn("none", names)

def test_has_option_invalid_qemu_path(self):
"""Test has_option with invalid QEMU path"""
result = qemu.has_option("help", "/nonexistent/qemu/path")
# Should handle the error gracefully and return False
self.assertFalse(result)

def test_get_support_machine_type_invalid_path(self):
"""Test get_support_machine_type with invalid QEMU path"""
with self.assertRaises((subprocess.CalledProcessError, FileNotFoundError)):
qemu.get_support_machine_type("/nonexistent/qemu/path")


if __name__ == "__main__":
unittest.main()
Empty file.
Loading
Loading