-
Notifications
You must be signed in to change notification settings - Fork 58
Expand file tree
/
Copy pathtasks.py
More file actions
236 lines (187 loc) · 7.11 KB
/
tasks.py
File metadata and controls
236 lines (187 loc) · 7.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import base64
import json
import os
import subprocess
import tempfile
import zipfile
from datetime import timedelta
import jwt
import requests
from celery.utils.log import get_task_logger
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from django.conf import settings
from django.core import files
from django.db import transaction
from django.db.transaction import on_commit
from django.utils.timezone import now
from lambda_tasks.decorators import lambda_task
from grandchallenge.algorithms.models import Algorithm
from grandchallenge.codebuild.tasks import create_codebuild_build
from grandchallenge.core.celery import (
acks_late_2xlarge_task,
acks_late_micro_short_task,
)
from grandchallenge.github.exceptions import GitHubBadRefreshTokenException
# Module-level Celery task logger, shared by the functions and tasks below.
logger = get_task_logger(__name__)
def get_repo_url(payload):
    """Return the repository URL with a GitHub App installation token embedded.

    A short-lived JWT signed with the app's private key is exchanged for an
    installation access token. The token is inserted into the repository's
    ``html_url`` as ``x-access-token:<token>@`` credentials, so the result
    can be handed straight to ``git clone``.

    :param payload: GitHub webhook payload; ``installation.id`` and
        ``repository.html_url`` are read from it.
    :returns: the authenticated clone URL.
    :raises requests.HTTPError: if GitHub does not issue an access token.
    """
    installation_id = payload["installation"]["id"]

    key_bytes = base64.b64decode(
        settings.GITHUB_PRIVATE_KEY_BASE64.encode("ascii")
    )
    private_key = serialization.load_pem_private_key(
        key_bytes, password=None, backend=default_backend()
    )

    issued_at = int(now().timestamp())
    claims = {
        # Backdated by a minute, as GitHub recommends, to tolerate clock
        # drift between this worker and GitHub.
        "iat": issued_at - 60,
        "exp": issued_at + 60 * 5,
        "iss": settings.GITHUB_APP_ID,
    }
    app_jwt = jwt.encode(claims, private_key, algorithm="RS256")

    resp = requests.post(
        f"https://api.github.com/app/installations/{installation_id}/access_tokens",
        headers={
            "Authorization": f"Bearer {app_jwt}",
            "Accept": "application/vnd.github+json",
        },
        timeout=10,
    )
    # Surface the HTTP error instead of a KeyError on the missing "token".
    resp.raise_for_status()
    access_token = resp.json()["token"]

    repo_url = payload["repository"]["html_url"]
    # Only the scheme separator ("https://") should receive the credentials.
    return repo_url.replace("//", f"//x-access-token:{access_token}@", 1)
def install_lfs():
    """Run ``git lfs install`` and return its combined stdout/stderr bytes.

    :raises subprocess.CalledProcessError: if the command exits non-zero.
    """
    lfs_install_cmd = ["git", "lfs", "install"]
    return subprocess.check_output(lfs_install_cmd, stderr=subprocess.STDOUT)
def fetch_repo(payload, repo_url, tmpdirname, recurse_submodules):
    """Shallow-clone ``payload["ref"]`` of *repo_url* into *tmpdirname*.

    :param payload: webhook payload; only ``payload["ref"]`` is read and is
        passed to ``git clone --branch``.
    :param repo_url: URL to clone from (may embed credentials).
    :param tmpdirname: destination directory for the checkout.
    :param recurse_submodules: when true, ``--recurse-submodules`` is added.
    :returns: combined stdout/stderr bytes of ``git clone``.
    :raises subprocess.CalledProcessError: if the clone fails.

    NOTE(review): git records *repo_url*, including any embedded token, as
    the ``origin`` remote in the checkout's ``.git/config``. Confirm this is
    acceptable for anything later built from this directory.
    """
    submodule_flags = ["--recurse-submodules"] if recurse_submodules else []
    clone_cmd = [
        "git",
        "clone",
        *submodule_flags,
        "--branch",
        payload["ref"],
        "--depth",
        "1",
        repo_url,
        tmpdirname,
    ]
    return subprocess.check_output(clone_cmd, stderr=subprocess.STDOUT)
def check_license(tmpdirname):
    """Run ``licensee detect`` on a checkout and return its parsed JSON report.

    :param tmpdirname: directory containing the cloned repository.
    :returns: the object decoded from licensee's JSON stdout.
    :raises subprocess.TimeoutExpired: if licensee runs longer than 15 s.
        The child is killed and reaped before the exception propagates,
        so no zombie process is left behind.
    :raises json.JSONDecodeError: if licensee's stdout is not valid JSON.
    """
    completed = subprocess.run(
        ["licensee", "detect", tmpdirname, "--json", "--no-remote"],
        stdout=subprocess.PIPE,
        timeout=15,
        # The exit status is intentionally not checked; only the JSON on
        # stdout matters (unchanged from the previous behaviour).
        check=False,
    )
    return json.loads(completed.stdout.decode("utf-8"))
def save_zipfile(ghwm, tmpdirname):
    """Zip the contents of *tmpdirname* into a temporary file.

    Archive member names are paths relative to *tmpdirname*. The checkout
    comes from a user-controlled repository, and ``ZipFile.write`` follows
    symlinks. Any file whose real path lies outside *tmpdirname* is therefore
    skipped; otherwise a link such as ``x -> /etc/passwd`` would copy a file
    from this worker into the archive. Symlinks that stay inside the checkout
    are archived with their target's content, as before.

    :param ghwm: webhook message; its ``repo_name`` and ``tag`` form the
        archive name.
    :param tmpdirname: directory holding the cloned repository.
    :returns: a Django ``File`` wrapping the temporary zip, named
        ``"<repo_name>-<tag>.zip"``.
    """
    zip_name = f"{ghwm.repo_name}-{ghwm.tag}.zip"
    real_root = os.path.realpath(tmpdirname)

    tmp_zip = tempfile.NamedTemporaryFile()
    with zipfile.ZipFile(tmp_zip.name, "w") as zipf:
        for foldername, _subfolders, filenames in os.walk(tmpdirname):
            for filename in filenames:
                file_path = os.path.join(foldername, filename)
                arcname = os.path.relpath(file_path, tmpdirname)

                real_path = os.path.realpath(file_path)
                if os.path.commonpath([real_root, real_path]) != real_root:
                    logger.warning(
                        "Skipping %s: symlink target is outside the repository",
                        arcname,
                    )
                    continue

                zipf.write(file_path, arcname)

    return files.File(tmp_zip, name=zip_name)
def build_repo(ghwm_pk):
    """Queue ``create_codebuild_build`` for the message after the transaction commits.

    :param ghwm_pk: primary key passed to the task as its ``pk`` kwarg.
    """
    build_signature = create_codebuild_build.signature(kwargs={"pk": ghwm_pk})
    on_commit(build_signature.apply_async)
def _process_output_as_text(output):
    """Convert captured subprocess output into text suitable for storage.

    ``CalledProcessError``/``TimeoutExpired`` carry ``bytes``, or ``None``
    when a stream was not captured separately. Other exceptions have no such
    attribute, and the caller passes ``None`` for them.
    """
    if output is None:
        return ""
    if isinstance(output, bytes):
        return output.decode("utf-8", errors="replace")
    return str(output)


@acks_late_2xlarge_task
def get_zipfile(*, pk):
    """Clone the webhook message's repository, zip it and queue a build.

    The message must still be PENDING. If no Algorithm is linked to the
    repository, the message is marked NOT_APPLICABLE and nothing is cloned.

    On success:
    - the zip and the licensee report are stored on the message;
    - the status becomes SUCCESS;
    - a CodeBuild build is queued via ``build_repo``.

    On any failure:
    - the exception's captured output is stored as text;
    - the status becomes FAILURE;
    - the exception is re-raised unless ``ghwm.user_error`` is truthy.

    :param pk: primary key of the GitHubWebhookMessage to process.
    :raises RuntimeError: if the message is not in the PENDING state.
    """
    from grandchallenge.github.models import GitHubWebhookMessage

    ghwm = GitHubWebhookMessage.objects.get(pk=pk)

    if ghwm.clone_status != GitHubWebhookMessage.CloneStatusChoices.PENDING:
        raise RuntimeError("Clone status was not pending")

    payload = ghwm.payload
    repo_url = get_repo_url(payload)

    ghwm.clone_status = GitHubWebhookMessage.CloneStatusChoices.STARTED
    ghwm.save()

    try:
        recurse_submodules = Algorithm.objects.get(
            repo_name=payload["repository"]["full_name"]
        ).recurse_submodules
    except Algorithm.DoesNotExist:
        logger.info("No algorithm linked to this repo")
        ghwm.clone_status = (
            GitHubWebhookMessage.CloneStatusChoices.NOT_APPLICABLE
        )
        ghwm.save()
        return

    with tempfile.TemporaryDirectory() as tmpdirname:
        try:
            # Run git lfs install here, doing it in the dockerfile does not
            # seem to work
            install_lfs()
            fetch_repo(payload, repo_url, tmpdirname, recurse_submodules)
            license_check_result = check_license(tmpdirname)
            temp_file = save_zipfile(ghwm, tmpdirname)

            # update GithubWebhook object
            ghwm.zipfile = temp_file
            ghwm.license_check_result = license_check_result
            ghwm.clone_status = GitHubWebhookMessage.CloneStatusChoices.SUCCESS
            ghwm.save()

            build_repo(ghwm.pk)
        except Exception as e:
            # Store readable text rather than "b'...'" or "None" reprs of the
            # bytes/None carried by subprocess exceptions.
            ghwm.stdout = _process_output_as_text(getattr(e, "stdout", None))
            ghwm.stderr = _process_output_as_text(getattr(e, "stderr", None))
            ghwm.clone_status = GitHubWebhookMessage.CloneStatusChoices.FAILURE
            ghwm.save()
            if not ghwm.user_error:
                raise
@acks_late_micro_short_task
def unlink_algorithm(*, pk):
    """Clear ``repo_name`` on Algorithms linked to repositories in a webhook payload.

    For every entry in the message's ``payload["repositories"]``, any
    Algorithm whose ``repo_name`` equals that entry's ``full_name`` is
    updated to have an empty ``repo_name``.

    :param pk: primary key of the GitHubWebhookMessage to read.
    """
    from grandchallenge.github.models import GitHubWebhookMessage

    message = GitHubWebhookMessage.objects.get(pk=pk)
    repo_names = (entry["full_name"] for entry in message.payload["repositories"])

    for repo_name in repo_names:
        Algorithm.objects.filter(repo_name=repo_name).update(repo_name="")
@acks_late_micro_short_task(name=f"{__name__}.cleanup_expired_tokens")
@transaction.atomic
def cleanup_expired_tokens_celery(**kwargs):
    """Legacy Celery entry point that delegates to ``cleanup_expired_tokens``.

    It is registered under the old task name so that messages still queued
    on SQS are consumed. The delegation runs inside ``transaction.atomic``.
    """
    # TODO: 4408 Remove, this is still here to handle existing tasks on SQS
    result = cleanup_expired_tokens(**kwargs)
    return result
@lambda_task
def cleanup_expired_tokens():
    """Delete every GitHubUserToken whose refresh token has already expired."""
    from grandchallenge.github.models import GitHubUserToken

    expired_tokens = GitHubUserToken.objects.filter(
        refresh_token_expires__lt=now()
    )
    # Only the primary key is needed to issue the delete.
    expired_tokens.only("pk").delete()
@acks_late_micro_short_task(name=f"{__name__}.refresh_user_token")
def refresh_user_token_celery(**kwargs):
    """Legacy Celery entry point that delegates to ``refresh_user_token``.

    It is registered under the old task name so that messages still queued
    on SQS are consumed.
    """
    # TODO: 4408 Remove, this is still here to handle existing tasks on SQS
    result = refresh_user_token(**kwargs)
    return result
@lambda_task
def refresh_user_token(*, pk: int):
    """Refresh and persist one user's GitHub access token.

    If the refresh raises ``GitHubBadRefreshTokenException``, the token row
    is deleted instead of saved.

    :param pk: primary key of the GitHubUserToken to refresh.
    """
    from grandchallenge.github.models import GitHubUserToken

    user_token = GitHubUserToken.objects.get(pk=pk)

    try:
        user_token.refresh_access_token()
    except GitHubBadRefreshTokenException:
        user_token.delete()
    else:
        user_token.save()
@acks_late_micro_short_task(name=f"{__name__}.refresh_expiring_user_tokens")
def refresh_expiring_user_tokens_celery(**kwargs):
    """Legacy Celery entry point that delegates to ``refresh_expiring_user_tokens``.

    It is registered under the old task name so that messages still queued
    on SQS are consumed.
    """
    # TODO: 4408 Remove, this is still here to handle existing tasks on SQS
    result = refresh_expiring_user_tokens(**kwargs)
    return result
@lambda_task
def refresh_expiring_user_tokens():
    """Refresh user tokens expiring in the next 1 to 28 days.

    For every GitHubUserToken whose refresh token expires strictly between
    now + 1 day and now + 28 days, ``refresh_user_token`` is invoked through
    ``execute_on_commit`` with that token's pk.
    """
    from grandchallenge.github.models import GitHubUserToken

    # One reference instant so both bounds describe the same window.
    current_time = now()
    token_pks = GitHubUserToken.objects.filter(
        refresh_token_expires__gt=current_time + timedelta(days=1),
        refresh_token_expires__lt=current_time + timedelta(days=28),
    ).values_list("pk", flat=True)  # only the pk is needed; skip token secrets

    for token_pk in token_pks.iterator():
        refresh_user_token.execute_on_commit(pk=token_pk)