Skip to content

Commit 4810f01

Browse files
committed
feat(RELEASE-2509): add python scripts for publish-to-nrrc task
add python scripts for publish-to-nrrc task Signed-off-by: Elena German <elgerman@redhat.com> Assisted-by: Claude
1 parent 0ecbc12 commit 4810f01

8 files changed

Lines changed: 759 additions & 0 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies = [
2525
"diffused-lib==0.3.0",
2626
"confluent-kafka",
2727
"python-gitlab>=4.0",
28+
"python-dotenv>=1.0.0",
2829
]
2930

3031
[tool.black]
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Parse charon parameter files used by NRRC/MRRC publish tasks."""
2+
3+
from __future__ import annotations
4+
5+
import re
6+
import shutil
7+
from pathlib import Path
8+
9+
from dotenv import dotenv_values
10+
11+
import file
12+
13+
_REGISTRY_SPLIT = re.compile(r"%")
14+
15+
16+
def load_charon_env(path: Path) -> dict[str, str]:
17+
"""Load charon parameters from a dotenv file (``KEY=value`` lines)."""
18+
if not path.is_file():
19+
raise FileNotFoundError(f"charon env file not found: {path}")
20+
values = dotenv_values(path, encoding="utf-8")
21+
return {key: value for key, value in values.items() if value is not None}
22+
23+
24+
def split_oci_registries(value: str) -> list[str]:
25+
"""Split ``CHARON_OCI_REGISTRY`` on ``%`` into non-empty registry references."""
26+
return [part.strip() for part in _REGISTRY_SPLIT.split(value) if part.strip()]
27+
28+
29+
def short_sha256_prefix(registry: str) -> str:
30+
"""Return the first six characters of the digest in *registry*."""
31+
marker = "@sha256:"
32+
if marker not in registry:
33+
raise ValueError(f"registry reference missing @sha256: digest: {registry!r}")
34+
return registry.split(marker, 1)[1][:6]
35+
36+
37+
def source_repo(registry: str) -> str:
38+
"""Return the repository part of an OCI reference (before ``@sha256:``)."""
39+
return registry.split("@sha256:", 1)[0]
40+
41+
42+
def require_env_keys(env: dict[str, str], *keys: str) -> None:
43+
"""Raise ValueError when any *keys* are missing from *env*."""
44+
for key in keys:
45+
if key not in env:
46+
raise ValueError(f"missing required charon env variable: {key}")
47+
48+
49+
def require_oci_registries(env: dict[str, str]) -> list[str]:
50+
"""Return non-empty ``CHARON_OCI_REGISTRY`` entries from *env*."""
51+
try:
52+
value = env["CHARON_OCI_REGISTRY"]
53+
except KeyError as e:
54+
raise ValueError("CHARON_OCI_REGISTRY is required in charon env file") from e
55+
registries = split_oci_registries(value)
56+
if not registries:
57+
raise ValueError("CHARON_OCI_REGISTRY must list at least one registry reference")
58+
return registries
59+
60+
61+
def charon_config_path(*, home: Path | None = None) -> Path:
62+
"""Return the default charon configuration file path under *home* or ``Path.home()``."""
63+
root = home if home is not None else Path.home()
64+
return root / ".charon" / "charon.yaml"
65+
66+
67+
def install_charon_config(config_source: Path, *, home: Path | None = None) -> Path:
68+
"""Copy the charon config into ``$HOME/.charon/charon.yaml``."""
69+
if not config_source.is_file():
70+
raise FileNotFoundError(f"charon config file not found: {config_source}")
71+
dest = charon_config_path(home=home)
72+
dest.parent.mkdir(parents=True, exist_ok=True)
73+
shutil.copy2(config_source, dest)
74+
return dest
75+
76+
77+
NRRC_WORK_DIR_DEFAULT = Path("/var/workdir/nrrc")
78+
79+
80+
def nrrc_work_dir() -> Path:
81+
"""Return the NRRC staging directory from ``WORK_DIR`` or ``/var/workdir/nrrc``."""
82+
return file.path_from_env_variable("WORK_DIR", NRRC_WORK_DIR_DEFAULT)

scripts/python/helpers/file.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
import gzip
88
import io
99
import os
10+
import re
11+
import subprocess
1012
import tempfile
13+
from collections.abc import Callable, Sequence
1114
from pathlib import Path
1215
from typing import Any
1316

@@ -23,6 +26,7 @@ def load_json_dict(path: Path) -> dict[str, Any]:
2326

2427

2528
_GZIP_READ_CHUNK_SIZE = 64 * 1024
29+
_ARCHIVE_TYPE = re.compile(r"(gzip compressed data|POSIX tar archive)")
2630

2731

2832
def sha256(path: Path) -> str:
@@ -57,6 +61,25 @@ def path_from_env_variable(
5761
return default if isinstance(default, Path) else Path(default)
5862

5963

64+
def resolve_path_under_base(base: Path, relative: str | Path) -> Path:
65+
"""Resolve *relative* under *base* and ensure the result stays inside *base*.
66+
67+
Rejects absolute paths and ``..`` traversal after resolution. Typical use:
68+
Tekton passes a path relative to a data directory (e.g. charon env/config files).
69+
"""
70+
text = str(relative).strip()
71+
if not text:
72+
raise ValueError(f"path must be relative to {base}: {relative!r}")
73+
rel = Path(text)
74+
if rel.is_absolute():
75+
raise ValueError(f"path must be relative to {base}: {relative!r}")
76+
root = base.resolve()
77+
candidate = (root / rel).resolve()
78+
if not candidate.is_relative_to(root):
79+
raise ValueError(f"path must stay under {base}: {relative!r}")
80+
return candidate
81+
82+
6083
def make_tempfile_path(
6184
prefix: str,
6285
data: bytes | None = None,
@@ -97,3 +120,13 @@ def decompress_gzip_bounded(data: bytes, *, max_bytes: int) -> bytes:
97120
msg = f"decompressed data exceeds {max_bytes} bytes (possible gzip bomb)"
98121
raise ValueError(msg)
99122
return bytes(output)
123+
124+
125+
def is_gzip_or_tar_archive(
126+
path: Path,
127+
*,
128+
file_cmd: Callable[[Sequence[str | Path]], subprocess.CompletedProcess[str]],
129+
) -> bool:
130+
"""Return True when ``file -b`` reports gzip or tar content for *path*."""
131+
result = file_cmd(["file", "-b", str(path)])
132+
return _ARCHIVE_TYPE.search(result.stdout) is not None
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
"""Tests for ``charon_env``."""
2+
3+
from __future__ import annotations
4+
5+
from pathlib import Path
6+
7+
import charon_env
8+
import pytest
9+
10+
11+
def test_load_charon_env_parses_dotenv_lines(tmp_path: Path) -> None:
12+
"""Dotenv lines are parsed into a key/value mapping."""
13+
env_file = tmp_path / "charon.env"
14+
env_file.write_text(
15+
"\nCHARON_OCI_REGISTRY=repo@sha256:abcdef0123456789\n"
16+
"CHARON_TARGET=dev-npm-ga\n"
17+
'CHARON_PRODUCT_NAME="Test Product"\n',
18+
encoding="utf-8",
19+
)
20+
env = charon_env.load_charon_env(env_file)
21+
assert env["CHARON_OCI_REGISTRY"] == "repo@sha256:abcdef0123456789"
22+
assert env["CHARON_TARGET"] == "dev-npm-ga"
23+
assert env["CHARON_PRODUCT_NAME"] == "Test Product"
24+
25+
26+
def test_load_charon_env_missing_file(tmp_path: Path) -> None:
27+
"""Missing env files raise FileNotFoundError."""
28+
with pytest.raises(FileNotFoundError):
29+
charon_env.load_charon_env(tmp_path / "missing.env")
30+
31+
32+
def test_load_charon_env_rejects_invalid_utf8(tmp_path: Path) -> None:
33+
"""Invalid UTF-8 bytes fail instead of being silently replaced."""
34+
env_file = tmp_path / "charon.env"
35+
env_file.write_bytes(b"CHARON_TARGET=dev\xff\n")
36+
with pytest.raises(UnicodeDecodeError):
37+
charon_env.load_charon_env(env_file)
38+
39+
40+
def test_split_oci_registries() -> None:
41+
"""Registry references are split on percent signs."""
42+
value = "quay.io/a@sha256:111111%quay.io/b@sha256:222222"
43+
assert charon_env.split_oci_registries(value) == [
44+
"quay.io/a@sha256:111111",
45+
"quay.io/b@sha256:222222",
46+
]
47+
48+
49+
def test_short_sha256_prefix() -> None:
50+
"""Short hash uses the first six digest characters."""
51+
assert charon_env.short_sha256_prefix("repo@sha256:0b15aad24f1b847") == "0b15aa"
52+
53+
54+
def test_source_repo() -> None:
55+
"""Source repo strips the digest suffix."""
56+
assert charon_env.source_repo("quay.io/org/app@sha256:abc") == "quay.io/org/app"
57+
58+
59+
def test_load_charon_env_skips_malformed_lines(tmp_path: Path) -> None:
60+
"""Lines without ``=`` are ignored."""
61+
env_file = tmp_path / "charon.env"
62+
env_file.write_text("CHARON_TARGET=dev\nnot-a-variable\n", encoding="utf-8")
63+
env = charon_env.load_charon_env(env_file)
64+
assert env == {"CHARON_TARGET": "dev"}
65+
66+
67+
def test_short_sha256_prefix_requires_digest() -> None:
68+
"""Registry references without a digest raise ValueError."""
69+
with pytest.raises(ValueError, match="@sha256:"):
70+
charon_env.short_sha256_prefix("quay.io/org/app:latest")
71+
72+
73+
def test_split_oci_registries_ignores_empty_segments() -> None:
74+
"""Trailing or duplicate ``%`` separators yield no empty entries."""
75+
assert charon_env.split_oci_registries("quay.io/a@sha256:111111%") == [
76+
"quay.io/a@sha256:111111",
77+
]
78+
assert charon_env.split_oci_registries("%quay.io/a@sha256:111111%") == [
79+
"quay.io/a@sha256:111111",
80+
]
81+
82+
83+
def test_load_charon_env_skips_comments_and_blank_lines(tmp_path: Path) -> None:
84+
"""Comments and blank lines are ignored."""
85+
env_file = tmp_path / "charon.env"
86+
env_file.write_text(
87+
"# comment\n\nCHARON_TARGET=dev\n# CHARON_FOO=bar\n",
88+
encoding="utf-8",
89+
)
90+
assert charon_env.load_charon_env(env_file) == {"CHARON_TARGET": "dev"}
91+
92+
93+
def test_load_charon_env_strips_single_quotes(tmp_path: Path) -> None:
94+
"""Single-quoted values are unquoted."""
95+
env_file = tmp_path / "charon.env"
96+
env_file.write_text("CHARON_PRODUCT_NAME='Test Product'\n", encoding="utf-8")
97+
assert charon_env.load_charon_env(env_file)["CHARON_PRODUCT_NAME"] == "Test Product"
98+
99+
100+
def test_require_env_keys_raises_for_missing_key() -> None:
101+
"""Missing required keys raise ValueError."""
102+
with pytest.raises(ValueError, match="missing required charon env variable"):
103+
charon_env.require_env_keys({"CHARON_TARGET": "dev"}, "CHARON_PRODUCT_NAME")
104+
105+
106+
def test_require_oci_registries(tmp_path: Path) -> None:
107+
"""Non-empty registry lists are returned from parsed env data."""
108+
env_file = tmp_path / "charon.env"
109+
env_file.write_text(
110+
"CHARON_OCI_REGISTRY=quay.io/a@sha256:111111%quay.io/b@sha256:222222\n",
111+
encoding="utf-8",
112+
)
113+
env = charon_env.load_charon_env(env_file)
114+
assert charon_env.require_oci_registries(env) == [
115+
"quay.io/a@sha256:111111",
116+
"quay.io/b@sha256:222222",
117+
]
118+
119+
120+
def test_require_oci_registries_missing_key() -> None:
121+
"""Missing ``CHARON_OCI_REGISTRY`` raises ValueError."""
122+
with pytest.raises(ValueError, match="CHARON_OCI_REGISTRY is required"):
123+
charon_env.require_oci_registries({})
124+
125+
126+
def test_require_oci_registries_empty_value() -> None:
127+
"""Blank ``CHARON_OCI_REGISTRY`` values raise ValueError."""
128+
with pytest.raises(ValueError, match="at least one registry"):
129+
charon_env.require_oci_registries({"CHARON_OCI_REGISTRY": ""})
130+
131+
132+
def test_nrrc_work_dir_default() -> None:
133+
"""Default NRRC work directory uses the image writable root."""
134+
assert charon_env.nrrc_work_dir() == Path("/var/workdir/nrrc")
135+
136+
137+
def test_nrrc_work_dir_from_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
138+
"""``WORK_DIR`` overrides the default NRRC staging directory."""
139+
custom = tmp_path / "staging"
140+
monkeypatch.setenv("WORK_DIR", str(custom))
141+
assert charon_env.nrrc_work_dir() == custom
142+
143+
144+
def test_charon_config_path_uses_explicit_home(tmp_path: Path) -> None:
145+
"""An explicit *home* overrides ``Path.home()``."""
146+
assert charon_env.charon_config_path(home=tmp_path) == tmp_path / ".charon" / "charon.yaml"
147+
148+
149+
def test_install_charon_config(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
150+
"""Charon config is copied into ``$HOME/.charon/charon.yaml``."""
151+
monkeypatch.setenv("HOME", str(tmp_path))
152+
config_path = charon_env.charon_config_path()
153+
source = tmp_path / "input.yaml"
154+
source.write_text("charon-config\n", encoding="utf-8")
155+
dest = charon_env.install_charon_config(source)
156+
assert dest == config_path
157+
assert config_path.read_text(encoding="utf-8") == "charon-config\n"
158+
159+
160+
def test_install_charon_config_missing_source(tmp_path: Path) -> None:
161+
"""Missing config sources raise FileNotFoundError."""
162+
with pytest.raises(FileNotFoundError, match="charon config file not found"):
163+
charon_env.install_charon_config(tmp_path / "missing.yaml")

scripts/python/helpers/test_file.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import json
66
import gzip
7+
import subprocess
8+
from collections.abc import Sequence
79
from pathlib import Path
810

911
import file
@@ -63,6 +65,31 @@ def test_load_json_dict_rejects_non_object(tmp_path: Path) -> None:
6365
file.load_json_dict(path)
6466

6567

68+
def test_resolve_path_under_base_relative_file(tmp_path: Path) -> None:
69+
"""A normal relative path resolves under *base*."""
70+
target = tmp_path / "uid" / "charon.env"
71+
target.parent.mkdir(parents=True)
72+
assert file.resolve_path_under_base(tmp_path, "uid/charon.env") == target.resolve()
73+
74+
75+
def test_resolve_path_under_base_rejects_absolute(tmp_path: Path) -> None:
76+
"""Absolute paths are rejected even if they exist."""
77+
with pytest.raises(ValueError, match="must be relative"):
78+
file.resolve_path_under_base(tmp_path, "/etc/passwd")
79+
80+
81+
def test_resolve_path_under_base_rejects_traversal(tmp_path: Path) -> None:
82+
"""``..`` segments that escape *base* are rejected."""
83+
with pytest.raises(ValueError, match="must stay under"):
84+
file.resolve_path_under_base(tmp_path, "../outside")
85+
86+
87+
def test_resolve_path_under_base_rejects_blank(tmp_path: Path) -> None:
88+
"""Blank relative paths are rejected."""
89+
with pytest.raises(ValueError, match="must be relative"):
90+
file.resolve_path_under_base(tmp_path, " ")
91+
92+
6693
def test_make_tempfile_path_empty_file() -> None:
6794
"""A `None` payload leaves the created file with zero length."""
6895
p = file.make_tempfile_path("t-", None)
@@ -94,3 +121,35 @@ def test_decompress_gzip_bounded_rejects_oversized_output() -> None:
94121
compressed = gzip.compress(raw)
95122
with pytest.raises(ValueError, match="gzip bomb"):
96123
file.decompress_gzip_bounded(compressed, max_bytes=1000)
124+
125+
126+
def test_is_gzip_or_tar_archive_posix_tar() -> None:
127+
"""POSIX tar archives are recognized."""
128+
129+
def fake_file_cmd(
130+
cmd: Sequence[str | Path],
131+
) -> subprocess.CompletedProcess[str]:
132+
return subprocess.CompletedProcess(
133+
[str(x) for x in cmd],
134+
0,
135+
stdout="POSIX tar archive\n",
136+
stderr="",
137+
)
138+
139+
assert file.is_gzip_or_tar_archive(Path("/tmp/archive.tar"), file_cmd=fake_file_cmd)
140+
141+
142+
def test_is_gzip_or_tar_archive_rejects_other_types() -> None:
143+
"""Non-archive ``file -b`` output returns False."""
144+
145+
def fake_file_cmd(
146+
cmd: Sequence[str | Path],
147+
) -> subprocess.CompletedProcess[str]:
148+
return subprocess.CompletedProcess(
149+
[str(x) for x in cmd],
150+
0,
151+
stdout="ASCII text\n",
152+
stderr="",
153+
)
154+
155+
assert not file.is_gzip_or_tar_archive(Path("/tmp/readme.txt"), file_cmd=fake_file_cmd)

0 commit comments

Comments
 (0)