Use celery for queues #109

Draft: wants to merge 17 commits into master
3 changes: 2 additions & 1 deletion Dockerfile
@@ -3,11 +3,12 @@ VOLUME /app /data/repos /data/transformed /transformer
RUN apk add --update-cache \
    bash \
    git \
    redis \
    && rm -rf /var/cache/apk/*
WORKDIR /temp
COPY requirements/common.txt requirements/dev.txt requirements/
RUN pip install --upgrade pip \
    && pip install --no-cache-dir -r requirements/dev.txt
WORKDIR /app
EXPOSE 8000
EXPOSE 8000 6379
CMD ["/bin/bash"]
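Note: the image now bundles a Redis server alongside the app and exposes 6379 so it can serve as the Celery broker. A minimal connectivity sketch, assuming the redis-py client is available in the image (Celery's Redis transport depends on it anyway):

import redis  # redis-py; pulled in by Celery's Redis transport

r = redis.Redis(host='localhost', port=6379, db=0)
print(r.ping())  # True once the bundled Redis server is up and reachable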
6 changes: 6 additions & 0 deletions argv/__init__.py
100644 → 100755
@@ -0,0 +1,6 @@
#!/usr/bin/env python
# coding: utf-8

from .celery import app as celery_app

__all__ = ('celery_app',)
14 changes: 14 additions & 0 deletions argv/celery.py
@@ -0,0 +1,14 @@
#!/usr/bin/env python
# coding: utf-8

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'argv.settings')

app = Celery('argv')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()
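With app.autodiscover_tasks(), Celery imports the tasks.py module of every installed Django app and registers its tasks. A minimal sketch of such a module, using a hypothetical myapp/tasks.py (not part of this PR):

# Hypothetical myapp/tasks.py: picked up by app.autodiscover_tasks() and
# registered under the name 'myapp.tasks.add'.
from celery import shared_task

@shared_task
def add(x, y):
    return x + y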
8 changes: 8 additions & 0 deletions argv/settings.py
@@ -44,6 +44,7 @@
    'django.contrib.humanize',
    'widget_tweaks',
    'django_countries',
    'django_celery_results',
]

if DEBUG == True:
@@ -252,3 +253,10 @@
    messages.WARNING: 'alert-warning',
    messages.ERROR: 'alert-danger',
}

# Celery Configuration Options
CELERY_TIMEZONE = config("CELERY_TIMEZONE", default='America/Chicago')
CELERY_TASK_TRACK_STARTED = config("CELERY_TASK_TRACK_STARTED", default=True, cast=bool)
CELERY_TASK_TIME_LIMIT = config("CELERY_TASK_TIME_LIMIT", default=30 * 60, cast=int)
CELERY_BROKER_URL = config("CELERY_BROKER_URL", default="redis://localhost:6379/0")
CELERY_RESULT_BACKEND = 'django-db'
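Because CELERY_RESULT_BACKEND is 'django-db', task states are persisted through django_celery_results (added to INSTALLED_APPS above), so its migrations must be applied before workers run. Once they are, results can be queried like any Django model; a sketch:

# Sketch: inspect stored task results via the django_celery_results model
# (assumes `manage.py migrate` has created its tables).
from django_celery_results.models import TaskResult

recent_failures = TaskResult.objects.filter(status='FAILURE').count()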
187 changes: 187 additions & 0 deletions backend/cloner.py
@@ -0,0 +1,187 @@
#!/usr/bin/env python
# coding: utf-8

from argv.celery import app
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger
celery_logger = get_task_logger(__name__)

from django.conf import settings
from django.utils import timezone

from decouple import config

import subprocess
import shutil
import os
import tempfile
import socket

from website.models import ProjectSnapshot

from os import walk

import backend.tasks

CLONER_DRY_RUN = config('CLONER_DRY_RUN', default=False, cast=bool)
CLONER_NO_UNPACK = config('CLONER_NO_UNPACK', default=False, cast=bool)
CLONER_SHALLOW = config('CLONER_SHALLOW', default=False, cast=bool)
CLONER_NOFILTER = config('CLONER_NOFILTER', default=False, cast=bool)

# TODO: Verbosity fix
def run_command(args, cwd):
    p = subprocess.Popen(args,
                         cwd=cwd,
                         stdout=None,
                         stderr=None
                         # stdout=subprocess.PIPE if self.verbosity < 3 else None,
                         # stderr=subprocess.PIPE if self.verbosity < 3 else None
                         )
    p.wait()
    return p


# What happens on failure?
@app.task(time_limit=(60 * 60),
          soft_time_limit=(55 * 60))
def process_snapshot(snapshot_pk, selection_pk=None):
    celery_logger.info(f"Running for {snapshot_pk}")
    snapshot = ProjectSnapshot.objects.get(pk=snapshot_pk)

    celery_logger.info('processing ProjectSnapshot: ' + snapshot.project.url)
    if not CLONER_DRY_RUN:
        snapshot.host = socket.gethostname()
        snapshot.save()

    # split the project URL into host and path components
    host = snapshot.project.url[snapshot.project.url.index('://') + 3:]
    project_name = host[host.index('/') + 1:]
    project_base = project_name[0:project_name.index('/')] if '/' in project_name else project_name
    project_parent = project_name[0:project_name.rindex('/')] if '/' in project_name else ''
    host = host[:host.index('/')]

    repo_root = os.path.join(getattr(settings, 'REPO_PATH'), host)
    path = os.path.join(repo_root, project_name)
    tmp = tempfile.gettempdir()
    src_dir = os.path.join(tmp, project_name)
    git_dir = os.path.join(src_dir, '.git')

    celery_logger.debug(' -> root: ' + repo_root)
    celery_logger.debug(' -> name: ' + project_name)
    celery_logger.debug(' -> base: ' + project_base)
    celery_logger.debug(' -> path: ' + path)
    celery_logger.debug(' -> tmp: ' + tmp)
    celery_logger.debug(' -> src_dir: ' + src_dir)
    celery_logger.debug(' -> git_dir: ' + git_dir)

    if os.path.exists(path):
        celery_logger.info(' -> SKIPPING: already exists: ' + project_name)
        if selection_pk is not None:
            backend.tasks.filter_selection.delay(selection_pk)
        if not CLONER_DRY_RUN:
            snapshot.path = os.path.join(host, project_name)
            snapshot.datetime_processed = timezone.now()
            snapshot.save()
        return

    try:
        clone_repo(host, src_dir, project_name, git_dir, repo_root, project_parent, snapshot)
    finally:
        # cleanup temp files
        if os.path.exists(os.path.join(tmp, project_base)):
            shutil.rmtree(os.path.join(tmp, project_base))

    if not CLONER_DRY_RUN:
        snapshot.datetime_processed = timezone.now()
        snapshot.save()
    if selection_pk is not None:
        backend.tasks.filter_selection.delay(selection_pk)

def clone_repo(host, src_dir, project_name, git_dir, repo_root, project_parent, snapshot):
    # clone and filter the repo
    os.makedirs(src_dir, 0o755, True)
    run_command(['git', 'init'], src_dir)

    run_command(['git', 'config', '--local', 'core.sparseCheckout', 'true'], src_dir)
    with open(os.path.join(src_dir, '.git/info/sparse-checkout'), 'w') as sparse_file:
        sparse_file.write('**/*.java\n')

    # get all remote objects
    run_command(['git', 'remote', 'add', 'origin', 'https://foo:bar@' + host + '/' + project_name], src_dir)
    if CLONER_SHALLOW:
        p = run_command(['git', 'fetch', '--depth', '1'], src_dir)
    else:
        p = run_command(['git', 'fetch'], src_dir)

    if p.returncode == 0:
        # filter out unwanted objects
        if not CLONER_NOFILTER:
            run_command(['git', 'filter-repo', '--path-regex', '[^/]+\\.java$'], src_dir)
            run_command(['git', 'remote', 'add', 'origin', 'https://foo:bar@' + host + '/' + project_name], src_dir)

        # unpack object files
        unpack_git(git_dir)

        # checkout working tree
        # similar to: git remote show origin | grep "HEAD branch" | cut -d ":" -f 2
        masterrefproc = subprocess.Popen(['git', 'remote', 'show', 'origin'], cwd=src_dir, stdout=subprocess.PIPE, stderr=None)
        parts = filter(lambda x: 'HEAD branch' in x, masterrefproc.stdout.read().decode().strip().split('\n'))
        masterref = list(parts)[0].split(':')[1].strip()
        p = run_command(['git', 'checkout', masterref], src_dir)

        if not CLONER_DRY_RUN:
            if p.returncode == 0:
                if not os.path.exists(repo_root):
                    celery_logger.info('root does not exist, creating: ' + repo_root)
                    os.makedirs(repo_root, 0o755, True)

                update_metrics(snapshot, src_dir)

                # move repo to final location
                parent_dir = os.path.join(repo_root, project_parent)
                celery_logger.info('moving ' + src_dir + ' -> ' + parent_dir)
                if not os.path.exists(parent_dir):
                    os.makedirs(parent_dir, 0o755, True)
                shutil.move(src_dir, parent_dir)

                snapshot.path = os.path.join(host, project_name)
    else:
        celery_logger.info('failed to clone ' + project_name)

def unpack_git(git_dir):
    """unpack all Git object files"""
    if CLONER_NO_UNPACK:
        return

    if os.path.exists(os.path.join(git_dir, 'objects/pack')):
        packdir = os.path.join(git_dir, 'pack')
        shutil.move(os.path.join(git_dir, 'objects/pack'), packdir)

        for (root, dirs, files) in walk(packdir):
            for f in files:
                if f.endswith('.pack'):
                    with open(os.path.join(root, f), 'rb') as packfile:
                        unpack = subprocess.Popen(['git', 'unpack-objects'], cwd=git_dir, stdin=subprocess.PIPE, stdout=None, stderr=None)
                        while True:
                            b = packfile.read(64 * 1024)
                            if not b:
                                break
                            unpack.stdin.write(b)
                        # close stdin so `git unpack-objects` sees EOF, then wait for it to finish
                        unpack.stdin.close()
                        unpack.wait()

        shutil.rmtree(packdir)

def update_metrics(project, path):
    # update number of commits
    p = subprocess.Popen(['git', 'rev-list', '--count', 'HEAD'], cwd=path, stdout=subprocess.PIPE, stderr=None)
    project.commits = int(p.stdout.read().decode().strip())
    celery_logger.info(path + ', commits = ' + str(project.commits))

    # update number of committers
    # pass HEAD explicitly: without a revision and a non-tty stdin,
    # `git shortlog` tries to read a log from standard input
    p = subprocess.Popen(['git', 'shortlog', '-sne', 'HEAD'], cwd=path, stdout=subprocess.PIPE, stderr=None)
    project.committers = p.stdout.read().decode().count('\n')
    celery_logger.info(path + ', committers = ' + str(project.committers))

    # update src file counts
    # similar to: find . -name '*.java' | wc -l
    p = subprocess.Popen(['find', '.', '-name', '*.java'], cwd=path, stdout=subprocess.PIPE, stderr=None)
    project.src_files = p.stdout.read().decode().count('\n')
    celery_logger.info(path + ', src_files = ' + str(project.src_files))
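The metric helpers above spawn git and read stdout by hand; an alternative sketch (not part of this PR, count_commits is a hypothetical helper) using subprocess.check_output, which collapses the Popen/read/wait dance into one call and raises on a non-zero exit:

import subprocess

def count_commits(path):
    # `git rev-list --count HEAD` prints the number of commits reachable from HEAD
    out = subprocess.check_output(['git', 'rev-list', '--count', 'HEAD'], cwd=path)
    return int(out.decode().strip())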
5 changes: 4 additions & 1 deletion backend/discoveryrunner.py
@@ -1,6 +1,8 @@
from website.choices import ONGOING, PROCESSED
from website.models import Project, ProjectSnapshot, Selection

import backend.tasks as backend_tasks


class DiscoveryRunner:
    def __init__(self, selector, backend_id, dry_run, verbosity):
@@ -46,4 +48,5 @@ def discovered_project(self, url):
            s = p.snapshots.order_by('-datetime_processed').first()
        else:
            s, _ = ProjectSnapshot.objects.get_or_create(project=p)
        Selection.objects.get_or_create(project_selector=self.selector, snapshot=s)
        sel, _ = Selection.objects.get_or_create(project_selector=self.selector, snapshot=s)
        backend_tasks.process_snapshot.delay(s.pk, sel.pk)
81 changes: 81 additions & 0 deletions backend/filterer.py
@@ -0,0 +1,81 @@
#!/usr/bin/env python
# coding: utf-8

from argv.celery import app
from celery.utils.log import get_task_logger
celery_logger = get_task_logger(__name__)

from django.db import transaction
from django.utils import timezone

from decouple import config

from website.choices import READY, ONGOING, PROCESSED

from website.models import Selection, FilterDetail, ProjectSelector


FILTERER_DRY_RUN = config('FILTERER_DRY_RUN', default=False, cast=bool)

__all__ = ['filter_selection']

@app.task(bind=True, retry_backoff=True)
def filter_selection(self, selection_pk):
    selection = Selection.objects.get(pk=selection_pk)
    if selection.snapshot.host is None:
        raise self.retry()

    celery_logger.info(f' --->>>> processing Selection {selection}')

    if selection.snapshot.path and test_repo(selection):
        celery_logger.debug(f'-> retained snapshot: {selection.snapshot}')
        selection.retained = True
        selection.save()
    else:
        celery_logger.debug(f'-> filtered snapshot: {selection.snapshot}')
        selection.retained = False
        selection.save()

    selection.status = PROCESSED
    selection.save(update_fields=['status'])

    with transaction.atomic():
        if not Selection.objects.filter(project_selector=selection.project_selector).exclude(status=PROCESSED).exists():
            celery_logger.debug(f'ProjectSelector finished: {selection.project_selector}')
            selection.project_selector.status = PROCESSED
            selection.project_selector.fin_process = timezone.now()
            selection.project_selector.save(update_fields=['status', 'fin_process'])


def test_repo(selection):
    project_selector = selection.project_selector
    snapshot = selection.snapshot

    for f in FilterDetail.objects.filter(project_selector=project_selector).all():
        value = f.value
        filter_name = f.pfilter.flter.name
        if filter_name == "Minimum number of commits":
            if not snapshot.commits or snapshot.commits < int(value):
                return False
        elif filter_name == "Maximum number of commits":
            if not snapshot.commits or snapshot.commits > int(value):
                return False
        elif filter_name == "Minimum number of source files":
            if not snapshot.src_files or snapshot.src_files < int(value):
                return False
        elif filter_name == "Maximum number of source files":
            if not snapshot.src_files or snapshot.src_files > int(value):
                return False
        elif filter_name == "Minimum number of committers":
            if not snapshot.committers or snapshot.committers < int(value):
                return False
        elif filter_name == "Maximum number of committers":
            if not snapshot.committers or snapshot.committers > int(value):
                return False
        elif filter_name == "Minimum number of stars":
            pass  # star counts are not tracked on snapshots, so these filters are no-ops
        elif filter_name == "Maximum number of stars":
            pass

    return True
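Each branch of the if/elif chain compares one snapshot attribute against a bound. A table-driven alternative sketch (hypothetical, not part of this PR) that maps filter names to an attribute and the comparison that means "fails the filter":

import operator

# Hypothetical lookup table: filter name -> (snapshot attribute, failing comparison)
FILTER_RULES = {
    "Minimum number of commits": ("commits", operator.lt),
    "Maximum number of commits": ("commits", operator.gt),
    "Minimum number of source files": ("src_files", operator.lt),
    "Maximum number of source files": ("src_files", operator.gt),
    "Minimum number of committers": ("committers", operator.lt),
    "Maximum number of committers": ("committers", operator.gt),
}

def passes_filter(snapshot, name, value):
    attr, fails = FILTER_RULES.get(name, (None, None))
    if attr is None:
        return True  # unknown filters (e.g. the star filters) are not applied
    actual = getattr(snapshot, attr)
    return bool(actual) and not fails(actual, int(value))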

18 changes: 18 additions & 0 deletions backend/tasks.py
@@ -0,0 +1,18 @@
#!/usr/bin/env python
# coding: utf-8

from argv.celery import app

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

from django.conf import settings
from django.utils import timezone

from backend.transformrunner import run_transforms
from backend.filterer import filter_selection
from backend.cloner import process_snapshot

__all__ = ['process_snapshot',
           'filter_selection',
           'run_transforms']
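Usage sketch (the pk values are placeholders): callers queue work through the re-exported tasks instead of calling the functions synchronously; process_snapshot itself chains filter_selection when it finishes.

from backend.tasks import process_snapshot

process_snapshot.delay(snapshot_pk=42, selection_pk=7)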