Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions lib/cpu_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from dataclasses import dataclass

from lib.host import Host

from typing import Literal

NO_SUBLEAF = 0xffffffff

@dataclass(frozen=True)
class CpuidRegisters:
eax: int
ebx: int
ecx: int
edx: int

class CpuPolicy:
"""A specific CPU policy (PV, HVM, ...)"""
# (leaf, subleaf) -> (eax, ebx, ecx, edx)
cpuid: dict[tuple[int, int], CpuidRegisters]
# msr -> val
msr: dict[int, int]

def __init__(self, cpuid: dict[tuple[int, int], CpuidRegisters], msr: dict[int, int]):
self.cpuid = cpuid
self.msr = msr

class HostCpuPolicy:
"""All CPU policies of a host (CPUID, specific MSRs)"""
policies: dict[str, CpuPolicy] = {}

def __init__(self, host: Host):
text = host.ssh("xen-cpuid --policy")
current_policy: CpuPolicy | None = None
mode: Literal["cpuid", "msr"] | None = None

lines = text.splitlines()

for line in lines:
line = line.rstrip()

# ---- Policy header ----
if "policy:" in line:
# Example: "Raw policy: 32 leaves, 2 MSRs"
name, rest = line.split("policy:", 1)
name = name.strip()

self.policies[name] = CpuPolicy(dict(), dict())
current_policy = self.policies[name]
mode = None
continue

if current_policy is None:
continue

# ---- Section switches ----
if line.strip() == "CPUID:":
mode = "cpuid"
continue

if line.strip() == "MSRs:":
mode = "msr"
continue

# Skip table headers
if "leaf" in line or "index" in line or not line.strip():
continue

# ---- CPUID parsing ----
if mode == "cpuid":
# Example:
# 00000004:00000003 -> 1c03c163:02c0003f:00001fff:00000006
left, right = line.split("->")
leaf_hex, subleaf_hex = left.strip().split(":")
eax, ebx, ecx, edx = right.strip().split(":")

key = (int(leaf_hex, 16), int(subleaf_hex, 16))
current_policy.cpuid[key] = CpuidRegisters(
int(eax, 16),
int(ebx, 16),
int(ecx, 16),
int(edx, 16)
)
continue

# ---- MSR parsing ----
if mode == "msr":
# Example:
# 0000010a -> 400000000c000000
idx_hex, val_hex = line.split("->")
idx = int(idx_hex.strip(), 16)
val = int(val_hex.strip(), 16)
current_policy.msr[idx] = val
continue
29 changes: 29 additions & 0 deletions tests/xen/test_cpu_policy_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pytest

import logging

from lib.cpu_policy import NO_SUBLEAF, HostCpuPolicy
from lib.host import Host

Comment thread
TSnake41 marked this conversation as resolved.
# Xen CPU Policy gathering test
#
# # Requirements:
# - XCP-ng host

class TestCpuPolicy:
def test_cpu_policy_collection(self, host: Host) -> None:
"""
This test simply collects the CPU policy (CPUIDs and MSRs) from the host.
We only check that we collected the information successfully, without errors.
No extra check is made on the accuracy or completeness of the information.
A side effect is the logging of the collected information in the test output.
"""
cpu_policy = HostCpuPolicy(host)

for name, policy in cpu_policy.policies.items():
for ((leaf, subleaf), regs) in policy.cpuid.items():
eax, ebx, ecx, edx = regs.eax, regs.ebx, regs.ecx, regs.edx
logging.info(f"CPUID[{name}]: {leaf:08x}:{subleaf:08x} -> {eax:08x}:{ebx:08x}:{ecx:08x}:{edx:08x}")

for (msr, val) in policy.msr.items():
logging.info(f"MSR [{name}]: {msr:08x} -> {val:016x}")
Loading