1+ from __future__ import annotations
2+ from collections import defaultdict
13from datetime import datetime , timezone
24from enum import StrEnum
3- from datetime import datetime
45from functools import cache
56import logging
67import os
7- from typing import overload
8+ import re
9+ from typing import DefaultDict , overload
810from typing_extensions import Literal
911
1012from bs4 import BeautifulSoup , Tag # type: ignore
@@ -29,11 +31,12 @@ def ET_verify() -> bool | str:
2931 return True
3032
3133
32- def ET_api_get (path : str ):
34+ def ET_api_get (path : str , * , params : dict | None = None ):
3335 response = requests_session ().get (
3436 f"{ ET_URL } /api/v1/{ path } " ,
3537 auth = HTTPSPNEGOAuth (opportunistic_auth = True ),
3638 verify = ET_verify (),
39+ params = params ,
3740 )
3841 response .raise_for_status ()
3942 return response .json ()
@@ -60,6 +63,12 @@ def ET_get_html(path: str):
6063 return response .text
6164
6265
66+ def get_utc_timestamp_from_str (timestamp_string : str ):
67+ return datetime .strptime (timestamp_string , "%Y-%m-%dT%H:%M:%SZ" ).replace (
68+ tzinfo = timezone .utc
69+ )
70+
71+
6372@overload
6473def get_erratum (erratum_id : str | int , full : Literal [False ] = False ) -> Erratum : ...
6574
@@ -84,9 +93,14 @@ def get_erratum(erratum_id: str | int, full: bool = False) -> Erratum | FullErra
8493
8594 jira_issues = [i ["jira_issue" ]["key" ] for i in data ["jira_issues" ]["jira_issues" ]]
8695
87- last_status_transition_timestamp = datetime .strptime (
88- details ["status_updated_at" ], "%Y-%m-%dT%H:%M:%SZ"
89- ).replace (tzinfo = timezone .utc )
96+ last_status_transition_timestamp = get_utc_timestamp_from_str (
97+ details ["status_updated_at" ]
98+ )
99+ publish_date = (
100+ get_utc_timestamp_from_str (details ["publish_date" ])
101+ if details ["publish_date" ] is not None
102+ else None
103+ )
90104
91105 base_erratum = Erratum (
92106 id = details ["id" ],
@@ -95,6 +109,8 @@ def get_erratum(erratum_id: str | int, full: bool = False) -> Erratum | FullErra
95109 synopsis = details ["synopsis" ],
96110 status = ErrataStatus (details ["status" ]),
97111 jira_issues = jira_issues ,
112+ release_id = details ["group_id" ],
113+ publish_date = publish_date ,
98114 last_status_transition_timestamp = last_status_transition_timestamp ,
99115 )
100116
@@ -140,6 +156,205 @@ def get_erratum_for_link(link: str, full: bool = False) -> Erratum | FullErratum
140156 return get_erratum (erratum_id , full = full )
141157
142158
159+ class RHELVersion (BaseModel ):
160+ major : int
161+ minor : int
162+ micro : int | None
163+ stream : str
164+
165+ def __str__ (self ):
166+ if self .micro is not None :
167+ return f"RHEL-{ self .major } .{ self .minor } .{ self .micro } .{ self .stream } "
168+
169+ return f"RHEL-{ self .major } .{ self .minor } .{ self .stream } "
170+
171+ @property
172+ def parent (self ) -> RHELVersion | None :
173+ """The release that the release inherits builds from."""
174+
175+ if self .stream != "GA" :
176+ return RHELVersion (
177+ major = self .major ,
178+ minor = self .minor ,
179+ micro = self .micro ,
180+ stream = "GA" ,
181+ )
182+
183+ if self .minor > 0 :
184+ one_minor_version_up = self .minor - 1
185+ match self .major :
186+ case 10 :
187+ return RHELVersion (
188+ major = self .major ,
189+ minor = one_minor_version_up ,
190+ micro = self .micro ,
191+ stream = "Z" ,
192+ )
193+ case 9 | 8 :
194+ if one_minor_version_up % 2 == 1 :
195+ return RHELVersion (
196+ major = self .major ,
197+ minor = one_minor_version_up ,
198+ micro = self .micro ,
199+ stream = "Z.MAIN" ,
200+ )
201+ else :
202+ return RHELVersion (
203+ major = self .major ,
204+ minor = one_minor_version_up ,
205+ micro = self .micro ,
206+ stream = "Z.MAIN+EUS" ,
207+ )
208+
209+ return None
210+
211+ @staticmethod
212+ def from_str (version_string : str ) -> RHELVersion | None :
213+ version_string = version_string .strip ()
214+ version_string = version_string .upper ()
215+ pattern = r"RHEL-(\d+)\.(\d+)(?:\.(\d+))?\.([^\d].*)$"
216+ match = re .match (pattern , version_string )
217+ if match is not None :
218+ version = RHELVersion (
219+ major = int (match .group (1 )),
220+ minor = int (match .group (2 )),
221+ micro = int (match .group (3 )) if match .group (3 ) else None ,
222+ stream = match .group (4 ),
223+ )
224+
225+ assert version_string == str (version )
226+
227+ return version
228+
229+
230+ class RHELRelease (BaseModel ):
231+ version : str
232+ # None means already shipped
233+ ship_date : datetime | None
234+
235+ @property
236+ def shipped (self ):
237+ return self .ship_date is None or self .ship_date < datetime .now (tz = timezone .utc )
238+
239+
240+ def get_RHEL_release (param : int | str ):
241+ response = (
242+ ET_api_get ("releases" , params = {"filter[id]" : param })
243+ if isinstance (param , int )
244+ else ET_api_get ("releases" , params = {"filter[name]" : param })
245+ )
246+ release_data = response ["data" ][0 ]
247+
248+ ship_date_string = release_data ["attributes" ]["ship_date" ]
249+ ship_date = (
250+ get_utc_timestamp_from_str (ship_date_string )
251+ if ship_date_string is not None
252+ else None
253+ )
254+
255+ return RHELRelease (
256+ version = release_data ["attributes" ]["name" ],
257+ ship_date = ship_date ,
258+ )
259+
260+
261+ def _get_rel_prep_lookup (package_name : str ) -> DefaultDict [str , list [Erratum ]]:
262+ """Builds a lookup of REL_PREP errata for a package, keyed by RHEL release version.
263+
264+ This function queries an API for all errata associated with a given package,
265+ filters for those in the "REL_PREP" (Release Preparation) state, and organizes
266+ them into a dictionary where each key is a RHEL version string.
267+
268+ Args:
269+ package_name: The name of the package to look up.
270+
271+ Returns:
272+ A defaultdict where keys are RHEL release version strings and values are
273+ lists of associated Erratum objects in the REL_PREP state.
274+ """
275+ rel_prep_lookup : DefaultDict [str , list [Erratum ]] = defaultdict (list )
276+ related_errata = ET_api_get (f"packages/{ package_name } " )["data" ]["relationships" ][
277+ "errata"
278+ ]
279+ assert isinstance (related_errata , list )
280+ for erratum_info in related_errata :
281+ if erratum_info ["status" ] != ErrataStatus .REL_PREP :
282+ continue
283+
284+ id = erratum_info ["id" ]
285+ cur_erratum = get_erratum (id )
286+ cur_release = get_RHEL_release (cur_erratum .release_id )
287+
288+ rel_prep_lookup [cur_release .version ].append (cur_erratum )
289+
290+ return rel_prep_lookup
291+
292+
293+ def get_previous_erratum (current_erratum_id : str | int , package_name : str ):
294+ """Finds the previous erratum for a given package, starting from a specific erratum.
295+
296+ RHEL releases inherit packages from previous releases, but only until they are shipped.
297+ This function searches backwards through RHEL release versions starting from the one
298+ associated with current_erratum_id looking for applicable REL_PREP errata, or for
299+ a shipped release. If we find a shipped release, we're done - no errata will be inherited
300+ from previous releases, so if we haven't found a REL_PREP errata first, we use the
301+ errata associated with the official released build for the package in that release
302+ and stop.
303+
304+ Args:
305+ current_erratum_id: The ID of the erratum to start the search from.
306+ package_name: The name of the package for which to find the previous erratum.
307+
308+ Returns:
309+ The previous Erratum object, or None if one cannot be found.
310+ """
311+ erratum = get_erratum (current_erratum_id )
312+
313+ target_release = get_RHEL_release (erratum .release_id )
314+ target_version = RHELVersion .from_str (target_release .version )
315+ if target_version is None :
316+ logger .info (f"Unknown RHEL release format: { target_release .version } " )
317+ return None
318+
319+ def is_previous_erratum_applicable (erratum_version : str , erratum : Erratum ):
320+ if erratum_version == target_version :
321+ return True
322+ elif target_release .shipped :
323+ return False
324+
325+ assert target_release .ship_date is not None
326+ return (
327+ erratum .publish_date is not None
328+ and erratum .publish_date <= target_release .ship_date
329+ )
330+
331+ rel_prep_lookup = _get_rel_prep_lookup (package_name )
332+ cur_version = target_version
333+ while cur_version :
334+ rel_prep_errata = rel_prep_lookup [str (cur_version )]
335+ rel_prep = [
336+ e
337+ for e in rel_prep_errata
338+ if is_previous_erratum_applicable (str (cur_version ), e )
339+ ]
340+
341+ if rel_prep :
342+ latest_erratum = max (
343+ rel_prep ,
344+ key = lambda e : e .publish_date if e .publish_date else datetime .min ,
345+ )
346+ return latest_erratum
347+
348+ release = get_RHEL_release (str (cur_version ))
349+ if release .shipped :
350+ released_build = ET_api_get (
351+ f"product_versions/{ release .version } /released_builds/{ package_name } "
352+ )
353+ return get_erratum (released_build ["errata_id" ])
354+
355+ cur_version = cur_version .parent
356+
357+
143358class RuleParseError (Exception ):
144359 pass
145360
0 commit comments