Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 67 additions & 43 deletions nimp/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

'''Provides functions for build artifacts'''

import copy
from __future__ import annotations

import datetime
import hashlib
import json
Expand All @@ -34,6 +35,8 @@
import stat
import zipfile
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TypedDict

import requests

Expand All @@ -52,16 +55,36 @@
torf = None


def _is_http_url(string):
if TYPE_CHECKING:
from typing import Any
from typing import Iterable
from typing import Mapping

StrPathLike = str | os.PathLike[str]


class Artifact(TypedDict):
revision: str
uri: str


def _is_http_url(string: str) -> bool:
return re.match(r'^http[s]?:\/\/.*$', string) is not None


def list_artifacts(artifact_pattern: str, format_arguments, api_context):
def list_artifacts(
artifact_pattern: str,
format_arguments: Mapping[str, Any],
api_context_: nimp.utils.git.GitApiContext | None,
) -> list[Artifact]:
'''List all artifacts and their revision using the provided pattern after formatting'''

format_arguments = copy.deepcopy(format_arguments)
format_arguments['revision'] = '{revision}'
artifact_pattern = artifact_pattern.format(**format_arguments)
artifact_pattern = artifact_pattern.format_map(
{
**format_arguments,
'revision': '{revision}',
}
)

if not _is_http_url(artifact_pattern):
artifact_source = nimp.system.sanitize_path(os.path.dirname(artifact_pattern))
Expand All @@ -74,27 +97,25 @@ def list_artifacts(artifact_pattern: str, format_arguments, api_context):
)

all_files = _list_files(artifact_source, False)
all_artifacts = []
all_artifacts: list[Artifact] = []
for file_uri in all_files:
file_name = os.path.basename(file_uri.rstrip('/'))
artifact_match = artifact_regex.match(file_name)
if artifact_match:
group_revision = artifact_match.group('revision')
sortable_revision = copy.deepcopy(group_revision)
if api_context:
sortable_revision = nimp.utils.git.get_gitea_commit_timestamp(api_context, group_revision)
if sortable_revision is not None:
artifact = {
'revision': group_revision,
'sortable_revision': sortable_revision,
'uri': file_uri,
}
all_artifacts.append(artifact)
if not artifact_match:
continue

group_revision = artifact_match.group('revision')
all_artifacts.append(
{
'revision': group_revision,
'uri': file_uri,
}
)
return all_artifacts


def _list_files(source: str, recursive):
all_files = []
def _list_files(source: str, recursive: bool) -> list[str]:
all_files: list[str] = []

if _is_http_url(source):
if not source.endswith('/'):
Expand Down Expand Up @@ -128,7 +149,7 @@ def _list_files(source: str, recursive):
return all_files


def download_artifact(workspace_directory, artifact_uri):
def download_artifact(workspace_directory: str, artifact_uri: str) -> str:
'''Download an artifact to the workspace'''

download_directory = os.path.join(workspace_directory, '.nimp', 'downloads')
Expand Down Expand Up @@ -158,7 +179,7 @@ def download_artifact(workspace_directory, artifact_uri):
return local_artifact_path


def _download_file(file_uri, output_path):
def _download_file(file_uri: str, output_path: StrPathLike) -> None:
if os.path.exists(output_path):
os.remove(output_path)
output_directory = os.path.dirname(output_path)
Expand All @@ -174,7 +195,7 @@ def _download_file(file_uri, output_path):
shutil.copyfile(file_uri, output_path)


def _extract_archive(archive_path, output_path):
def _extract_archive(archive_path: StrPathLike, output_path: StrPathLike) -> None:
if os.path.exists(output_path):
shutil.rmtree(output_path)

Expand All @@ -192,7 +213,7 @@ def _extract_archive(archive_path, output_path):
os.remove(inner_archive_path)


def install_artifact(artifact_path, destination_directory):
def install_artifact(artifact_path: str, destination_directory: StrPathLike) -> None:
'''Install an artifact in the workspace'''

if not os.path.exists(artifact_path):
Expand All @@ -213,7 +234,7 @@ def install_artifact(artifact_path, destination_directory):
_try_make_executable(destination)


def _try_make_executable(file_path):
def _try_make_executable(file_path: str) -> None:
if platform.system() == 'Windows':
return

Expand All @@ -231,14 +252,20 @@ def _try_make_executable(file_path):
logging.warning('Failed to make file executable: %s (FilePath: %s)', exception, file_path)


def _try_rename(src, dst, max_attempts=5, retry_delay=2):
def _rename():
def _try_rename(src: StrPathLike, dst: StrPathLike, max_attempts: int = 5, retry_delay: int = 2) -> None:
def _rename() -> None:
os.rename(src, dst)

nimp.system.try_execute(_rename, OSError, attempt_maximum=max_attempts, retry_delay=retry_delay)


def create_artifact(artifact_path, file_collection, archive, compress, dry_run):
def create_artifact(
artifact_path: str,
file_collection: Iterable[tuple[StrPathLike, StrPathLike]],
archive: bool,
compress: bool,
dry_run: bool,
) -> None:
'''Create an artifact'''

if os.path.isfile(artifact_path + '.zip') or os.path.isdir(artifact_path):
Expand Down Expand Up @@ -292,7 +319,7 @@ def create_artifact(artifact_path, file_collection, archive, compress, dry_run):
shutil.move(artifact_path_tmp, artifact_path)


def create_torrent(artifact_path, announce, dry_run):
def create_torrent(artifact_path: StrPathLike, announce: str | None, dry_run: bool) -> None:
'''Create a torrent for an existing artifact'''

if torf is None:
Expand Down Expand Up @@ -329,7 +356,7 @@ def create_torrent(artifact_path, announce, dry_run):
tmp_torrent_path.rename(torrent_path)


def create_hash(artifact_path, hash_method, dry_run):
def create_hash(artifact_path: str, hash_method, dry_run: bool) -> None:
artifact_full_path = _find_artifact(artifact_path)
if not artifact_full_path:
raise FileNotFoundError(f'Artifact not found: {artifact_path}')
Expand All @@ -343,26 +370,23 @@ def create_hash(artifact_path, hash_method, dry_run):
json.dump({hash_method: file_hash}, fh)


def get_file_hash(file_path, hash_method):
def get_file_hash(file_path: StrPathLike, hash_method: str):
'''helper function to parse potentially big files'''
assert getattr(hashlib, hash_method)()
hash_lib = getattr(hashlib, hash_method)()
hasher = hashlib.new(hash_method)

_BLOCK_SIZE = 65536

with open(file_path, 'rb') as fh:
file_buffer = fh.read(_BLOCK_SIZE)
while len(file_buffer) > 0:
hash_lib.update(file_buffer)
file_buffer = fh.read(_BLOCK_SIZE)
file_hash = hash_lib.hexdigest()
while buffer := fh.read(_BLOCK_SIZE):
hasher.update(buffer)
file_hash = hasher.hexdigest()

logging.debug(f"{file_path} {hash_method}: {file_hash}")
logging.debug("%s %s: %s", file_path, hash_method, file_hash)
return file_hash


# TODO (l.cahour): this is workaround the fact we don't use artifact objects containing the info we need
def _find_artifact(artifact_path):
def _find_artifact(artifact_path: str) -> str | None:
if os.path.isfile(artifact_path + '.zip'):
return artifact_path + '.zip'
elif os.path.isdir(artifact_path):
Expand All @@ -373,8 +397,8 @@ def _find_artifact(artifact_path):

# TODO (l.cahour): this is a naive first attempt at using a wrapper class to clean how we handle artifacts saving
# Try to make this better and use it everywhere else in the future
class TempArtifact(object):
def __init__(self, file_path, mode, force=False):
class TempArtifact:
def __init__(self, file_path: StrPathLike, mode, force: bool = False) -> None:
self.name = file_path
self.temp = f'{file_path}.tmp'
self.mode = mode
Expand Down
Loading