Skip to content

Commit 5a0ab16

Browse files
committed
feat(RELEASE-2479): add collect_index_images.py script
This commit adds a python script to replicate the functionality of the inline bash script in the collect-index-images managed task in the catalog repo. The task will be updated to use this python module instead. Assisted-By: Cursor Signed-off-by: Johnny Bieren <jbieren@redhat.com>
1 parent a0c68a7 commit 5a0ab16

4 files changed

Lines changed: 695 additions & 0 deletions

File tree

scripts/python/helpers/image_ref.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import json
66
import re
7+
import sys
78

89
import http_client
910
import requests
@@ -12,6 +13,60 @@
1213
_MAX_QUAY_TAG_PAGES = 50
1314

1415

16+
def translate_delivery_repo(repo: str) -> list[dict[str, str]]:
17+
"""Translate a Quay delivery-repo reference to public registry URLs.
18+
19+
Return two dicts with `repo` and `url` keys: one for `redhat.io` and
20+
one for `access.redhat.com`.
21+
"""
22+
if not repo.strip():
23+
msg = "Please pass a repo to translate like 'quay.io/redhat-prod/product----repo'"
24+
raise ValueError(msg)
25+
26+
normalized = repo.replace("----", "/")
27+
io_url: str
28+
access_url: str
29+
30+
if normalized.startswith("quay.io/redhat-prod/"):
31+
io_url = "registry.redhat.io" + normalized[len("quay.io/redhat-prod") :]
32+
access_url = "registry.access.redhat.com" + normalized[len("quay.io/redhat-prod") :]
33+
elif normalized.startswith("quay.io/redhat-pending/"):
34+
io_url = "registry.stage.redhat.io" + normalized[len("quay.io/redhat-pending") :]
35+
access_url = (
36+
"registry.access.stage.redhat.com" + normalized[len("quay.io/redhat-pending") :]
37+
)
38+
elif normalized.startswith("quay.io/rh-flatpaks-prod/"):
39+
io_url = "flatpaks.registry.redhat.io" + normalized[len("quay.io/rh-flatpaks-prod") :]
40+
access_url = (
41+
"registry.access.redhat.com" + normalized[len("quay.io/rh-flatpaks-prod") :]
42+
)
43+
elif normalized.startswith("quay.io/rh-flatpaks-stage/"):
44+
io_url = (
45+
"flatpaks.registry.stage.redhat.io"
46+
+ normalized[len("quay.io/rh-flatpaks-stage") :]
47+
)
48+
access_url = (
49+
"registry.access.stage.redhat.com" + normalized[len("quay.io/rh-flatpaks-stage") :]
50+
)
51+
elif normalized.startswith("quay.io/redhat/"):
52+
io_url = "registry.redhat.io" + normalized[len("quay.io/redhat") :]
53+
access_url = "registry.access.redhat.com" + normalized[len("quay.io/redhat") :]
54+
else:
55+
print(
56+
"Warning: Repo to translate is not in expected format. If this is not "
57+
"an index image, the expected format is: "
58+
"quay.io/redhat-[prod,pending]/product----repo",
59+
file=sys.stderr,
60+
)
61+
io_url = normalized
62+
access_url = ""
63+
64+
return [
65+
{"repo": "redhat.io", "url": io_url},
66+
{"repo": "access.redhat.com", "url": access_url},
67+
]
68+
69+
1570
def resolve_quay_digest_to_git_sha(digest: str, container_image: str) -> str | None:
1671
"""Resolve an image digest to a git commit SHA via the Quay public API.
1772

scripts/python/helpers/test_image_ref.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,89 @@ def test_resolve_quay_digest_handles_exception() -> None:
132132
"quay.io/org/repo@sha256:abc",
133133
)
134134
assert out is None
135+
136+
137+
def test_translate_delivery_repo_rejects_empty_repo() -> None:
138+
"""Empty repo input raises `ValueError`."""
139+
with pytest.raises(ValueError, match="Please pass a repo"):
140+
image_ref.translate_delivery_repo("")
141+
142+
143+
def test_translate_delivery_repo_redhat_prod() -> None:
144+
"""Translate quay.io/redhat-prod delivery repos to public registries."""
145+
out = image_ref.translate_delivery_repo("quay.io/redhat-prod/product----repo:v1.0")
146+
assert out == [
147+
{"repo": "redhat.io", "url": "registry.redhat.io/product/repo:v1.0"},
148+
{
149+
"repo": "access.redhat.com",
150+
"url": "registry.access.redhat.com/product/repo:v1.0",
151+
},
152+
]
153+
154+
155+
def test_translate_delivery_repo_redhat_pending() -> None:
156+
"""Translate quay.io/redhat-pending delivery repos to stage registries."""
157+
out = image_ref.translate_delivery_repo("quay.io/redhat-pending/product----repo:v1.0")
158+
assert out == [
159+
{"repo": "redhat.io", "url": "registry.stage.redhat.io/product/repo:v1.0"},
160+
{
161+
"repo": "access.redhat.com",
162+
"url": "registry.access.stage.redhat.com/product/repo:v1.0",
163+
},
164+
]
165+
166+
167+
def test_translate_delivery_repo_flatpaks_prod() -> None:
168+
"""Translate quay.io/rh-flatpaks-prod delivery repos."""
169+
out = image_ref.translate_delivery_repo("quay.io/rh-flatpaks-prod/product----repo:v1")
170+
assert out == [
171+
{"repo": "redhat.io", "url": "flatpaks.registry.redhat.io/product/repo:v1"},
172+
{
173+
"repo": "access.redhat.com",
174+
"url": "registry.access.redhat.com/product/repo:v1",
175+
},
176+
]
177+
178+
179+
def test_translate_delivery_repo_flatpaks_stage() -> None:
180+
"""Translate quay.io/rh-flatpaks-stage delivery repos."""
181+
out = image_ref.translate_delivery_repo("quay.io/rh-flatpaks-stage/product----repo:v1")
182+
assert out == [
183+
{
184+
"repo": "redhat.io",
185+
"url": "flatpaks.registry.stage.redhat.io/product/repo:v1",
186+
},
187+
{
188+
"repo": "access.redhat.com",
189+
"url": "registry.access.stage.redhat.com/product/repo:v1",
190+
},
191+
]
192+
193+
194+
def test_translate_delivery_repo_index_image() -> None:
195+
"""Translate quay.io/redhat index image repos."""
196+
out = image_ref.translate_delivery_repo(
197+
"quay.io/redhat/redhat----fbc-target-index:v4.12",
198+
)
199+
assert out == [
200+
{
201+
"repo": "redhat.io",
202+
"url": "registry.redhat.io/redhat/fbc-target-index:v4.12",
203+
},
204+
{
205+
"repo": "access.redhat.com",
206+
"url": "registry.access.redhat.com/redhat/fbc-target-index:v4.12",
207+
},
208+
]
209+
210+
211+
def test_translate_delivery_repo_unknown_format_warns(
212+
capsys: pytest.CaptureFixture[str],
213+
) -> None:
214+
"""Unknown formats pass through the repo and emit a warning."""
215+
out = image_ref.translate_delivery_repo("registry.example.com/org/repo:tag")
216+
assert out == [
217+
{"repo": "redhat.io", "url": "registry.example.com/org/repo:tag"},
218+
{"repo": "access.redhat.com", "url": ""},
219+
]
220+
assert "Warning: Repo to translate is not in expected format" in capsys.readouterr().err
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
#!/usr/bin/env python3
2+
"""Build a Pyxis index-image snapshot JSON from internal-request results."""
3+
4+
from __future__ import annotations
5+
6+
import json
7+
import re
8+
from pathlib import Path
9+
from typing import Any
10+
11+
import file
12+
import image_ref
13+
import tekton
14+
from logger import logger
15+
16+
SNAPSHOT_FILENAME = "index_image_snapshot.json"
17+
_BARE_OCP_TAG = re.compile(r"^v[0-9]+\.[0-9]+$")
18+
19+
20+
def split_target_index(target_index: str) -> tuple[str, str]:
21+
"""Split *target_index* into repository and tag at the last colon."""
22+
repository, _, tag = target_index.rpartition(":")
23+
if not repository or not tag:
24+
msg = f"target_index must contain a repository and tag: {target_index!r}"
25+
raise ValueError(msg)
26+
return repository, tag
27+
28+
29+
def build_tags(tag: str, build_timestamp: str) -> list[str]:
30+
"""Return snapshot tags, appending a timestamp tag for bare OCP versions."""
31+
tags = [tag]
32+
if _BARE_OCP_TAG.fullmatch(tag):
33+
tags.append(f"{tag}-{build_timestamp}")
34+
return tags
35+
36+
37+
def translation_repo_url(translated: list[dict[str, Any]], repo_key: str) -> str:
38+
"""Return the host/path portion of a translated delivery-repo URL."""
39+
for entry in translated:
40+
if entry.get("repo") != repo_key:
41+
continue
42+
url = entry.get("url")
43+
if not isinstance(url, str) or not url:
44+
return ""
45+
return url.split(":", 1)[0]
46+
return ""
47+
48+
49+
def build_repo_object(
50+
repository: str,
51+
tags: list[str],
52+
rh_registry_repo: str,
53+
registry_access_repo: str,
54+
) -> dict[str, Any]:
55+
"""Build the repositories entry for one index image component."""
56+
repo_object: dict[str, Any] = {
57+
"url": repository,
58+
"tags": tags,
59+
}
60+
if rh_registry_repo:
61+
repo_object["rh-registry-repo"] = rh_registry_repo
62+
if registry_access_repo:
63+
repo_object["registry-access-repo"] = registry_access_repo
64+
return repo_object
65+
66+
67+
def build_index_component(
68+
*,
69+
source_index: str,
70+
repository: str,
71+
tags: list[str],
72+
image_digests: list[Any],
73+
repo_object: dict[str, Any],
74+
) -> dict[str, Any]:
75+
"""Build one snapshot component object."""
76+
return {
77+
"containerImage": source_index,
78+
"repository": repository,
79+
"repositories": [repo_object],
80+
"tags": tags,
81+
"imageDigests": image_digests,
82+
}
83+
84+
85+
def collect_index_image_components(
86+
results: dict[str, Any],
87+
build_timestamp: str,
88+
) -> dict[str, Any]:
89+
"""Transform internal-request results into an index-image snapshot."""
90+
components_in = results.get("components")
91+
if not isinstance(components_in, list):
92+
msg = "internal request results components must be a JSON array"
93+
raise ValueError(msg)
94+
95+
snapshot_components: list[dict[str, Any]] = []
96+
for index, row in enumerate(components_in):
97+
if not isinstance(row, dict):
98+
msg = f"components[{index}] must be a JSON object"
99+
raise ValueError(msg)
100+
101+
target_index = row.get("target_index")
102+
source_index = row.get("index_image_resolved")
103+
if not isinstance(target_index, str) or not target_index.strip():
104+
msg = f"components[{index}].target_index must be a non-empty string"
105+
raise ValueError(msg)
106+
if not isinstance(source_index, str) or not source_index.strip():
107+
msg = f"components[{index}].index_image_resolved must be a non-empty string"
108+
raise ValueError(msg)
109+
110+
repository, tag = split_target_index(target_index)
111+
tags = build_tags(tag, build_timestamp)
112+
image_digests = row.get("image_digests", [])
113+
if not isinstance(image_digests, list):
114+
msg = f"components[{index}].image_digests must be a JSON array"
115+
raise ValueError(msg)
116+
117+
logger.info(
118+
"Processing index image %s (%d/%d)",
119+
target_index,
120+
index + 1,
121+
len(components_in),
122+
)
123+
translated = image_ref.translate_delivery_repo(target_index)
124+
rh_registry_repo = translation_repo_url(translated, "redhat.io")
125+
registry_access_repo = translation_repo_url(translated, "access.redhat.com")
126+
repo_object = build_repo_object(
127+
repository,
128+
tags,
129+
rh_registry_repo,
130+
registry_access_repo,
131+
)
132+
snapshot_components.append(
133+
build_index_component(
134+
source_index=source_index,
135+
repository=repository,
136+
tags=tags,
137+
image_digests=image_digests,
138+
repo_object=repo_object,
139+
),
140+
)
141+
142+
return {"components": snapshot_components}
143+
144+
145+
def run_collect_index_images(
146+
*,
147+
data_dir: Path,
148+
internal_request_results_file: Path,
149+
build_timestamp: str,
150+
snapshot_path: Path,
151+
index_image_snapshot_result_path: Path,
152+
) -> None:
153+
"""Load results, build the snapshot JSON, and write Tekton outputs."""
154+
results_path = data_dir / internal_request_results_file
155+
if not results_path.is_file():
156+
msg = f"internal request results file not found: {results_path}"
157+
raise FileNotFoundError(msg)
158+
159+
logger.info("Loading internal request results from %s", results_path)
160+
results = file.load_json_dict(results_path)
161+
snapshot = collect_index_image_components(results, build_timestamp)
162+
163+
snapshot_path.parent.mkdir(parents=True, exist_ok=True)
164+
snapshot_path.write_text(json.dumps(snapshot) + "\n", encoding="utf-8")
165+
index_image_snapshot_result_path.write_text(SNAPSHOT_FILENAME, encoding="utf-8")
166+
logger.info("Wrote index image snapshot to %s", snapshot_path)
167+
168+
169+
def main() -> int:
170+
"""Run the collect-index-images workflow."""
171+
data_dir = Path(tekton.require_env("PARAM_DATA_DIR"))
172+
run_collect_index_images(
173+
data_dir=data_dir,
174+
internal_request_results_file=Path(
175+
tekton.require_env("PARAM_INTERNAL_REQUEST_RESULTS_FILE"),
176+
),
177+
build_timestamp=tekton.require_env("PARAM_BUILD_TIMESTAMP"),
178+
snapshot_path=data_dir / SNAPSHOT_FILENAME,
179+
index_image_snapshot_result_path=tekton.result_paths_from_env(
180+
"RESULT_INDEX_IMAGE_SNAPSHOT_PATH",
181+
)[0],
182+
)
183+
return 0
184+
185+
186+
if __name__ == "__main__":
187+
raise SystemExit(main())

0 commit comments

Comments
 (0)