Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 324 additions & 0 deletions .github/scripts/install-slices
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
#!/usr/bin/python3

import argparse
import logging
import subprocess
import tempfile

import requests
import yaml


def configure_logging() -> None:
"""
Configure the logging options for this script.
"""
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)


class CommandArgs:
arch: str
release: str
files: list[str]
ignore_missing: bool
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated


def parse_args() -> CommandArgs:
"""
Get an argument parser tailored to this script.
"""
parser = argparse.ArgumentParser(
description="Verify slice definition files by installing the slices",
)
parser.add_argument(
"--arch",
required=True,
help="Package architecture",
)
parser.add_argument(
"--release",
required=True,
help="Chisel-releases branch name or directory",
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
)
parser.add_argument(
"--ignore-missing",
required=False,
action="store_true",
help=str(
"Ignore arch-specific package not found in archive errors. "
"If set, also ensure that packages exist for any arch."
),
)
parser.add_argument(
"files",
metavar="file",
help="Chisel slice definition file(s)",
nargs="*",
)
args = parser.parse_args()
cli_args = CommandArgs()
cli_args.arch = args.arch
cli_args.release = args.release
cli_args.files = args.files
cli_args.ignore_missing = args.ignore_missing
return cli_args


class Archive:
"""
Minimal data class replicating ubuntu archive in chisel.yaml
"""

version: str
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
components: list[str]
suites: list[str]


def parse_archive(release: str) -> Archive:
"""
Parse the "ubuntu" archive from the chisel.yaml file of a release.
The chisel.yaml file has the following structure:
...
archives:
ubuntu:
version: 22.04
components: [main, universe]
suites: [jammy, jammy-security, jammy-updates]
...
...
"""
logging.debug("Parsing ubuntu archive info...")
# (download and) parse chisel.yaml for ubuntu archive info
try:
if release.endswith("/"):
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
with open(f"{release}chisel.yaml", "r") as stream:
data = yaml.safe_load(stream)
else:
req_url = f"https://raw.githubusercontent.com/canonical/chisel-releases/{release}/chisel.yaml"
response = requests.get(req_url)
if response.status_code < 200 or response.status_code > 299:
logging.error(f"Cannot download chisel.yaml from remote")
exit(1)
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
data = yaml.safe_load(response.content)
except yaml.YAMLError as e:
logging.error(f"chisel.yaml: {e}")
exit(1)
except Exception as e:
logging.error(e)
exit(1)
# load the yaml data into Archive
archive = Archive()
try:
archive_data = data["archives"]["ubuntu"]
archive.version = archive_data["version"]
archive.components = archive_data["components"]
archive.suites = archive_data["suites"]
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
except KeyError as e:
logging.error(f"{release}: key {e} not found")
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
exit(1)
return archive


class Package:
"""
Minimal data class to store package info.
"""

path: str
package: str
slices: list[str]

def full_slice_name(self, slice: str) -> str:
return f"{self.package}_{slice}"


def parse_package(filepath: str) -> Package:
"""
Parse a slice definition file and return the Package.
"""
logging.debug(f"Parsing {filepath}...")
with open(filepath, "r") as stream:
try:
data = yaml.safe_load(stream)
except yaml.YAMLError as e:
logging.error(f"{filepath}: {e}")
exit(1)
p = Package()
p.path = filepath
try:
p.package = data["package"]
p.slices = []
for key in data["slices"]:
p.slices.append(key)
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
p.slices = sorted(p.slices)
except KeyError as e:
logging.error(f"{filepath}: key {e} not found")
exit(1)
except Exception as e:
logging.error(f"{filepath}: {e}")
exit(1)
return p


def query_package_existence(
packages: list[str],
arch: list[str] = [],
components: list[str] = [],
suites: list[str] = [],
) -> tuple[list[str], list[str]]:
"""
Check which packages exist in the archive. Return a list of packages
that exist and another list for which do not.
"""
# prepare cmd
args = ["rmadison"]
if len(arch) > 0:
args += ["--architecture", ",".join(arch)]
if len(components) > 0:
args += ["--component", ",".join(components)]
if len(suites) > 0:
args += ["--suite", ",".join(suites)]
args.append(" ".join(packages))
# query the archives using rmadison
logging.debug("Querying the archives for packages...")
logging.debug("Executing " + " ".join(args))
res = subprocess.run(args, capture_output=True, text=True)
if res.returncode != 0:
logging.error(f"Failed to query the archives {res.returncode}")
exit(res.returncode)
output = res.stdout.rstrip()
logging.debug(f"Archive query output:\n{output}")
# parse the output for available packages
available = {}
for line in output.split("\n"):
line = line.strip()
if line == "":
continue
pkg = line.split("|")[0].strip()
available[pkg] = True
found = []
missing = []
for p in packages:
if available.get(p):
found.append(p)
else:
missing.append(p)
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
return found, missing


def ensure_package_existence(packages: list[str], archive: Archive) -> None:
"""
Ensure that packages exist in the archive for any arch.
"""
_, missing = query_package_existence(
packages,
components=archive.components,
suites=archive.suites,
)
if len(missing) > 0:
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
logging.error(
"The following packages do not exist:\n"
+ "\n".join(f" - {p}" for p in missing)
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
)
exit(1)


def filter_packages(
packages: list[Package],
arch: str,
release: str,
ignore_missing: bool = False,
) -> tuple[list[Package], list[Package]]:
"""
Filter the Packages using the following criteria:
- if "ignore_missing" is true, ignore package if it does not exist
in the archive for [arch, release].
"""
ignored = []
if ignore_missing:
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
package_names = [p.package for p in packages]
archive = parse_archive(release)
# ensure that packages exist in the archive.
ensure_package_existence(package_names, archive)
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
# check package existence for particular arch
found, _ = query_package_existence(
packages=package_names,
arch=[arch],
components=archive.components,
suites=archive.suites,
)
available = {}
for p in found:
available[p] = True
filtered = []
for p in packages:
if available.get(p.package):
filtered.append(p)
else:
ignored.append(p)
packages = filtered
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
return packages, ignored


def install_slice(slice: str, arch: str, release: str) -> None:
"""
Install the slice by running "chisel cut".
"""
logging.info(f"Installing {slice} on {arch}...")
with tempfile.TemporaryDirectory() as tmpfs:
res = subprocess.run(
args=[
"chisel",
"cut",
"--arch",
arch,
"--release",
release,
"--root",
tmpfs,
slice,
],
capture_output=True,
text=True,
)
if res.returncode != 0:
print("==============================================")
print(res.stderr, end="")
exit(res.returncode)


def main():
configure_logging()
# parse CLI args
cli_args = parse_args()
# parse slice definition files
packages = []
for file in cli_args.files:
pkg = parse_package(file)
packages.append(pkg)
# choose the packages to install slices from
packages, ignored = filter_packages(
packages,
cli_args.arch,
cli_args.release,
cli_args.ignore_missing,
)
if len(ignored) > 0:
logging.info("The following packages will be IGNORED:")
for pkg in ignored:
logging.info(f" - {pkg.package}")
if len(packages) > 0:
logging.info("Slices of the following packages will be INSTALLED:")
for pkg in packages:
logging.info(f" - {pkg.package}")
else:
logging.info("No slices will be installed.")
exit(0)
Comment thread
rebornplusplus marked this conversation as resolved.
Outdated
# install the slices
for pkg in packages:
for slice in pkg.slices:
install_slice(pkg.full_slice_name(slice), cli_args.arch, cli_args.release)


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions .github/scripts/requirements.install-slices.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pyyaml
requests
Loading