Skip to content

Commit 817199e

Browse files
authored
feat(RELEASE-1985): add create_advisory python script (#778)
This commit adds a python script to replicate the functionality of the inline bash script in the create-advisory internal task in the catalog repo. The task will be updated to use this python module instead. It also updates some helper modules so that future converted tasks can use them instead of it being inside the create_advisory module. Finally, it reworks apply_template.py a bit from the utils dir so that the create_advisory python script can call it natively instead of using subprocess to call apply_template.py. Assisted-By: Cursor Signed-off-by: Johnny Bieren <jbieren@redhat.com>
1 parent 3c26308 commit 817199e

22 files changed

Lines changed: 3521 additions & 53 deletions

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ ENV PATH="$PATH:/home/publish-to-cgw-wrapper"
160160
# Flat imports: helpers and task scripts must be importable.
161161
# Tests use the same layout via pyproject [tool.pytest.ini_options] pythonpath.
162162
# Keep /home for other modules (e.g. pyxis, sbom) that expect it.
163-
ENV PYTHONPATH="/home:/home/scripts/python/helpers:/home/scripts/python/tasks/internal"
163+
ENV PYTHONPATH="/home:/home/utils:/home/scripts/python/helpers:/home/scripts/python/tasks/internal"
164164

165165
# uv installs newer requests and certifi which don't use the system CA like the one installed via
166166
# dnf. So we need to point requests to the system CA bundle explicitly.

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@ description = "Konflux Release Service Utils"
55
readme = "README.md"
66
requires-python = ">=3.12"
77
dependencies = [
8+
"GitPython>=3.1.40",
9+
"PyYAML>=6.0.1",
810
"requests",
911
"requests-kerberos",
1012
"jinja2",
1113
"check-jsonschema",
14+
"jsonschema>=4.23",
1215
"jinja2-ansible-filters",
1316
"packaging",
1417
"packageurl-python",
@@ -32,6 +35,7 @@ pythonpath = [
3235
"integration-tests/lib",
3336
"scripts/python/helpers",
3437
"scripts/python/tasks/internal",
38+
"utils",
3539
]
3640

3741
[dependency-groups]
Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
"""Decode advisory task payloads and content filtering (idempotency rules)."""
2+
3+
from __future__ import annotations
4+
5+
import base64
6+
import gzip
7+
import json
8+
import re
9+
from pathlib import Path
10+
from typing import Any
11+
12+
import yaml
13+
14+
15+
def _strip_checksum_from_purl(purl: str) -> str:
16+
"""Strip `checksum` query/fragment parts from a package URL for comparison."""
17+
# purl checksum may appear as `&checksum=` (extra param), `?checksum=` in
18+
# the query, or trailing `?checksum=` before end — strip all three shapes.
19+
stripped = re.sub(r"&checksum=[^&]*", "", purl)
20+
stripped = re.sub(r"\?checksum=[^&]*&", "?", stripped)
21+
stripped = re.sub(r"\?checksum=[^&]*$", "", stripped)
22+
return stripped
23+
24+
25+
def _filter_image(content: list[Any], existing: list[Any]) -> list[Any]:
26+
"""Drop image rows that already exist (same image, tags, repository)."""
27+
# `tags` is compared as JSON lists (order and length must match).
28+
out: list[Any] = []
29+
for item in content:
30+
if not isinstance(item, dict):
31+
continue
32+
container_image = item.get("containerImage")
33+
tags = item.get("tags")
34+
repo = item.get("repository")
35+
is_duplicate = False
36+
for existing_row in existing:
37+
if not isinstance(existing_row, dict):
38+
continue
39+
if (
40+
existing_row.get("containerImage") == container_image
41+
and existing_row.get("tags") == tags
42+
and existing_row.get("repository") == repo
43+
):
44+
is_duplicate = True
45+
break
46+
if not is_duplicate:
47+
out.append(item)
48+
return out
49+
50+
51+
def _filter_rpm(content: list[Any], existing: list[Any]) -> list[Any]:
52+
"""Drop artifact rows whose `purl` exactly matches an existing row."""
53+
existing_purls = {
54+
existing_row.get("purl")
55+
for existing_row in existing
56+
if isinstance(existing_row, dict) and existing_row.get("purl") is not None
57+
}
58+
out: list[Any] = []
59+
for item in content:
60+
if not isinstance(item, dict):
61+
continue
62+
purl = item.get("purl")
63+
# Without `purl` there is nothing to compare; skip the row.
64+
if purl is None or purl not in existing_purls:
65+
out.append(item)
66+
return out
67+
68+
69+
def _filter_generic_binary(content: list[Any], existing: list[Any]) -> list[Any]:
70+
"""Drop rows whose `purl` matches existing after stripping `checksum`."""
71+
# Re-signing can change checksum query params; compare logical purls only.
72+
stripped_existing = {
73+
_strip_checksum_from_purl(str(existing_row["purl"]))
74+
for existing_row in existing
75+
if isinstance(existing_row, dict) and existing_row.get("purl") is not None
76+
}
77+
out: list[Any] = []
78+
for item in content:
79+
if not isinstance(item, dict) or item.get("purl") is None:
80+
continue
81+
if _strip_checksum_from_purl(str(item["purl"])) not in stripped_existing:
82+
out.append(item)
83+
return out
84+
85+
86+
def decode_advisory_param(advisory_b64gzip: str) -> dict[str, Any]:
87+
"""Decode `ADVISORY_JSON` (base64 + gzip) to a dict."""
88+
# Task param is a single string; pipeline supplies gzip then base64.
89+
b64_decoded = base64.standard_b64decode(advisory_b64gzip.strip())
90+
gzip_decoded = gzip.decompress(b64_decoded)
91+
return json.loads(gzip_decoded.decode("utf-8"))
92+
93+
94+
def content_array_from_decoded(root: dict[str, Any], content_list_path: str) -> list[Any]:
95+
"""
96+
Return the list at *content_list_path* under advisory *root*, or `[]` if
97+
missing or not a list.
98+
99+
*content_list_path* is a dotted path with an optional leading dot, for
100+
example `.content.images` or `.content.artifacts`.
101+
102+
Each segment is the next dict key under *root*; the value stepped through
103+
along the path must stay a dict until the final key, which must hold a list.
104+
"""
105+
# Decoded advisory JSON stores `content` at the top level (not under `spec`).
106+
segments = [s for s in content_list_path.strip(".").split(".") if s]
107+
current_value: Any = root
108+
for segment in segments:
109+
if not isinstance(current_value, dict):
110+
return []
111+
current_value = current_value.get(segment)
112+
if current_value is None:
113+
return []
114+
return current_value if isinstance(current_value, list) else []
115+
116+
117+
def set_decoded_content_array(
118+
root: dict[str, Any],
119+
content_list_path: str,
120+
content_rows: list[Any],
121+
) -> None:
122+
"""Set `root['content'][images|artifacts]` from *content_list_path*."""
123+
# Only `.content.images` and `.content.artifacts` are valid paths here.
124+
segments = [s for s in content_list_path.strip(".").split(".") if s]
125+
if len(segments) != 2 or segments[0] != "content":
126+
msg = f"unsupported content list path for merge: {content_list_path!r}"
127+
raise ValueError(msg)
128+
images_or_artifacts_key = segments[1]
129+
if "content" not in root or not isinstance(root["content"], dict):
130+
root["content"] = {}
131+
root["content"][images_or_artifacts_key] = content_rows
132+
133+
134+
def append_signing_key_to_content(
135+
root: dict[str, Any],
136+
content_list_path: str,
137+
signing_key: str,
138+
) -> None:
139+
"""Set `signingKey` on each content row that does not already have one."""
140+
# Mutates *root* in place.
141+
for item in content_array_from_decoded(root, content_list_path):
142+
if isinstance(item, dict) and not item.get("signingKey"):
143+
item["signingKey"] = signing_key
144+
145+
146+
def load_advisory_yaml(path: Path) -> dict[str, Any]:
147+
"""Load `advisory.yaml` (or any YAML document) as a `dict`."""
148+
yaml_source = path.read_text(encoding="utf-8")
149+
data = yaml.safe_load(yaml_source)
150+
if data is None:
151+
return {}
152+
if not isinstance(data, dict):
153+
msg = f"YAML root must be a mapping: {path}"
154+
raise TypeError(msg)
155+
return data
156+
157+
158+
def spec_content_array_from_advisory_yaml(
159+
doc: dict[str, Any],
160+
content_list_path: str,
161+
) -> list[Any]:
162+
"""
163+
Return the list at *content_list_path* under `doc['spec']` from an advisory
164+
YAML document (`metadata` / `spec` layout).
165+
166+
Use the same dotted path as for decoded JSON (e.g. `.content.images`); it
167+
is applied under `spec`, not at the document root.
168+
169+
Walking starts at `doc['spec']`; each segment is the next dict key, same
170+
rules as `content_array_from_decoded`.
171+
"""
172+
# Repo advisories nest the payload under `spec` (alongside `metadata`).
173+
segments = [s for s in content_list_path.strip(".").split(".") if s]
174+
current_value: Any = doc.get("spec")
175+
if not isinstance(current_value, dict):
176+
return []
177+
for segment in segments:
178+
if not isinstance(current_value, dict):
179+
return []
180+
current_value = current_value.get(segment)
181+
if current_value is None:
182+
return []
183+
return current_value if isinstance(current_value, list) else []
184+
185+
186+
def get_advisory_spec_type(doc: dict[str, Any]) -> str:
187+
"""Return `spec.type` from an advisory YAML document."""
188+
spec = doc.get("spec")
189+
if isinstance(spec, dict) and spec.get("type") is not None:
190+
return str(spec["type"])
191+
return ""
192+
193+
194+
def get_advisory_metadata_name(doc: dict[str, Any]) -> str:
195+
"""Return `metadata.name` from an advisory YAML document."""
196+
metadata = doc.get("metadata")
197+
if isinstance(metadata, dict) and metadata.get("name") is not None:
198+
return str(metadata["name"])
199+
return ""
200+
201+
202+
def template_data_for_apply(keyed_advisory: dict[str, Any]) -> dict[str, Any]:
203+
"""Build the `{"advisory": {"spec": ...}}` object for `apply_template`."""
204+
return {"advisory": {"spec": keyed_advisory}}
205+
206+
207+
def template_context_merge(
208+
tmpl_data: dict[str, Any],
209+
advisory_name: str,
210+
ship_date: str,
211+
) -> dict[str, Any]:
212+
"""
213+
Merge *advisory_name* and *ship_date* into *tmpl_data* for Jinja.
214+
215+
Duplicate top-level keys keep the value already in *tmpl_data* (later dict
216+
in the merge wins).
217+
"""
218+
# `{**a, **b}`: keys from *b* overwrite duplicates from *a*.
219+
return {
220+
**{"advisory_name": advisory_name, "advisory_ship_date": ship_date},
221+
**tmpl_data,
222+
}
223+
224+
225+
def json_dict_to_yaml_text(document: Any) -> str:
226+
"""Serialize *document* to multi-line YAML (readable advisory file)."""
227+
# `sort_keys=False` keeps stable-ish ordering for tag-preservation checks.
228+
return yaml.safe_dump(
229+
document,
230+
default_flow_style=False,
231+
sort_keys=False,
232+
allow_unicode=True,
233+
)
234+
235+
236+
def spec_content_json_pointer(content_type: str) -> str:
237+
"""Return the dotted *content_list_path* for *content_type* (under `spec` in YAML)."""
238+
if content_type == "image":
239+
return ".content.images"
240+
if content_type in ("binary", "generic", "rpm"):
241+
return ".content.artifacts"
242+
msg = f"Unsupported contentType: {content_type}"
243+
raise ValueError(msg)
244+
245+
246+
def advisory_url_prefix(git_repo: str) -> str:
247+
"""Return the customer portal errata URL for *git_repo*."""
248+
if "/rhtap-release/" in git_repo:
249+
return "https://access.stage.redhat.com/errata"
250+
return "https://access.redhat.com/errata"
251+
252+
253+
def list_existing_advisory_subdirs(advisory_base: Path) -> list[str]:
254+
"""List `year/num` paths under *advisory_base*, newest leaf mtime first."""
255+
if not advisory_base.is_dir():
256+
return []
257+
pairs: list[tuple[float, str]] = []
258+
for year_dir in advisory_base.iterdir():
259+
if not year_dir.is_dir():
260+
continue
261+
for num_dir in year_dir.iterdir():
262+
if not num_dir.is_dir():
263+
continue
264+
rel = f"{year_dir.name}/{num_dir.name}"
265+
# Sort by leaf dir mtime so `year/num` matches `find … -printf %T@`.
266+
pairs.append((num_dir.stat().st_mtime, rel))
267+
pairs.sort(key=lambda mtime_and_relpath: -mtime_and_relpath[0])
268+
return [relpath for _mtime, relpath in pairs]
269+
270+
271+
def filter_content_by_existing(
272+
content_type: str,
273+
content_file: Path,
274+
existing_file: Path,
275+
*,
276+
stderr_path: Path | None,
277+
) -> str:
278+
"""
279+
Return compact JSON array text: *content_file* rows not already in
280+
*existing_file* (idempotency rules for image / rpm / generic / binary).
281+
"""
282+
try:
283+
content_rows = json.loads(content_file.read_text(encoding="utf-8"))
284+
existing_rows = json.loads(existing_file.read_text(encoding="utf-8"))
285+
except json.JSONDecodeError as exc:
286+
if stderr_path is not None:
287+
with open(
288+
stderr_path,
289+
"a",
290+
encoding="utf-8",
291+
errors="replace",
292+
) as errf:
293+
errf.write(f"\nfilter_content_by_existing: invalid JSON: {exc}\n")
294+
raise
295+
if not isinstance(content_rows, list) or not isinstance(existing_rows, list):
296+
msg = "content and existing JSON must be arrays"
297+
raise TypeError(msg)
298+
299+
if content_type in ("generic", "binary"):
300+
filtered = _filter_generic_binary(content_rows, existing_rows)
301+
elif content_type == "rpm":
302+
filtered = _filter_rpm(content_rows, existing_rows)
303+
else:
304+
filtered = _filter_image(content_rows, existing_rows)
305+
306+
# Compact JSON (no extra spaces) for small temp files in the idempotency loop.
307+
return json.dumps(filtered, separators=(",", ":"))

scripts/python/helpers/http_client.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,36 @@
2323
BASE_SLEEP_TIME_SECONDS = 1
2424

2525

26-
def _retries() -> Retry:
27-
"""Transients similar to common ``curl --retry 3`` (connection + some HTTP)."""
26+
def _retries(*, allowed_methods: frozenset[str]) -> Retry:
27+
"""Transients similar to common `curl --retry 3` (connection + some HTTP)."""
2828
return Retry(
2929
total=3,
3030
connect=3,
3131
read=3,
3232
status=2,
3333
backoff_factor=0.4,
3434
status_forcelist=(500, 502, 503, 504),
35-
allowed_methods=frozenset({"GET"}),
35+
allowed_methods=allowed_methods,
3636
raise_on_status=False,
3737
)
3838

3939

40+
def post_session() -> requests.Session:
41+
"""`requests.Session` with retries on transient connection/HTTP errors for POST."""
42+
session = requests.Session()
43+
adapter = HTTPAdapter(max_retries=_retries(allowed_methods=frozenset({"POST"})))
44+
session.mount("https://", adapter)
45+
session.mount("http://", adapter)
46+
return session
47+
48+
4049
def get_session() -> requests.Session:
4150
"""
4251
Build a ``requests.Session`` with the shared retry policy on http and https.
4352
``get_text`` uses this. Tests can patch this function to supply a custom session.
4453
"""
4554
s = requests.Session()
46-
a = HTTPAdapter(max_retries=_retries())
55+
a = HTTPAdapter(max_retries=_retries(allowed_methods=frozenset({"GET"})))
4756
s.mount("https://", a)
4857
s.mount("http://", a)
4958
return s

0 commit comments

Comments
 (0)