Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 98 additions & 64 deletions github/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,94 +34,93 @@ def check_if_merge_or_commit():
return False


def _resolve_repo_full_name(repo_name: str) -> str:
"""
Returns a repository identifier in 'owner/repo' format when possible.

Priority:
1) repo_name if already 'owner/repo'
2) env GITHUB_REPOSITORY if set to 'owner/repo'
3) if repo_name is only 'repo', and env is 'owner/repo', use env (best effort)
"""
repo_name = (repo_name or "").strip()
if "/" in repo_name:
return repo_name

env_repo = (os.getenv("GITHUB_REPOSITORY") or "").strip()
if "/" in env_repo:
# If only repo name was provided, prefer env full name
return env_repo

return repo_name # may still be only 'repo'


def _get_repo(client: Github, repo_name: str):
"""
Robust repo getter:
- If we have owner/repo -> client.get_repo(owner/repo)
- Else -> client.get_user().get_repo(repo) (works for user-owned repos)
"""
repo_resolved = _resolve_repo_full_name(repo_name)
if "/" in repo_resolved:
return client.get_repo(repo_resolved)
return client.get_user().get_repo(repo_resolved)


def add_pull_request_comment(github_auth_token, repo_name, pr_number, comment_body):
"""
Adds a comment to a pull request

Arguments:
github_auth_token {str}: The auth token of the github user
repo_name {str}: The name of the repository
repo_name {str}: The name of the repository (either 'repo' or 'owner/repo')
pr_number {int}: The Pull request number to add a comment
comment_body {str}: The body of the comment
"""
try:
client = Github(github_auth_token)
repo = client.get_user().get_repo(repo_name)
repo = _get_repo(client, repo_name)
pull = repo.get_pull(pr_number)
pull.create_issue_comment(comment_body)
except Exception as e:
print("There was an error while commenting on the Pull request: {}".format(e))


def create_github_repository_issue(
github_auth_token, repo_name, issue_title, issue_body
):
def create_github_repository_issue(github_auth_token, repo_name, issue_title, issue_description):
"""
Creates an issue in a given repository

Arguments:
github_auth_token {str}: The auth token of the github user
repo_name {str}: The name of the repository
issue_title {int}: The title of the issue to be created
issue_body {str}: The body of the issue to be created
Create an issue in the given GitHub repository.

Fixes Issue #82:
- Accepts repo_name as 'repo' OR 'owner/repo'
- Resolves to full 'owner/repo' using env GITHUB_REPOSITORY when needed
- Uses client.get_repo('owner/repo') when full name exists (prevents 404)
"""
try:
client = Github(github_auth_token)
repo = client.get_user().get_repo(repo_name)
issue = repo.create_issue(issue_title, issue_body)
except Exception as e:
print("There was an error while creating an issue: {}".format(e))

repo_resolved = _resolve_repo_full_name(repo_name)

def create_challenge_zip_file(challenge_zip_file_path, ignore_dirs, ignore_files):
"""
Creates the challenge zip file at a given path

Arguments:
challenge_zip_file_path {str}: The relative path of the created zip file
ignore_dirs {list}: The list of directories to exclude from the zip file
ignore_files {list}: The list of files to exclude from the zip file
"""
working_dir = (
os.getcwd()
) # Special case for github. For local. use os.path.dirname(os.getcwd())

# Creating evaluation_script.zip file
eval_script_dir = working_dir + "/evaluation_script"
eval_script_zip = zipfile.ZipFile(
"evaluation_script.zip", "w", zipfile.ZIP_DEFLATED
)
for root, dirs, files in os.walk(eval_script_dir):
for file in files:
file_name = os.path.join(root, file)
name_in_zip_file = (
file_name[len(eval_script_dir) + 1 :]
if file_name.startswith(eval_script_dir)
else file_name
)
eval_script_zip.write(file_name, name_in_zip_file)
eval_script_zip.close()

# Creating the challenge_config.zip file
zipf = zipfile.ZipFile(challenge_zip_file_path, "w", zipfile.ZIP_DEFLATED)
for root, dirs, files in os.walk(working_dir):
parents = root.split("/")
if not set(parents) & set(ignore_dirs):
for file in files:
if file not in ignore_files:
file_name = os.path.join(root, file)
name_in_zip_file = (
file_name[len(working_dir) + 1 :]
if file_name.startswith(working_dir)
else file_name
)
zipf.write(file_name, name_in_zip_file)
zipf.close()
# Helpful, non-sensitive debug line (good for maintainers too)
print("\nINFO: Creating GitHub issue in repository: {}".format(repo_resolved))

# IMPORTANT: Always prefer get_repo when owner/repo is available
repo = _get_repo(client, repo_resolved)

body = "{}\n\n---\n\n(Generated by EvalAI-Starters validation workflow)".format(issue_description)
repo.create_issue(title=issue_title, body=body)

print("\nCreated a GitHub issue successfully.")
return True

except Exception as e:
print("\nThere was an error while creating an issue: {}".format(e))
return False


def get_request_header(token):
"""
Returns user auth token formatted in header for sending requests

Arguments:
token {str}: The user token to gain access to EvalAI
"""
Expand All @@ -132,7 +131,7 @@ def get_request_header(token):
def load_host_configs(config_path):
"""
Loads token to be used for sending requests

Arguments:
config_path {str}: The path of host configs having the user token, team id and the EvalAI host url
"""
Expand Down Expand Up @@ -161,9 +160,9 @@ def load_host_configs(config_path):
def validate_token(response):
"""
Function to check if the authentication token provided by user is valid or not

Arguments:
response {dict}: The response json dict sent back from EvalAI
response {dict}: The response json dict sent back from EvalAI
"""
error = None
if "detail" in response:
Expand All @@ -183,3 +182,38 @@ def validate_token(response):
os.environ["CHALLENGE_ERRORS"] = error
return False
return True


def create_challenge_zip_file(zip_file_path, ignore_dirs, ignore_files):
"""
Creates a zip file of the repository excluding ignored directories/files.

Arguments:
zip_file_path {str}: output zip file path
ignore_dirs {list}: directories to ignore
ignore_files {list}: filenames to ignore
"""
ignore_dirs = set(ignore_dirs or [])
ignore_files = set(ignore_files or [])

# Make sure we don't zip the zip itself if it already exists
ignore_files.add(os.path.basename(zip_file_path))

with zipfile.ZipFile(zip_file_path, "w", zipfile.ZIP_DEFLATED) as zipf:
root_dir = os.getcwd()

for folder, subfolders, files in os.walk(root_dir):
# Skip ignored directories anywhere in the path
parts = set(folder.replace("\\", "/").split("/"))
if parts.intersection(ignore_dirs):
continue

for filename in files:
if filename in ignore_files:
continue

file_path = os.path.join(folder, filename)

# Store relative path in zip
arcname = os.path.relpath(file_path, root_dir)
zipf.write(file_path, arcname)