Skip to content

Commit 7446db0

Browse files
authored
feat(RELEASE-2479): add collect_index_images.py script (#851)
This commit adds a python script to replicate the functionality of the inline bash script in the collect-index-images managed task in the catalog repo. The task will be updated to use this python module instead. Assisted-By: Cursor Signed-off-by: Johnny Bieren <jbieren@redhat.com>
1 parent 86975a4 commit 7446db0

4 files changed

Lines changed: 705 additions & 0 deletions

File tree

scripts/python/helpers/image_ref.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,65 @@
77

88
import http_client
99
import requests
10+
from logger import logger
1011

1112
_QUAY_SHA_TAG = re.compile(r"^[0-9a-f]{40}$")
1213
_MAX_QUAY_TAG_PAGES = 50
1314

1415

16+
def translate_delivery_repo(repo: str) -> list[dict[str, str]]:
17+
"""Translate a Quay delivery-repo reference to public registry URLs.
18+
19+
Return two dicts with `repo` and `url` keys: one for `redhat.io` and
20+
one for `access.redhat.com`.
21+
"""
22+
if not repo.strip():
23+
msg = "Please pass a repo to translate like 'quay.io/redhat-prod/product----repo'"
24+
raise ValueError(msg)
25+
26+
normalized = repo.replace("----", "/")
27+
io_url: str
28+
access_url: str
29+
30+
if normalized.startswith("quay.io/redhat-prod/"):
31+
io_url = "registry.redhat.io" + normalized[len("quay.io/redhat-prod") :]
32+
access_url = "registry.access.redhat.com" + normalized[len("quay.io/redhat-prod") :]
33+
elif normalized.startswith("quay.io/redhat-pending/"):
34+
io_url = "registry.stage.redhat.io" + normalized[len("quay.io/redhat-pending") :]
35+
access_url = (
36+
"registry.access.stage.redhat.com" + normalized[len("quay.io/redhat-pending") :]
37+
)
38+
elif normalized.startswith("quay.io/rh-flatpaks-prod/"):
39+
io_url = "flatpaks.registry.redhat.io" + normalized[len("quay.io/rh-flatpaks-prod") :]
40+
access_url = (
41+
"registry.access.redhat.com" + normalized[len("quay.io/rh-flatpaks-prod") :]
42+
)
43+
elif normalized.startswith("quay.io/rh-flatpaks-stage/"):
44+
io_url = (
45+
"flatpaks.registry.stage.redhat.io"
46+
+ normalized[len("quay.io/rh-flatpaks-stage") :]
47+
)
48+
access_url = (
49+
"registry.access.stage.redhat.com" + normalized[len("quay.io/rh-flatpaks-stage") :]
50+
)
51+
elif normalized.startswith("quay.io/redhat/"):
52+
io_url = "registry.redhat.io" + normalized[len("quay.io/redhat") :]
53+
access_url = "registry.access.redhat.com" + normalized[len("quay.io/redhat") :]
54+
else:
55+
logger.warning(
56+
"Repo to translate is not in expected format. If this is not "
57+
"an index image, the expected format is: "
58+
"quay.io/redhat-[prod,pending]/product----repo",
59+
)
60+
io_url = normalized
61+
access_url = ""
62+
63+
return [
64+
{"repo": "redhat.io", "url": io_url},
65+
{"repo": "access.redhat.com", "url": access_url},
66+
]
67+
68+
1569
def resolve_quay_digest_to_git_sha(digest: str, container_image: str) -> str | None:
1670
"""Resolve an image digest to a git commit SHA via the Quay public API.
1771

scripts/python/helpers/test_image_ref.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,23 @@
33
from __future__ import annotations
44

55
import json
6+
import logging
67
from unittest import mock
78

89
import image_ref
910
import pytest
1011
import requests
1112

1213

14+
@pytest.fixture(autouse=True)
15+
def _propagate_release_logger() -> None:
16+
"""Allow caplog to capture records from the 'release' logger."""
17+
release_logger = logging.getLogger("release")
18+
release_logger.propagate = True
19+
yield
20+
release_logger.propagate = False
21+
22+
1323
def test_pyxis_url_for_pull_spec_with_tag_and_registry_rewrite() -> None:
1424
"""Tagged refs map to `.../tag/<tag>` and rewrite registry.redhat.io host."""
1525
out = image_ref.pyxis_url_for_pull_spec(
@@ -132,3 +142,90 @@ def test_resolve_quay_digest_handles_exception() -> None:
132142
"quay.io/org/repo@sha256:abc",
133143
)
134144
assert out is None
145+
146+
147+
def test_translate_delivery_repo_rejects_empty_repo() -> None:
148+
"""Empty repo input raises `ValueError`."""
149+
with pytest.raises(ValueError, match="Please pass a repo"):
150+
image_ref.translate_delivery_repo("")
151+
152+
153+
def test_translate_delivery_repo_redhat_prod() -> None:
154+
"""Translate quay.io/redhat-prod delivery repos to public registries."""
155+
out = image_ref.translate_delivery_repo("quay.io/redhat-prod/product----repo:v1.0")
156+
assert out == [
157+
{"repo": "redhat.io", "url": "registry.redhat.io/product/repo:v1.0"},
158+
{
159+
"repo": "access.redhat.com",
160+
"url": "registry.access.redhat.com/product/repo:v1.0",
161+
},
162+
]
163+
164+
165+
def test_translate_delivery_repo_redhat_pending() -> None:
166+
"""Translate quay.io/redhat-pending delivery repos to stage registries."""
167+
out = image_ref.translate_delivery_repo("quay.io/redhat-pending/product----repo:v1.0")
168+
assert out == [
169+
{"repo": "redhat.io", "url": "registry.stage.redhat.io/product/repo:v1.0"},
170+
{
171+
"repo": "access.redhat.com",
172+
"url": "registry.access.stage.redhat.com/product/repo:v1.0",
173+
},
174+
]
175+
176+
177+
def test_translate_delivery_repo_flatpaks_prod() -> None:
178+
"""Translate quay.io/rh-flatpaks-prod delivery repos."""
179+
out = image_ref.translate_delivery_repo("quay.io/rh-flatpaks-prod/product----repo:v1")
180+
assert out == [
181+
{"repo": "redhat.io", "url": "flatpaks.registry.redhat.io/product/repo:v1"},
182+
{
183+
"repo": "access.redhat.com",
184+
"url": "registry.access.redhat.com/product/repo:v1",
185+
},
186+
]
187+
188+
189+
def test_translate_delivery_repo_flatpaks_stage() -> None:
190+
"""Translate quay.io/rh-flatpaks-stage delivery repos."""
191+
out = image_ref.translate_delivery_repo("quay.io/rh-flatpaks-stage/product----repo:v1")
192+
assert out == [
193+
{
194+
"repo": "redhat.io",
195+
"url": "flatpaks.registry.stage.redhat.io/product/repo:v1",
196+
},
197+
{
198+
"repo": "access.redhat.com",
199+
"url": "registry.access.stage.redhat.com/product/repo:v1",
200+
},
201+
]
202+
203+
204+
def test_translate_delivery_repo_index_image() -> None:
205+
"""Translate quay.io/redhat index image repos."""
206+
out = image_ref.translate_delivery_repo(
207+
"quay.io/redhat/redhat----fbc-target-index:v4.12",
208+
)
209+
assert out == [
210+
{
211+
"repo": "redhat.io",
212+
"url": "registry.redhat.io/redhat/fbc-target-index:v4.12",
213+
},
214+
{
215+
"repo": "access.redhat.com",
216+
"url": "registry.access.redhat.com/redhat/fbc-target-index:v4.12",
217+
},
218+
]
219+
220+
221+
def test_translate_delivery_repo_unknown_format_warns(
222+
caplog: pytest.LogCaptureFixture,
223+
) -> None:
224+
"""Unknown formats pass through the repo and emit a warning."""
225+
with caplog.at_level(logging.WARNING, logger="release"):
226+
out = image_ref.translate_delivery_repo("registry.example.com/org/repo:tag")
227+
assert out == [
228+
{"repo": "redhat.io", "url": "registry.example.com/org/repo:tag"},
229+
{"repo": "access.redhat.com", "url": ""},
230+
]
231+
assert "Repo to translate is not in expected format" in caplog.text
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
#!/usr/bin/env python3
2+
"""Build a Pyxis index-image snapshot JSON from internal-request results."""
3+
4+
from __future__ import annotations
5+
6+
import json
7+
import re
8+
from pathlib import Path
9+
from typing import Any
10+
11+
import file
12+
import image_ref
13+
import tekton
14+
from logger import logger
15+
16+
SNAPSHOT_FILENAME = "index_image_snapshot.json"
17+
_BARE_OCP_TAG = re.compile(r"^v[0-9]+\.[0-9]+$")
18+
19+
20+
def split_target_index(target_index: str) -> tuple[str, str]:
21+
"""Split *target_index* into repository and tag at the last colon."""
22+
repository, _, tag = target_index.rpartition(":")
23+
if not repository or not tag:
24+
msg = f"target_index must contain a repository and tag: {target_index!r}"
25+
raise ValueError(msg)
26+
return repository, tag
27+
28+
29+
def build_tags(tag: str, build_timestamp: str) -> list[str]:
30+
"""Return snapshot tags, appending a timestamp tag for bare OCP versions."""
31+
tags = [tag]
32+
if _BARE_OCP_TAG.fullmatch(tag):
33+
tags.append(f"{tag}-{build_timestamp}")
34+
return tags
35+
36+
37+
def translation_repo_url(translated: list[dict[str, Any]], repo_key: str) -> str:
38+
"""Return the host/path portion of a translated delivery-repo URL."""
39+
for entry in translated:
40+
if entry.get("repo") != repo_key:
41+
continue
42+
url = entry.get("url")
43+
if not isinstance(url, str) or not url:
44+
return ""
45+
return url.split(":", 1)[0]
46+
return ""
47+
48+
49+
def build_repo_object(
50+
repository: str,
51+
tags: list[str],
52+
rh_registry_repo: str,
53+
registry_access_repo: str,
54+
) -> dict[str, Any]:
55+
"""Build the repositories entry for one index image component."""
56+
repo_object: dict[str, Any] = {
57+
"url": repository,
58+
"tags": tags,
59+
}
60+
if rh_registry_repo:
61+
repo_object["rh-registry-repo"] = rh_registry_repo
62+
if registry_access_repo:
63+
repo_object["registry-access-repo"] = registry_access_repo
64+
return repo_object
65+
66+
67+
def build_index_component(
68+
*,
69+
source_index: str,
70+
repository: str,
71+
tags: list[str],
72+
image_digests: list[Any],
73+
repo_object: dict[str, Any],
74+
) -> dict[str, Any]:
75+
"""Build one snapshot component object."""
76+
return {
77+
"containerImage": source_index,
78+
"repository": repository,
79+
"repositories": [repo_object],
80+
"tags": tags,
81+
"imageDigests": image_digests,
82+
}
83+
84+
85+
def collect_index_image_components(
86+
results: dict[str, Any],
87+
build_timestamp: str,
88+
) -> dict[str, Any]:
89+
"""Transform internal-request results into an index-image snapshot."""
90+
components_in = results.get("components")
91+
if not isinstance(components_in, list):
92+
msg = "internal request results components must be a JSON array"
93+
raise ValueError(msg)
94+
95+
snapshot_components: list[dict[str, Any]] = []
96+
for index, row in enumerate(components_in):
97+
if not isinstance(row, dict):
98+
msg = f"components[{index}] must be a JSON object"
99+
raise ValueError(msg)
100+
101+
target_index = row.get("target_index")
102+
source_index = row.get("index_image_resolved")
103+
if not isinstance(target_index, str) or not target_index.strip():
104+
msg = f"components[{index}].target_index must be a non-empty string"
105+
raise ValueError(msg)
106+
if not isinstance(source_index, str) or not source_index.strip():
107+
msg = f"components[{index}].index_image_resolved must be a non-empty string"
108+
raise ValueError(msg)
109+
110+
repository, tag = split_target_index(target_index)
111+
tags = build_tags(tag, build_timestamp)
112+
image_digests = row.get("image_digests", [])
113+
if not isinstance(image_digests, list):
114+
msg = f"components[{index}].image_digests must be a JSON array"
115+
raise ValueError(msg)
116+
117+
logger.info(
118+
"Processing index image %s (%d/%d)",
119+
target_index,
120+
index + 1,
121+
len(components_in),
122+
)
123+
translated = image_ref.translate_delivery_repo(repository)
124+
rh_registry_repo = translation_repo_url(translated, "redhat.io")
125+
registry_access_repo = translation_repo_url(translated, "access.redhat.com")
126+
repo_object = build_repo_object(
127+
repository,
128+
tags,
129+
rh_registry_repo,
130+
registry_access_repo,
131+
)
132+
snapshot_components.append(
133+
build_index_component(
134+
source_index=source_index,
135+
repository=repository,
136+
tags=tags,
137+
image_digests=image_digests,
138+
repo_object=repo_object,
139+
),
140+
)
141+
142+
return {"components": snapshot_components}
143+
144+
145+
def run_collect_index_images(
146+
*,
147+
data_dir: Path,
148+
internal_request_results_file: Path,
149+
build_timestamp: str,
150+
snapshot_path: Path,
151+
index_image_snapshot_result_path: Path,
152+
) -> None:
153+
"""Load results, build the snapshot JSON, and write Tekton outputs."""
154+
results_path = data_dir / internal_request_results_file
155+
if not results_path.is_file():
156+
msg = f"internal request results file not found: {results_path}"
157+
raise FileNotFoundError(msg)
158+
159+
logger.info("Loading internal request results from %s", results_path)
160+
results = file.load_json_dict(results_path)
161+
snapshot = collect_index_image_components(results, build_timestamp)
162+
163+
snapshot_path.parent.mkdir(parents=True, exist_ok=True)
164+
snapshot_path.write_text(json.dumps(snapshot) + "\n", encoding="utf-8")
165+
index_image_snapshot_result_path.write_text(SNAPSHOT_FILENAME, encoding="utf-8")
166+
logger.info("Wrote index image snapshot to %s", snapshot_path)
167+
168+
169+
def main() -> int:
170+
"""Run the collect-index-images workflow."""
171+
data_dir = Path(tekton.require_env("PARAM_DATA_DIR"))
172+
run_collect_index_images(
173+
data_dir=data_dir,
174+
internal_request_results_file=Path(
175+
tekton.require_env("PARAM_INTERNAL_REQUEST_RESULTS_FILE"),
176+
),
177+
build_timestamp=tekton.require_env("PARAM_BUILD_TIMESTAMP"),
178+
snapshot_path=data_dir / SNAPSHOT_FILENAME,
179+
index_image_snapshot_result_path=tekton.result_paths_from_env(
180+
"RESULT_INDEX_IMAGE_SNAPSHOT_PATH",
181+
)[0],
182+
)
183+
return 0
184+
185+
186+
if __name__ == "__main__":
187+
raise SystemExit(main())

0 commit comments

Comments
 (0)