Skip to content

Commit

Permalink
Merge pull request #103 from sw360/martin/fix-github-tag-matching
Browse files Browse the repository at this point in the history
GitHub tag matching
  • Loading branch information
tngraf authored Jan 29, 2025
2 parents 9924eaf + d05917c commit 7508706
Show file tree
Hide file tree
Showing 6 changed files with 1,074 additions and 165 deletions.
293 changes: 230 additions & 63 deletions capycli/bom/findsources.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import re
import sys
import time
from typing import Any, Dict, List, Tuple
from collections.abc import Iterable
from typing import Any, Dict, List, Set, Tuple
from urllib.parse import parse_qs, urlparse

import requests
import semver
Expand All @@ -36,13 +38,59 @@
class FindSources(capycli.common.script_base.ScriptBase):
"""Go through the list of SBOM items and try to determine the source code."""

class TagCache:
    """A key task performed in this module is fetching tags from GitHub
    and match tags to (component) versions. This task includes many
    calls to the GitHub API, which we seek to limit by implementing
    an internal cache and a logic to guess tags, instead of
    performing exhaustive searches.
    """
    def __init__(self) -> None:
        # maps (project, version) -> set of tag names already seen
        self.data: Dict[Tuple[str, str], Set[str]] = {}

    def __getitem__(self, key: Any) -> Set[str]:
        """Get the set of all cached tags for a key."""
        return self.data[self._validate_key(key)]

    def _validate_key(self, key: Tuple[str, str]) -> Tuple[str, str]:
        """Ensure our keys are hashable."""
        if len(key) != 2 or key != (str(key[0]), str(key[1])):
            # bugfix: the two string literals were concatenated without a
            # separating space ("...consist ofa project name...")
            raise KeyError(f'{self.__class__.__name__} key must consist of '
                           'a project name and a version string')
        return key

    def add(self, project: str, version: str, tag: str) -> None:
        """Cache a tag for a specific project and version."""
        key = self._validate_key((project, version))
        tags = self.data.setdefault(key, set())
        tags.add(tag)

    def filter(self, project: str, version: str, data: Any) -> List[str]:
        """Remove all cached entries (and empty strings) from @data.

        @data may be a single tag (a string) or an iterable of tags.
        """
        if isinstance(data, str):
            data = [data]
        elif not isinstance(data, Iterable):
            raise ValueError('Expecting an iterable of tags!')
        key = self._validate_key((project, version))
        return [item for item in data
                if item not in self.data.get(key, [])
                and len(item) > 0]

    def filter_and_cache(self, project: str, version: str, data: Any) -> List[str]:
        """Convenience method to do filtering and adding in one run."""
        candidates = set(self.filter(project, version, data))
        for tag in candidates:
            self.add(project, version, tag)
        return list(candidates)

def __init__(self) -> None:
    """Set up default state; GitHub credentials stay empty until configured."""
    self.verbose: bool = False
    # matches version cores like "1.2.3" or "1_2_3"
    # (the superseded pattern r"[\d+\.|_]+[\d+]" from the pre-merge code
    # was diff residue here and has been dropped)
    self.version_regex = re.compile(r"(\d+[._])+\d+")
    self.github_project_name_regex = re.compile(r"^[a-zA-Z0-9-]+(/[a-zA-Z0-9-]+)*$")
    self.github_name: str = ""
    self.github_token: str = ""
    # SW360 server URL comes from the environment and may be empty
    self.sw360_url: str = os.environ.get("SW360ServerUrl", "")
    self.tag_cache = self.TagCache()

def is_sourcefile_accessible(self, sourcefile_url: str) -> bool:
"""Check if the URL is accessible."""
Expand Down Expand Up @@ -70,34 +118,61 @@ def is_sourcefile_accessible(self, sourcefile_url: str) -> bool:
return False

@staticmethod
def github_request(url: str, username: str = "", token: str = "",
                   return_response: bool = False,
                   allow_redirects: bool = True,  # default in requests
                   ) -> Any:
    """GET @url from the GitHub API with error handling.

    Waits 60s and retries on rate limiting or connection errors;
    aborts the program on bad credentials.  Returns the parsed JSON
    body, or the raw requests.Response when @return_response is True.
    On unexpected errors a synthetic response carrying a JSON body of
    the form {"exception": "..."} is produced instead.
    """
    try:
        headers = {}
        if token:
            headers["Authorization"] = "token " + token
        if username:
            headers["Username"] = username
        response = requests.get(url, headers=headers,
                                allow_redirects=allow_redirects)
        if response.status_code == 429 \
                or 'rate limit exceeded' in response.reason \
                or 'API rate limit exceeded' in response.json().get('message', ''):
            print(
                Fore.LIGHTYELLOW_EX +
                " Github API rate limit exceeded - wait 60s and retry ... " +
                Style.RESET_ALL)
            time.sleep(60)
            # bugfix: also propagate allow_redirects on retry
            return FindSources.github_request(url, username, token,
                                              return_response=return_response,
                                              allow_redirects=allow_redirects)
        if response.json().get('message', '').startswith("Bad credentials"):
            print_red("Invalid GitHub credential provided - aborting!")
            sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SERVICE)

    except AttributeError as err:
        # response.json() did not return a dictionary
        if hasattr(err, 'name'):
            name = err.name
        else:  # Python prior to 3.10
            name = err.args[0].split("'")[3]
        if not name == 'get':
            raise

    except requests.exceptions.JSONDecodeError:
        # body was not JSON at all -> behave as if it were an empty object
        response._content = b'{}'

    except requests.exceptions.ConnectionError as ex:
        print(
            Fore.LIGHTYELLOW_EX +
            f" Connection issues accessing {url} " + repr(ex) +
            "\n Retrying in 60 seconds!" +
            Style.RESET_ALL)
        time.sleep(60)
        # bugfix: also propagate allow_redirects on retry
        return FindSources.github_request(url, username, token,
                                          return_response=return_response,
                                          allow_redirects=allow_redirects)

    except Exception as ex:
        print(
            Fore.LIGHTYELLOW_EX +
            " Error accessing GitHub: " + repr(ex) +
            Style.RESET_ALL)
        # synthesize a response so callers always get a JSON-ish result
        response = requests.Response()
        response._content = \
            b'{' + f'"exception": "{repr(ex)}"'.encode() + b'}'

    return response if return_response else response.json()

@staticmethod
def get_repositories(name: str, language: str, username: str = "", token: str = "") -> Any:
Expand Down Expand Up @@ -135,27 +210,133 @@ def get_repo_name(github_url: str) -> str:
@staticmethod
def get_github_info(repository_url: str, username: str = "",
token: str = "") -> get_github_info_type:
"""This method used to iterate through all resource pages of
GitHub's /tags API, collect the results, then return a huge
list with all results.
Removed because this approach does not scale well and we did
encounter projects with tens of thousands of tags.
"""
Query tag infos from GitHub.
In the good case a list of tags entries (= dictionaries) is returned.
In the bad case a JSON error message is returned.
raise NotImplementedError(
"Removed with introduction of get_matchting_source_tag!")

def _get_github_repo(self, github_ref: str) -> Dict[str, Any]:
"""Fetch GitHub API object identified by @github_ref.
@github_ref can be a simple "<owner>/<repo>" string or any
from the plethora of links that refer to a
project on GitHub.
By using urlparse() we save ourselves a little bit of work
with trailing queries and fragments, but any @github_ref with
colons, where the first colon is not part of '://' will not
yield viable results,
e.g. 'api.github.com:443/repos/sw360/capycli'.
"""
url = 'api.github.com/repos/'
gh_ref = urlparse(github_ref, scheme='no_scheme')
if gh_ref.scheme == 'no_scheme': # interpret @github_ref as OWNER/REPO
url += gh_ref.path
elif not gh_ref.netloc.endswith('github.com'):
raise ValueError(f'{github_ref} is not an expected @github_ref!')
elif gh_ref.path.startswith('/repos'):
url += gh_ref.path[6:]
else:
url += gh_ref.path
if url.endswith('.git'):
url = url[0:-4]
url = 'https://' + url.replace('//', '/')
repo = {}
while 'tags_url' not in repo and 'github.com' in url:
repo = self.github_request(url, self.github_name, self.github_token)
url = url.rsplit('/', 1)[0] # remove last path segment
if 'tags_url' not in repo:
raise ValueError(f"Unable to make @github_ref {github_ref} work!")
return repo

def _get_link_page(self, res: requests.Response, which: str = 'next') -> int:
"""Fetch only page number from link-header."""
try:
url = urlparse(res.links[which]['url'])
return int(parse_qs(url.query)['page'][0])
except KeyError: # GitHub gave us only one results page
return 1

def get_matching_source_url(self, version: Any, github_ref: str,
                            version_prefix: Any = None
                            ) -> str:
    """Find a URL to download source code from GitHub. We are
    looking for the source code in @github_ref at @version.
    We expect to match @version to an existing tag in the repo
    identified by @github_ref. We want to have the source
    code download URL of that existing tag!
    In order to perform this matching, we must retrieve the tags
    from GitHub and then analyse them. First, we use
    get_matching_tag(). If that doesn't yield a positive result,
    we try to infer a tag for @version, to prevent an exhaustive
    search over all tags.
    """
    # (cleanup: residue of the removed get_github_info() body that had
    # been interleaved into this method by the diff has been dropped)
    try:
        repo = self._get_github_repo(github_ref)
    except ValueError as err:
        print_yellow(" " + str(err))
        return ""

    tags_url = repo['tags_url'] + '?per_page=100'
    git_refs_url_tpl = repo['git_refs_url'].replace('{/sha}', '{sha}', 1)

    res = self.github_request(tags_url, self.github_name,
                              self.github_token, return_response=True)
    pages = self._get_link_page(res, 'last')
    for _ in range(pages):  # we prefer this over "while True"
        # note: in res.json() we already have the first results page
        try:
            tags = [tag for tag in res.json()
                    if version_prefix is None
                    or tag['name'].startswith(version_prefix)]
            source_url = self.get_matching_tag(tags, version, tags_url)
            if len(source_url) > 0:  # we found what we believe is
                return source_url    # the correct source_url

        except (TypeError, KeyError, AttributeError):
            # res.json() did not give us an iterable of things where
            # 'name' is a viable index, for instance an error message
            tags = []

        # guess tag prefixes from the tag names seen so far and probe
        # GitHub's git-refs API with each previously unseen prefix
        new_prefixes = self.tag_cache.filter_and_cache(
            repo['full_name'], version,  # cache key
            [self.version_regex.split(tag['name'], 1)[0]
             for tag in tags
             if self.version_regex.search(tag['name']) is not None])

        for prefix in new_prefixes:
            url = git_refs_url_tpl.format(sha=f'/tags/{prefix}')
            w_prefix = self.github_request(url, self.github_name,
                                           self.github_token)
            if isinstance(w_prefix, dict):  # exact match
                w_prefix = [w_prefix]

            # ORDER BY tag-name-length DESC
            by_size = sorted([(len(tag['ref']), tag) for tag in w_prefix],
                             key=lambda x: x[0])
            w_prefix = [itm[1] for itm in reversed(by_size)]

            transformed_for_get_matching_tags = [
                {'name': tag['ref'].replace('refs/tags/', '', 1),
                 'zipball_url': tag['url'].replace(
                     '/git/refs/tags/', '/zipball/refs/tags/', 1),
                 } for tag in w_prefix]
            source_url = self.get_matching_tag(
                transformed_for_get_matching_tags, version, tags_url)
            if len(source_url) > 0:  # we found what we believe is
                return source_url    # the correct source_url
        try:
            url = res.links['next']['url']
            res = self.github_request(url, self.github_name,
                                      self.github_token, return_response=True)
        except KeyError:  # no more result pages
            break
    print_yellow(" No matching tag for version " + version + " found")
    return ""

def to_semver_string(self, version: str) -> str:
"""Bring all version information to a format we can compare."""
Expand Down Expand Up @@ -193,8 +374,7 @@ def find_github_url(self, component: Component, use_language: bool = True) -> st
name_match = [r for r in repositories.get("items") if component_name in r.get("name", "")]
if len(name_match):
for match in name_match:
tag_info = self.github_request(match["tags_url"], self.github_name, self.github_token)
source_url = self.get_matching_tag(tag_info, component.version or "", match["html_url"])
source_url = self.get_matching_source_url(component.version, match["tags_url"])
if len(name_match) == 1:
return source_url
elif source_url:
Expand Down Expand Up @@ -261,10 +441,7 @@ def find_golang_url(self, component: Component) -> str:

if repository_name.startswith("https://github.com/"):
repository_name = repository_name[len("https://github.com/"):]
tag_info = self.get_github_info(repository_name, self.github_name, self.github_token)
tag_info_checked = self.check_for_github_error(tag_info)
source_url = self.get_matching_tag(tag_info_checked, component_version,
repository_name, version_prefix or "")
source_url = self.get_matching_source_url(component_version, repository_name, version_prefix)

# component["RepositoryUrl"] = repository_name
return source_url
Expand All @@ -284,26 +461,15 @@ def get_github_source_url(self, github_url: str, version: str) -> str:

if self.verbose:
print_text(" repo_name:", repo_name)

tag_info = self.get_github_info(repo_name, self.github_name, self.github_token)
tag_info_checked = self.check_for_github_error(tag_info)
return self.get_matching_tag(tag_info_checked, version, github_url)
return self.get_matching_source_url(version, repo_name)

def check_for_github_error(self, tag_info: "get_github_info_type") -> List[Dict[str, Any]]:
    """This method was introduced to check the output of
    get_github_info() for errors.
    Removed, because get_github_info was removed.
    """
    raise NotImplementedError(
        "Removed with introduction of get_matchting_source_tag!")

def get_matching_tag(self, tag_info: List[Dict[str, Any]], version: str, github_url: str,
version_prefix: str = "") -> str:
Expand Down Expand Up @@ -369,7 +535,7 @@ def get_source_url_from_release(self, release_id: str) -> str:
if release_details:
source_url = release_details.get("sourceCodeDownloadurl", "")
if self.verbose:
print("getting source url from get from sw360 for release_id " + release_id)
print(" getting source url from get from sw360 for release_id " + release_id)
if source_url != "":
return source_url
break
Expand Down Expand Up @@ -468,7 +634,8 @@ def find_source_url_recursive_by_sw360(self, component: Component) -> str:

@staticmethod
def find_source_url_by_language(component: Component) -> str:
capycli.dependencies.javascript.GetJavascriptDependencies().try_find_component_metadata(component, "")
if hasattr(capycli, 'dependencies'):
capycli.dependencies.javascript.GetJavascriptDependencies().try_find_component_metadata(component, "")
url = CycloneDxSupport.get_ext_ref_source_url(component)
if isinstance(url, XsUri):
return url._uri
Expand Down
Loading

0 comments on commit 7508706

Please sign in to comment.