89 changes: 62 additions & 27 deletions tailor_distro/pull_distro_repositories.py
@@ -8,7 +8,9 @@
import glob
import shutil
import github
import json

from dataclasses import dataclass
from requests.exceptions import HTTPError
from catkin_pkg.package import parse_package
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -28,6 +30,13 @@
DOWNLOAD_TIMEOUT_SEC = 30
CHUNK_SIZE = 1024 * 1024

@dataclass
class RepoInformation:
owner: str
name: str
exists: bool
sha: str
tarball: str

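Compared with the previous `Dict[str, Dict[str, str | bool]]` shape, the dataclass gives each record typed, attribute-style access. An illustrative instance (every value below is made up):

```python
# Illustrative only: all field values are placeholders.
info = RepoInformation(
    owner="my-org",
    name="my_repo",
    exists=True,
    sha="4f2d9c1e8a7b3f6d0c5e2a9b8d7f1e3c6a0b4d8e",
    tarball="https://codeload.github.com/my-org/my_repo/legacy.tar.gz/4f2d9c1",
)
assert info.exists  # attribute access replaces data["exists"] dict lookups
```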
def get_name_and_owner(repo_url: str) -> Tuple[Optional[str], str]:
"""
@@ -75,20 +84,19 @@ def graphql_with_retry(requester, query, max_attempts=DOWNLOAD_RETRIES, delay=RE
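The body of `graphql_with_retry` is collapsed in this hunk (and the `delay=RE…` default is truncated in the header above). Judging from the signature, it is a retry wrapper around a GraphQL POST; a minimal sketch of that pattern, assuming PyGithub's `Requester.requestJsonAndCheck` and stand-in defaults:

```python
import time

def graphql_with_retry_sketch(requester, query, max_attempts=3, delay=5.0):
    """Sketch only: POST a GraphQL query, pausing between failed attempts.

    max_attempts and delay stand in for DOWNLOAD_RETRIES and the truncated
    default; the actual implementation is not shown in this diff.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            _, data = requester.requestJsonAndCheck(
                "POST", "https://api.github.com/graphql", input={"query": query}
            )
            if "errors" in data:  # GraphQL reports errors in the payload, not the HTTP status
                raise RuntimeError(data["errors"])
            return data
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(delay)
```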

def retrieve_tarballs(
repos_url: List[str], refs: List[str], github_client, chunk: int = 100
) -> Dict[str, Dict[str, str | bool]]:
) -> List[RepoInformation]:
"""
Retrieve the tarball for a list of repositories using the GitHub GraphQL API. If the requested ref exists,
the tarball corresponding to that reference is returned; otherwise the exists flag is set to False.
:param repos_url: list of repository URLs
:param refs: default versions to retrieve
:param github_client: Github client
:param chunk: maximum number of repositories processed per request, to avoid running into rate-limit issues
:returns: a dictionary such as d[repo_name] ={exists: True/False, sha: str, tarball: str}
:returns: a list of RepoInformation objects (owner, name, exists flag, sha, tarball URL)
"""
names_and_owners = [get_name_and_owner(url) for url in repos_url]
requester = github_client._Github__requester

out: Dict[str, Dict[str, str | bool]] = {}
out: List[RepoInformation] = []
for start in range(0, len(names_and_owners), chunk):
slice_ = names_and_owners[start:start + chunk]
slice_refs = refs[start:start + chunk]
@@ -121,7 +129,7 @@ def retrieve_tarballs(
else:
sha = v["target"]["oid"]
tarball = v["target"]["tarballUrl"]
click.echo(f"Obtained tarball URL for {repo_name}... (ref: {ref})")
click.echo(f"Obtained tarball URL for {repo_name}... (ref: {ref}, sha: {sha})")
exists = True
else:
exists = False
@@ -133,21 +141,26 @@
fg="red",
)
)
out[repo_name] = {
"exists": exists,
"sha": sha,
"tarball": tarball,
}
out.append(
RepoInformation(
owner=repo_owner,
name=repo_name,
exists=exists,
sha=sha,
tarball=tarball,
)
)
return out

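The query string itself is not visible in these hunks, but batching a chunk into one request is typically done with GraphQL aliases, one `repository` block per repo. A sketch of such a builder (field names follow the `v["target"]["oid"]` / `v["target"]["tarballUrl"]` accesses above; the `(name, owner)` tuple order is assumed from `get_name_and_owner`):

```python
def build_batched_query(names_and_owners, refs):
    """Hypothetical builder: one aliased repository block per (name, owner, ref)."""
    blocks = []
    for i, ((name, owner), ref) in enumerate(zip(names_and_owners, refs)):
        blocks.append(
            f'repo{i}: repository(owner: "{owner}", name: "{name}") {{ '
            f'ref(qualifiedName: "{ref}") {{ '
            "target { oid ... on Commit { tarballUrl } } } }"
        )
    return "query { " + " ".join(blocks) + " }"
```

One request per chunk keeps the number of GraphQL calls, and hence the rate-limit pressure, proportional to `len(repos_url) / chunk` rather than to the repository count.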

def process_repo(repo: str, tarball_url: str, base_dir: pathlib.Path) -> None:
def process_repo(repo: str, tarball_url: str, target_dir: pathlib.Path) -> pathlib.Path:
"""Download and unpack a single repository using its tarball URL
:param repo: Name of the repository
:param tarball_url: Tarball URL
:param base_dir: Directory where to unpack the repositoriess
:param target_dir: Directory where to unpack the repository
:returns: the path where the repository has been extracted
"""
repo_dir = base_dir / repo
repo_dir = target_dir / repo
repo_dir.mkdir(parents=True, exist_ok=True)
retries = DOWNLOAD_RETRIES
while True:
@@ -179,40 +192,62 @@ def process_repo(repo: str, tarball_url: str, base_dir: pathlib.Path) -> None:
with tarfile.open(archive_path) as tar:
tar.extractall(path=repo_dir)

# Return the path of the extracted repository
dirs = [p for p in repo_dir.iterdir() if p.is_dir()]
if len(dirs) == 1:
return dirs[0]
return repo_dir

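A GitHub tarball unpacks into a single `<owner>-<repo>-<sha>` top-level directory, which is why the single-subdirectory case returns that child instead of `repo_dir`. Hypothetical usage (URL and paths are placeholders):

```python
import pathlib

src_dir = process_repo(
    repo="my_repo",
    tarball_url="https://codeload.github.com/my-org/my_repo/legacy.tar.gz/4f2d9c1",
    target_dir=pathlib.Path("workspace/src"),
)
# src_dir would typically be workspace/src/my_repo/my-org-my_repo-4f2d9c1
```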

def append_jsonl(log_path: pathlib.Path, repo_info: RepoInformation, repo_path: pathlib.Path) -> None:
"""Append repository information to json log file
:param log_path: path of the log file
:param repo_info: RepoInformation object containing all relevant data
:param repo_path: Path where the repository has been extracted
"""

log_path.parent.mkdir(parents=True, exist_ok=True)
repo_log = {
"owner": repo_info.owner,
"repo": repo_info.name,
"sha": repo_info.sha,
"path": str(repo_path),
}
line = json.dumps(repo_log, ensure_ascii=False) + "\n"
with open(log_path, "a", encoding="utf-8") as f:
f.write(line)

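One JSON object per line means the log can be appended to incrementally and parsed back without loading one large document. For example, a reader for the file this produces (a small helper sketch, not part of the change):

```python
import json
import pathlib

def read_repo_log(log_path: pathlib.Path) -> list[dict]:
    """Parse the JSONL log back into one dict per downloaded repository."""
    with open(log_path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```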

def pull_repositories(
repo_data: Dict[str, Dict[str, str | bool]], base_dir: pathlib.Path
repo_data: List[RepoInformation], base_dir: pathlib.Path
) -> None:
"""Download and unpack a list of repository tarballs
:param repo_data: List of repository names and their tarball URLs
:param repo_data: List of RepoInformation objects
:param base_dir: Directory where to unpack the repositories
"""
click.echo("Download and unpack repositories...", err=False)
base_dir.mkdir(parents=True, exist_ok=True)

# Check if the tarballUrl exists
repo_tarballs: Dict[str, str] = {}
for repo_name, data in repo_data.items():
if data["exists"]:
repo_tarballs[repo_name] = data["tarball"]
logfile_path = base_dir / "repositories_data.jsonl"

with ThreadPoolExecutor(max_workers=PULL_WORKERS) as pool:
futures = {
pool.submit(process_repo, repo, url, base_dir): repo
for repo, url in repo_tarballs.items()
pool.submit(process_repo, repo.name, repo.tarball, base_dir): repo
for repo in repo_data
if repo.exists
}

for future in as_completed(futures):
repo_name = futures[future]
repo_info = futures[future]
try:
future.result()
# click.echo(click.style(f"[✓] Downloaded {repo_name}", fg="green"), err=False)
repo_path = future.result()
except Exception as exc:
click.echo(
click.style(f"[✗] Could not download {repo_name}: {exc}", fg="red"),
click.style(f"[✗] Could not download {repo_info.name}: {exc}", fg="red"),
err=True,
)
raise
append_jsonl(logfile_path, repo_info, repo_path)


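Taken together, the intended flow is: resolve tarball URLs in bulk over GraphQL, then download and unpack in parallel while logging each extraction. A usage sketch with placeholder inputs:

```python
import pathlib
import github

client = github.Github("<token>")  # placeholder credentials
repo_data = retrieve_tarballs(
    ["https://github.com/my-org/my_repo"],  # hypothetical repo URL
    ["refs/heads/main"],                    # hypothetical ref
    client,
)
pull_repositories(repo_data, pathlib.Path("workspace/src"))
# workspace/src/repositories_data.jsonl now holds one record per pulled repo.
```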

def remove_packages(whitelisted_pkgs: Dict[str, List[str]]) -> None:
Expand Down