Skip to content

Commit 4117785

Browse files
committed
utils/pci: Introduce functions for attaching driver and interrupt capability checks
This patch introduces a set of PCI utility functions to simplify device management and capability checks: * add_vendor_id() – retrieves the vendor ID of a PCI device and associates it with the specified driver * attach_driver() – unbinds a device from its current driver and rebinds it to a given driver * check_msix_capability() – verifies whether the device supports and has MSI-X capability enabled * device_supports_irqs() – checks if the device supports at least a specified number of IRQs These helpers provide a cleaner and reusable interface for driver binding and interrupt capability validation. Signed-off-by: Dheeraj Kumar Srivastava <[email protected]>
1 parent 95dc1d5 commit 4117785

File tree

3 files changed

+155
-2
lines changed

3 files changed

+155
-2
lines changed

avocado/utils/pci.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,96 @@ def get_vendor_id(full_pci_address):
376376
return out.split(" ")[2].strip()
377377

378378

379+
def add_vendor_id(full_pci_address, driver):
380+
"""
381+
Retrieve and add the vendor ID of a PCI device to the specified driver.
382+
383+
:param full_pci_address: Full PCI device address, including domain
384+
(e.g., 0000:03:00.0).
385+
:param driver: Driver to associate with the vendor ID of the PCI device.
386+
"""
387+
# Get vendor id of pci device
388+
output = get_vendor_id(full_pci_address)
389+
vid = output.replace(":", " ")
390+
391+
# Add device vendor id to the driver
392+
try:
393+
genio.write_file_or_fail(f"/sys/bus/pci/drivers/{driver}/new_id", f"{vid}\n")
394+
except genio.GenIOError as e:
395+
if "File exists" in str(e):
396+
return
397+
raise ValueError(
398+
f"Failed to add vendor ID '{vid}' to driver '{driver}': {e}"
399+
) from e
400+
401+
402+
def attach_driver(full_pci_address, driver):
403+
"""
404+
Unbind the device from its existing driver and bind it to the given driver.
405+
406+
:param full_pci_address: Full PCI device address (e.g., 0000:03:00.0)
407+
:param driver: Driver to be attached to the specified PCI device
408+
:raises ValueError: If `driver` or `full_pci_address` is None, or if the driver
409+
could not be attached successfully due to an OS error.
410+
:warning: This function may unbind the device from its current driver, which can
411+
temporarily make the device unavailable until it is reattached.
412+
"""
413+
414+
try:
415+
if not driver or not full_pci_address:
416+
raise ValueError("'driver' or 'full_pci_address' inputs are None")
417+
418+
# add vendor id of device to driver
419+
add_vendor_id(full_pci_address, driver)
420+
421+
# unbind the device from its initial driver
422+
cur_driver = get_driver(full_pci_address)
423+
if cur_driver is not None:
424+
unbind(cur_driver, full_pci_address)
425+
426+
# Bind device to driver
427+
bind(driver, full_pci_address)
428+
429+
except OSError as e:
430+
raise ValueError(
431+
f"Not able to attach {driver} to {full_pci_address}. Reason: {e}"
432+
) from e
433+
434+
435+
def check_msix_capability(full_pci_address):
436+
"""
437+
Check whether the PCI device supports Extended Message Signaled Interrupts
438+
and if it is currently enabled.
439+
440+
:param full_pci_address: Full PCI address including domain (0000:03:00.0)
441+
:return: True if supported, False otherwise
442+
"""
443+
out = process.run(f"lspci -vvs {full_pci_address}", ignore_status=True, shell=True)
444+
if out.exit_status:
445+
raise ValueError(f"lspci failed for {full_pci_address}: {out.stderr_text}")
446+
return any("MSI-X:" in line for line in out.stdout_text.splitlines())
447+
448+
449+
def device_supports_irqs(full_pci_address, count):
450+
"""
451+
Check if the device supports at least the specified number of interrupts.
452+
453+
:param full_pci_address: Full PCI device address including domain (e.g., 0000:03:00.0)
454+
:param count: Number of IRQs the device should support
455+
:return: True if supported, False otherwise
456+
"""
457+
out = process.run(
458+
f"lspci -vv -s {full_pci_address}", ignore_status=True, shell=True
459+
)
460+
if out.exit_status:
461+
raise ValueError(f"lspci failed for {full_pci_address}: {out.stderr_text}")
462+
match = re.search(r"MSI-X:.*?Count=(\d+)", out.stdout_text, re.S)
463+
if match:
464+
nirq = int(match.group(1))
465+
return nirq >= count
466+
return False
467+
468+
379469
def reset_check(full_pci_address):
380470
"""
381471
Check if reset for "full_pci_address" is successful

selftests/check.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
"job-api-check-tmp-directory-exists": 1,
2828
"nrunner-interface": 90,
2929
"nrunner-requirement": 28,
30-
"unit": 881,
30+
"unit": 889,
3131
"jobs": 11,
3232
"functional-parallel": 342,
3333
"functional-serial": 7,

selftests/unit/utils/pci.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import unittest.mock
22

3-
from avocado.utils import pci
3+
from avocado.utils import genio, pci
44

55

66
class UtilsPciTest(unittest.TestCase):
@@ -42,6 +42,69 @@ def test_get_slot_from_sysfs_negative(self):
4242
):
4343
self.assertRaises(ValueError, pci.get_slot_from_sysfs, "0002:01:00.1")
4444

45+
@unittest.mock.patch("avocado.utils.pci.get_vendor_id")
46+
@unittest.mock.patch("avocado.utils.pci.genio.write_file_or_fail")
47+
def test_add_vendor_id_success(self, mock_write, mock_get_vid):
48+
mock_get_vid.return_value = "1234:abcd"
49+
pci.add_vendor_id("0000:03:00.0", "driver")
50+
mock_get_vid.assert_called_once_with("0000:03:00.0")
51+
mock_write.assert_called_once_with(
52+
"/sys/bus/pci/drivers/driver/new_id", "1234 abcd\n"
53+
)
54+
55+
@unittest.mock.patch("avocado.utils.pci.get_vendor_id", return_value="1234:abcd")
56+
@unittest.mock.patch(
57+
"avocado.utils.pci.genio.write_file_or_fail",
58+
side_effect=genio.GenIOError("File exists"),
59+
)
60+
def test_add_vendor_id_already_exists(self, mock_write, mock_get_vid):
61+
pci.add_vendor_id("0000:03:00.0", "driver")
62+
mock_get_vid.assert_called_once_with("0000:03:00.0")
63+
mock_write.assert_called_once_with(
64+
"/sys/bus/pci/drivers/driver/new_id", "1234 abcd\n"
65+
)
66+
67+
@unittest.mock.patch("avocado.utils.pci.bind")
68+
@unittest.mock.patch("avocado.utils.pci.unbind")
69+
@unittest.mock.patch("avocado.utils.pci.get_driver")
70+
@unittest.mock.patch("avocado.utils.pci.add_vendor_id")
71+
def test_attach_driver(self, mock_add_vid, mock_get_driver, mock_unbind, mock_bind):
72+
mock_get_driver.return_value = "old_driver"
73+
pci.attach_driver("0000:03:00.0", "new_driver")
74+
mock_add_vid.assert_called_once()
75+
mock_unbind.assert_called_once_with("old_driver", "0000:03:00.0")
76+
mock_bind.assert_called_once_with("new_driver", "0000:03:00.0")
77+
78+
@unittest.mock.patch("avocado.utils.pci.process.run")
79+
def test_check_msix_capability_supported(self, mock_run):
80+
mock_run.return_value.exit_status = 0
81+
mock_run.return_value.stdout_text = "Capabilities: [90] MSI-X: Enable+ Count=16"
82+
self.assertTrue(pci.check_msix_capability("0000:03:00.0"))
83+
84+
@unittest.mock.patch("avocado.utils.pci.process.run")
85+
def test_check_msix_capability_unsupported(self, mock_run):
86+
mock_run.return_value.exit_status = 0
87+
mock_run.return_value.stdout_text = "Capabilities: [90] MSI: Enable+ Count=16"
88+
self.assertFalse(pci.check_msix_capability("0000:03:00.0"))
89+
90+
@unittest.mock.patch("avocado.utils.pci.process.run")
91+
def test_device_supports_irqs_enough(self, mock_run):
92+
mock_run.return_value.exit_status = 0
93+
mock_run.return_value.stdout_text = "MSI-X: Enable+ Count=64"
94+
self.assertTrue(pci.device_supports_irqs("0000:03:00.0", 32))
95+
96+
@unittest.mock.patch("avocado.utils.pci.process.run")
97+
def test_device_supports_irqs_insufficient(self, mock_run):
98+
mock_run.return_value.exit_status = 0
99+
mock_run.return_value.stdout_text = "MSI-X: Enable+ Count=4"
100+
self.assertFalse(pci.device_supports_irqs("0000:03:00.0", 16))
101+
102+
@unittest.mock.patch("avocado.utils.pci.process.run")
103+
def test_device_supports_irqs_no_msix(self, mock_run):
104+
mock_run.return_value.exit_status = 0
105+
mock_run.return_value.stdout_text = "Capabilities: [90] MSI: Enable+ Count=16"
106+
self.assertFalse(pci.device_supports_irqs("0000:03:00.0", 8))
107+
45108

46109
if __name__ == "__main__":
47110
unittest.main()

0 commit comments

Comments
 (0)