|
| 1 | +from itertools import chain, takewhile |
| 2 | +from typing import Iterable, List |
| 3 | + |
| 4 | +import requests |
| 5 | +from semantic_version import Version |
| 6 | + |
| 7 | +from nextcloudappstore.core.models import NextcloudRelease |
| 8 | + |
| 9 | + |
| 10 | +class GitHubClient: |
| 11 | + def __init__(self, base_url: str, api_token: str = None) -> None: |
| 12 | + self.base_url = base_url.rstrip('/') |
| 13 | + self.api_token = api_token |
| 14 | + self.headers = None if self.api_token else { |
| 15 | + 'Authorization': 'token %s' % self.api_token |
| 16 | + } |
| 17 | + |
| 18 | + def get_tags(self, page: int, size: int = 100): |
| 19 | + url = '%s/repos/nextcloud/server/tags' % self.base_url |
| 20 | + params = (('per_page', size), ('page', page)) |
| 21 | + response = requests.get(url, params=params, headers=self.headers) |
| 22 | + response.raise_for_status() |
| 23 | + return response.json() |
| 24 | + |
| 25 | + |
| 26 | +def sync_releases(versions: Iterable[str]) -> None: |
| 27 | + """ |
| 28 | + All given versions have a release. If a release is absent, persisted |
| 29 | + releases are out of date and need to have their release flag removed. |
| 30 | + Finally the latest version must be marked as current. |
| 31 | + :param versions: an iterable yielding all retrieved GitHub tags |
| 32 | + :return: |
| 33 | + """ |
| 34 | + current_releases = NextcloudRelease.objects.all() |
| 35 | + imported_releases = [ |
| 36 | + NextcloudRelease.objects.get_or_create(version=version)[0] |
| 37 | + for version in versions |
| 38 | + ] |
| 39 | + if imported_releases: |
| 40 | + # all imported releases have a release, existing ones don't |
| 41 | + for release in imported_releases: |
| 42 | + release.has_release = True |
| 43 | + release.save() |
| 44 | + for release in get_old_releases(current_releases, imported_releases): |
| 45 | + release.has_release = False |
| 46 | + release.save() |
| 47 | + # set latest release |
| 48 | + NextcloudRelease.objects.update(is_current=False) |
| 49 | + latest = max(imported_releases, key=lambda v: Version(v.version)) |
| 50 | + latest.is_current = True |
| 51 | + latest.save() |
| 52 | + |
| 53 | + |
| 54 | +NextcloudReleases = List[NextcloudRelease] |
| 55 | + |
| 56 | + |
| 57 | +def get_old_releases(current: NextcloudReleases, |
| 58 | + imported: NextcloudReleases) -> NextcloudReleases: |
| 59 | + imported_versions = {release.version for release in imported} |
| 60 | + return [release for release in current |
| 61 | + if release.version not in imported_versions] |
| 62 | + |
| 63 | + |
| 64 | +def get_supported_releases(client: GitHubClient, |
| 65 | + oldest_supported: str) -> Iterable[str]: |
| 66 | + releases = get_stable_releases(client) |
| 67 | + return takewhile(lambda v: is_supported(v, oldest_supported), releases) |
| 68 | + |
| 69 | + |
| 70 | +def get_stable_releases(client: GitHubClient) -> Iterable[str]: |
| 71 | + json = chain.from_iterable(TagPages(client)) |
| 72 | + return (tag for tag in (release['name'].lstrip('v') |
| 73 | + for release in json |
| 74 | + if 'name' in release) |
| 75 | + if is_stable(tag)) |
| 76 | + |
| 77 | + |
| 78 | +def is_supported(oldest_supported: str, version: str) -> bool: |
| 79 | + return Version(oldest_supported) >= Version(version) |
| 80 | + |
| 81 | + |
| 82 | +def is_stable(release: str) -> bool: |
| 83 | + try: |
| 84 | + version = Version(release) |
| 85 | + return not version.prerelease |
| 86 | + except ValueError: |
| 87 | + return False |
| 88 | + |
| 89 | + |
| 90 | +class TagPages: |
| 91 | + """ |
| 92 | + The GitHub API is paginated which makes it a pain to fetch all results. |
| 93 | + This iterable returns a stream of json arrays until no further results |
| 94 | + are found. To iterate over all releases you need to flatten the results |
| 95 | + returned from this iterator first |
| 96 | + """ |
| 97 | + |
| 98 | + def __init__(self, client: GitHubClient, size: int = 100) -> None: |
| 99 | + self.client = client |
| 100 | + self.size = size |
| 101 | + self.page = 1 # pages are 1 indexed |
| 102 | + |
| 103 | + def __iter__(self) -> 'TagPages': |
| 104 | + return self |
| 105 | + |
| 106 | + def __next__(self): |
| 107 | + json = self.client.get_tags(self.page, self.size) |
| 108 | + if len(json) > 0: |
| 109 | + self.page += 1 |
| 110 | + return json |
| 111 | + else: |
| 112 | + raise StopIteration |
0 commit comments