Skip to content

Commit fbf0f92

Browse files
AlldakosciCZ
andauthored
feat(ISV-7116): create script for direct container signing (#819)
A new script rh_direct_sign_image.py is created to support direct container signing method. The script will be used in the new Tekton task that replaces current signing method. The script resolves all digests for components, verifies if images are already signed and filter them out. Whatever remains is signed using internal request and new direct signing pipeline. - PyxisSignature/SigningItem dataclasses for structured data - Multi-arch digest collection via skopeo inspect --raw - Source container resolution via select-oci-auth + oras resolve - ConfigMap reading for SIG_KEY_NAME(S) - Greedy base64 batching with --batch-max-size and --output args - Full unit test suite (34 tests) Assisted-by: Claude Sonnet 4.6 Signed-off-by: Ales Raszka <araszka@redhat.com> Co-authored-by: Jan Koscielniak <jakoscie@redhat.com>
1 parent 8a04c8d commit fbf0f92

11 files changed

Lines changed: 2267 additions & 2 deletions

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ ENV PATH="$PATH:/home/pubtools-pulp-wrapper"
172172
ENV PATH="$PATH:/home/pubtools-marketplacesvm-wrapper"
173173
ENV PATH="$PATH:/home/developer-portal-wrapper"
174174
ENV PATH="$PATH:/home/publish-to-cgw-wrapper"
175+
ENV PATH="$PATH:/home/scripts/python/tasks/managed"
175176
# Flat imports: helpers and task scripts must be importable.
176177
# Tests use the same layout via pyproject [tool.pytest.ini_options] pythonpath.
177178
# Keep /home for other modules (e.g. pyxis, sbom) that expect it.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ line-length = 95
3434
pythonpath = [
3535
".",
3636
"integration-tests/lib",
37+
"pyxis",
3738
"scripts/python/helpers",
3839
"scripts/python/tasks/internal",
3940
"scripts/python/tasks/managed",
@@ -45,6 +46,7 @@ pythonpath = [
4546
[dependency-groups]
4647
dev = [
4748
"black>=25.11.0",
49+
"flake8>=7.3.0",
4850
"pytest>=8.4.2",
4951
"pytest-cov>=6.0",
5052
]

scripts/python/helpers/kubectl.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""Helpers for interacting with Kubernetes via kubectl."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
from typing import Any
7+
8+
from subprocess_cmd import run_cmd
9+
10+
11+
def get_configmap(name: str) -> dict[str, Any]:
12+
"""Fetch a Kubernetes ConfigMap by name and return its parsed JSON.
13+
14+
Args:
15+
name: The ConfigMap resource name to retrieve.
16+
17+
Returns:
18+
The full ConfigMap object as a parsed dictionary.
19+
20+
Raises:
21+
RuntimeError: If kubectl exits with a non-zero return code.
22+
23+
"""
24+
result = run_cmd(["kubectl", "get", f"cm/{name}", "-ojson"], check=False)
25+
if result.returncode != 0:
26+
raise RuntimeError(f"Failed to retrieve ConfigMap '{name}': {result.stderr.strip()}")
27+
return json.loads(result.stdout)

scripts/python/helpers/oras_utils.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,37 @@
44

55
import re
66
import subprocess
7+
import tempfile
78
from pathlib import Path
89

10+
from subprocess_cmd import run_cmd
11+
12+
13+
def oras_resolve(reference: str) -> str:
14+
"""Resolve the digest of an OCI image reference using oras.
15+
16+
Obtains registry credentials via ``select-oci-auth``, writes them to a
17+
temporary auth file, then runs ``oras resolve`` and returns the digest.
18+
19+
Raises ``RuntimeError`` if oras resolve exits non-zero.
20+
"""
21+
with tempfile.NamedTemporaryFile(mode="w", suffix=".json") as auth_file:
22+
select_auth = run_cmd(["select-oci-auth", reference])
23+
auth_file.write(select_auth.stdout)
24+
auth_file.flush()
25+
26+
result = run_cmd(
27+
["oras", "resolve", "--registry-config", auth_file.name, reference],
28+
check=False,
29+
)
30+
31+
if result.returncode != 0:
32+
raise RuntimeError(
33+
f"oras resolve failed for {reference!r} (exit {result.returncode}):"
34+
f" {result.stderr.strip()}"
35+
)
36+
return result.stdout.strip()
37+
938

1039
def oras_login(registry: str, username: str, password: str) -> None:
1140
"""Log in to an OCI registry via oras using username/password credentials.

scripts/python/helpers/subprocess_cmd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def run_cmd(
2626
"""Run *cmd*; capture stdout as text; optionally append stderr to *stderr_path*."""
2727
# Child must inherit pod env (PATH, KUBECONFIG, etc.); only overlay *env*.
2828
merged: dict[str, str] = {**os.environ, **dict(env or {})}
29-
err_f: Any = subprocess.DEVNULL
29+
err_f: Any = subprocess.PIPE
3030
fh: Any = None
3131
try:
3232
if stderr_path is not None:
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
"""Tests for `kubect`."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
from unittest.mock import MagicMock, patch
7+
8+
from kubectl import get_configmap
9+
10+
11+
def test_get_configmap_runs_kubectl_and_returns_parsed_json() -> None:
12+
"""Kubectl is called with the correct arguments and its output is parsed as JSON."""
13+
cm_json = json.dumps({"data": {"SIG_KEY_NAME": "some-key"}})
14+
with patch("kubectl.run_cmd") as mock_run:
15+
mock_run.return_value = MagicMock(stdout=cm_json, returncode=0)
16+
result = get_configmap("signing-config-map")
17+
18+
mock_run.assert_called_once_with(
19+
["kubectl", "get", "cm/signing-config-map", "-ojson"], check=False
20+
)
21+
assert result == {"data": {"SIG_KEY_NAME": "some-key"}}
22+
23+
24+
def test_get_configmap_raises_on_kubectl_failure() -> None:
25+
"""RuntimeError is raised with the configmap name and stderr when kubectl fails."""
26+
import pytest
27+
28+
with patch("kubectl.run_cmd") as mock_run:
29+
mock_run.return_value = MagicMock(
30+
returncode=1, stderr="Error from server (NotFound): configmaps not found"
31+
)
32+
with pytest.raises(RuntimeError, match="signing-config-map"):
33+
get_configmap("signing-config-map")
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
"""Unit tests for oras_utils."""
2+
3+
from __future__ import annotations
4+
5+
from unittest.mock import MagicMock, call, patch
6+
7+
import pytest
8+
9+
from oras_utils import oras_resolve
10+
11+
12+
def test_oras_resolve_calls_select_oci_auth_with_reference() -> None:
13+
"""select-oci-auth is called with the image reference."""
14+
with patch("oras_utils.run_cmd") as mock_run:
15+
mock_run.side_effect = [
16+
MagicMock(stdout='{"auths": {}}'),
17+
MagicMock(returncode=0, stdout="sha256:abc\n"),
18+
]
19+
oras_resolve("registry.io/repo:tag")
20+
21+
first_call = mock_run.call_args_list[0]
22+
assert first_call == call(["select-oci-auth", "registry.io/repo:tag"])
23+
24+
25+
def test_oras_resolve_passes_auth_file_to_oras() -> None:
26+
"""Oras resolve is called with --registry-config pointing to the auth temp file."""
27+
with patch("oras_utils.run_cmd") as mock_run:
28+
mock_run.side_effect = [
29+
MagicMock(stdout='{"auths": {}}'),
30+
MagicMock(returncode=0, stdout="sha256:abc\n"),
31+
]
32+
oras_resolve("registry.io/repo:tag")
33+
34+
second_call = mock_run.call_args_list[1]
35+
cmd = second_call.args[0]
36+
assert cmd[0] == "oras"
37+
assert cmd[1] == "resolve"
38+
assert "--registry-config" in cmd
39+
assert "registry.io/repo:tag" in cmd
40+
41+
42+
def test_oras_resolve_returns_stripped_digest() -> None:
43+
"""Returns the digest from oras resolve output, stripped of whitespace."""
44+
with patch("oras_utils.run_cmd") as mock_run:
45+
mock_run.side_effect = [
46+
MagicMock(stdout="{}"),
47+
MagicMock(returncode=0, stdout="sha256:deadbeef\n"),
48+
]
49+
result = oras_resolve("registry.io/repo:tag")
50+
51+
assert result == "sha256:deadbeef"
52+
53+
54+
def test_oras_resolve_raises_on_nonzero_returncode() -> None:
55+
"""Raises RuntimeError when oras resolve exits non-zero."""
56+
with patch("oras_utils.run_cmd") as mock_run:
57+
mock_run.side_effect = [
58+
MagicMock(stdout="{}"),
59+
MagicMock(returncode=1, stdout="", stderr="unauthorized"),
60+
]
61+
with pytest.raises(RuntimeError):
62+
oras_resolve("registry.io/repo:tag")

scripts/python/helpers/test_subprocess_cmd.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ def test_run_cmd_stderr_path_on_success(tmp_path) -> None:
4141
assert log.read_text(encoding="utf-8") == ""
4242

4343

44+
def test_run_cmd_captures_stderr_when_no_path_given() -> None:
45+
"""Stderr is captured in the return value when no stderr_path is provided."""
46+
r = subprocess_cmd.run_cmd(["sh", "-c", "echo error-msg >&2"], check=True)
47+
assert "error-msg" in r.stderr
48+
49+
4450
def test_run_cmd_text_success() -> None:
4551
"""``run_cmd_text`` returns stdout from a successful subprocess."""
4652
with mock.patch("subprocess_cmd.subprocess.run") as run:

0 commit comments

Comments
 (0)