Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,48 +1,96 @@
import os

import leapp.libraries.common.config as ipu_config
from leapp.libraries.common.distro import distro_id_to_pretty_name
from leapp.libraries.common.mounting import LoopMount, MountError
from leapp.libraries.stdlib import api, CalledProcessError, run
from leapp.models import CustomTargetRepository, TargetOSInstallationImage


def determine_rhel_version_from_iso_mountpoint(iso_mountpoint):
RELEASE_PKG_NAME_PREFIX = {
'rhel': 'redhat-release',
'centos': 'centos-stream-release',
'almalinux': 'almalinux-release',
}

DISTRO_RELEASE_FILES = {
'rhel': '/etc/redhat-release',
'centos': '/etc/centos-release',
'almalinux': '/etc/almalinux-release',
}

def etc_release_extract_version(etc_release_contents, target_distro):
"""
Extract product release version from /etc/release content

:return: The parse version or None if it couldn't be determined
:rtype: str | None
"""
# 'Red Hat Enterprise Linux Server release 7.9 (Maipo)' -> ['Red Hat...', '7.9 (Maipo)']
# Red Hat Enterprise Linux release 8.10 (Ootpa)
# CentOS Stream release 8
product_release_fragments = etc_release_contents['stdout'].split('release')
if len(product_release_fragments) != 2:
return None # Unlikely. Either way we failed to parse the release

if not product_release_fragments[0].startswith(distro_id_to_pretty_name(target_distro)):
return None

determined_ver = product_release_fragments[1].strip().split(' ', 1)[0] # Remove release name (Maipo)
return determined_ver


def determine_distro_version_from_iso_mountpoint(iso_mountpoint):
baseos_packages = os.path.join(iso_mountpoint, 'BaseOS/Packages')
if os.path.isdir(baseos_packages):
def is_rh_release_pkg(pkg_name):
return pkg_name.startswith('redhat-release') and 'eula' not in pkg_name
target_distro = ipu_config.get_target_distro_id()

def is_release_pkg(pkg_name):
return (
pkg_name.startswith(RELEASE_PKG_NAME_PREFIX[target_distro])
and "eula" not in pkg_name
)

redhat_release_pkgs = [pkg for pkg in os.listdir(baseos_packages) if is_rh_release_pkg(pkg)]
distro_release_pkgs = [pkg for pkg in os.listdir(baseos_packages) if is_release_pkg(pkg)]

if not redhat_release_pkgs:
if not distro_release_pkgs:
return '' # We did not determine anything

if len(redhat_release_pkgs) > 1:
api.current_logger().warning('Multiple packages with name redhat-release* found when '
'determining RHEL version of the supplied installation ISO.')
if len(distro_release_pkgs) > 1:
api.current_logger().warning(
"Multiple packages with name {}* found when determining target version of the supplied"
" installation ISO.".format(RELEASE_PKG_NAME_PREFIX[target_distro])
)

redhat_release_pkg = redhat_release_pkgs[0]
distro_release_pkg = distro_release_pkgs[0]

determined_rhel_ver = ''
try:
rh_release_pkg_path = os.path.join(baseos_packages, redhat_release_pkg)
release_pkg_path = os.path.join(baseos_packages, distro_release_pkg)
# rpm2cpio is provided by rpm; cpio is a dependency of yum (rhel7) and a dependency of dracut which is
# a dependency for leapp (rhel8+)
cpio_archive = run(['rpm2cpio', rh_release_pkg_path])
etc_rh_release_contents = run(['cpio', '--extract', '--to-stdout', './etc/redhat-release'],
stdin=cpio_archive['stdout'])

# 'Red Hat Enterprise Linux Server release 7.9 (Maipo)' -> ['Red Hat...', '7.9 (Maipo']
product_release_fragments = etc_rh_release_contents['stdout'].split('release')
if len(product_release_fragments) != 2:
return '' # Unlikely. Either way we failed to parse the release
cpio_archive = run(['rpm2cpio', release_pkg_path])
etc_release_contents = run(
[
"cpio",
"--extract",
"--to-stdout",
f".{DISTRO_RELEASE_FILES[target_distro]}",
],
stdin=cpio_archive["stdout"],
)

return etc_release_extract_version(etc_release_contents, target_distro) or ''
except CalledProcessError:
# FIXME?: This might fail e.g. if the ISO isn't complete
# (download/scp/...) interrupted. Maybe we should at include
# info that in the report?
# Leaving an exact example from the logs (yes the empty line is there):
# error:
# /var/lib/leapp/iso_scan_mountpoint/BaseOS/Packages/centos-stream-release-9.0-26.el9.noarch.rpm:
# read failed: Input/output error (5)

if not product_release_fragments[0].startswith('Red Hat'):
return ''
# error reading header from package

determined_rhel_ver = product_release_fragments[1].strip().split(' ', 1)[0] # Remove release name (Maipo)
return determined_rhel_ver
except CalledProcessError:
return ''
return ''

Expand All @@ -62,7 +110,7 @@ def inform_ipu_about_request_to_use_target_iso():
was_mounted_successfully=False))
return

# Mount the given ISO, extract the available repositories and determine provided RHEL version
# Mount the given ISO, extract the available repositories and determine provided target version
iso_scan_mountpoint = '/var/lib/leapp/iso_scan_mountpoint'
try:
with LoopMount(source=target_iso_path, target=iso_scan_mountpoint):
Expand All @@ -80,7 +128,7 @@ def inform_ipu_about_request_to_use_target_iso():
api.produce(iso_repo)
iso_repos.append(iso_repo)

rhel_version = determine_rhel_version_from_iso_mountpoint(iso_scan_mountpoint)
rhel_version = determine_distro_version_from_iso_mountpoint(iso_scan_mountpoint)

api.produce(TargetOSInstallationImage(path=target_iso_path,
repositories=iso_repos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def fail_if_called(fail_reason, *args, **kwargs):
assert False, fail_reason


def test_determine_rhel_version_determination_unexpected_iso_structure_or_invalid_mountpoint(monkeypatch):
def test_determine_distro_version_determination_unexpected_iso_structure_or_invalid_mountpoint(monkeypatch):
iso_mountpoint = '/some/mountpoint'

run_mocked = partial(fail_if_called,
Expand All @@ -28,43 +28,87 @@ def isdir_mocked(path):

monkeypatch.setattr(os.path, 'isdir', isdir_mocked)

determined_version = scan_target_os_iso.determine_rhel_version_from_iso_mountpoint(iso_mountpoint)
determined_version = scan_target_os_iso.determine_distro_version_from_iso_mountpoint(iso_mountpoint)
assert not determined_version


def test_determine_rhel_version_valid_iso(monkeypatch):
@pytest.mark.parametrize(
"distro, pkgs, etc_release_fname, etc_release_content, expect",
[
(
"rhel",
[
"redhat-release-8.7-0.3.el8.x86_64.rpm",
"redhat-release-eula-8.7-0.3.el8.x86_64.rpm", # test that this one isn't picked up
],
"./etc/redhat-release",
"Red Hat Enterprise Linux Server release 7.9 (Maipo)",
"7.9",
),
(
"centos",
["centos-stream-release-9.0-34.el9.x86_64.rpm"],
"./etc/centos-release",
"CentOS Stream release 9",
"9"
),
(
"centos",
["centos-stream-release-10.0-19.el10.noarch.rpm"],
"./etc/centos-release",
"CentOS Stream release 10 (Coughlan)",
"10"
),
(
"almalinux",
["almalinux-release-9.7-1.el9.x86_64.rpm"],
"./etc/almalinux-release",
"AlmaLinux release 9.7 (Moss Jungle Cat)",
"9.7"
),
(
"almalinux",
["almalinux-release-10.1-16.el10.x86_64.rpm"],
"./etc/almalinux-release",
"AlmaLinux release 10.1 (Heliotrope Lion)",
"10.1"
),

],
)
def test_determine_distro_version_valid_iso(
monkeypatch, distro, pkgs, etc_release_fname, etc_release_content, expect
):
monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(release_id=distro))
iso_mountpoint = '/some/mountpoint'

def isdir_mocked(path):
return True

def listdir_mocked(path):
assert path == '/some/mountpoint/BaseOS/Packages', 'Only the contents of BaseOS/Packages should be examined.'
return ['xz-5.2.4-4.el8_6.x86_64.rpm',
'libmodman-2.0.1-17.el8.i686.rpm',
'redhat-release-8.7-0.3.el8.x86_64.rpm',
'redhat-release-eula-8.7-0.3.el8.x86_64.rpm']
return ['xz-5.2.4-4.el8_6.x86_64.rpm', 'libmodman-2.0.1-17.el8.i686.rpm'] + pkgs

def run_mocked(cmd, *args, **kwargs):
rpm2cpio_output = 'rpm2cpio_output'
if cmd[0] == 'rpm2cpio':
assert cmd == ['rpm2cpio', '/some/mountpoint/BaseOS/Packages/redhat-release-8.7-0.3.el8.x86_64.rpm']
assert cmd == ['rpm2cpio', f'/some/mountpoint/BaseOS/Packages/{pkgs[0]}']
return {'stdout': rpm2cpio_output}
if cmd[0] == 'cpio':
assert cmd == ['cpio', '--extract', '--to-stdout', './etc/redhat-release']
assert cmd == ['cpio', '--extract', '--to-stdout', etc_release_fname]
assert kwargs['stdin'] == rpm2cpio_output
return {'stdout': 'Red Hat Enterprise Linux Server release 7.9 (Maipo)'}
return {'stdout': etc_release_content}
raise ValueError('Unexpected command has been called.')

monkeypatch.setattr(os.path, 'isdir', isdir_mocked)
monkeypatch.setattr(os, 'listdir', listdir_mocked)
monkeypatch.setattr(scan_target_os_iso, 'run', run_mocked)

determined_version = scan_target_os_iso.determine_rhel_version_from_iso_mountpoint(iso_mountpoint)
assert determined_version == '7.9'
determined_version = scan_target_os_iso.determine_distro_version_from_iso_mountpoint(iso_mountpoint)
assert determined_version == expect


def test_determine_rhel_version_valid_iso_no_rh_release(monkeypatch):
def test_determine_distro_version_valid_iso_no_release_file(monkeypatch):
iso_mountpoint = '/some/mountpoint'

def isdir_mocked(path):
Expand All @@ -81,12 +125,13 @@ def listdir_mocked(path):
monkeypatch.setattr(os.path, 'isdir', isdir_mocked)
monkeypatch.setattr(os, 'listdir', listdir_mocked)
monkeypatch.setattr(scan_target_os_iso, 'run', run_mocked)
monkeypatch.setattr(api, "current_actor", CurrentActorMocked())

determined_version = scan_target_os_iso.determine_rhel_version_from_iso_mountpoint(iso_mountpoint)
determined_version = scan_target_os_iso.determine_distro_version_from_iso_mountpoint(iso_mountpoint)
assert determined_version == ''


def test_determine_rhel_version_rpm_extract_fails(monkeypatch):
def test_determine_distro_version_rpm_extract_fails(monkeypatch):
iso_mountpoint = '/some/mountpoint'

def isdir_mocked(path):
Expand All @@ -102,15 +147,17 @@ def run_mocked(cmd, *args, **kwargs):
monkeypatch.setattr(os.path, 'isdir', isdir_mocked)
monkeypatch.setattr(os, 'listdir', listdir_mocked)
monkeypatch.setattr(scan_target_os_iso, 'run', run_mocked)
monkeypatch.setattr(api, "current_actor", CurrentActorMocked())

determined_version = scan_target_os_iso.determine_rhel_version_from_iso_mountpoint(iso_mountpoint)
determined_version = scan_target_os_iso.determine_distro_version_from_iso_mountpoint(iso_mountpoint)
assert determined_version == ''


@pytest.mark.parametrize('etc_rh_release_contents', ('',
'Red Hat Enterprise Linux Server',
'Fedora release 35 (Thirty Five)'))
def test_determine_rhel_version_unexpected_etc_rh_release_contents(monkeypatch, etc_rh_release_contents):
@pytest.mark.parametrize(
"etc_rh_release_contents",
("", "Red Hat Enterprise Linux Server", "Fedora release 35 (Thirty Five)"),
)
def test_determine_distro_version_unexpected_etc_distro_release_contents(monkeypatch, etc_rh_release_contents):
iso_mountpoint = '/some/mountpoint'

def isdir_mocked(path):
Expand All @@ -130,8 +177,9 @@ def run_mocked(cmd, *args, **kwargs):
monkeypatch.setattr(os.path, 'isdir', isdir_mocked)
monkeypatch.setattr(os, 'listdir', listdir_mocked)
monkeypatch.setattr(scan_target_os_iso, 'run', run_mocked)
monkeypatch.setattr(api, "current_actor", CurrentActorMocked())

determined_version = scan_target_os_iso.determine_rhel_version_from_iso_mountpoint(iso_mountpoint)
determined_version = scan_target_os_iso.determine_distro_version_from_iso_mountpoint(iso_mountpoint)
assert determined_version == ''


Expand Down Expand Up @@ -192,7 +240,11 @@ def mocked_os_listdir(path):
monkeypatch.setattr(scan_target_os_iso, 'LoopMount', always_successful_loop_mount)
monkeypatch.setattr(os.path, 'exists', mocked_os_path_exits)
monkeypatch.setattr(os, 'listdir', mocked_os_listdir)
monkeypatch.setattr(scan_target_os_iso, 'determine_rhel_version_from_iso_mountpoint', lambda iso_mountpoint: '7.9')
monkeypatch.setattr(
scan_target_os_iso,
"determine_distro_version_from_iso_mountpoint",
lambda iso_mountpoint: "7.9",
)

scan_target_os_iso.inform_ipu_about_request_to_use_target_iso()

Expand Down
Loading