|
21 | 21 | # In applying this license, CERN does not
|
22 | 22 | # waive the privileges and immunities granted to it by virtue of its status
|
23 | 23 | # as an Intergovernmental Organization or submit itself to any jurisdiction.
|
24 |
| -"""Tasks.""" |
| 24 | +"""Zenodo Upload Tasks.""" |
25 | 25 |
|
26 | 26 | from __future__ import absolute_import, print_function
|
27 | 27 |
|
28 | 28 | import requests
|
29 | 29 | from flask import current_app
|
30 |
| -from celery import shared_task |
| 30 | +from celery import chord, group, current_task, shared_task |
| 31 | + |
31 | 32 | from invenio_files_rest.models import FileInstance
|
| 33 | +from invenio_db import db |
| 34 | + |
| 35 | +from cap.modules.experiments.errors import ExternalAPIException |
| 36 | +from .utils import get_zenodo_deposit_from_record |
| 37 | + |
| 38 | + |
def create_zenodo_upload_tasks(files, recid, token,
                               zenodo_depid, zenodo_bucket_url):
    """Schedule one upload task per file, followed by a result-saving task.

    Builds a Celery ``chord``: the header is a ``group`` with one ``upload``
    signature per entry of ``files``; the body is ``save_results_to_record``,
    which receives the list of header results once all of them have
    finished. The chord is dispatched asynchronously and nothing is returned.

    :param files: iterable of filenames to upload.
    :param recid: identifier passed on to ``CAPDeposit.get_record`` by the
        scheduled tasks.
    :param token: Zenodo access token forwarded to every upload task.
    :param zenodo_depid: Zenodo deposit id, used for logging and handed to
        the callback as ``depid``.
    :param zenodo_bucket_url: base URL of the Zenodo bucket the files are
        PUT into.
    """
    current_app.logger.info(
        f'Uploading files to Zenodo {zenodo_depid}: {files}.')

    # A chord is used because it is the only Celery structure in which one
    # task (the body) waits for a set of other tasks (the header) to finish.
    header = group(
        upload.s(name, recid, token, zenodo_bucket_url)
        for name in files
    )
    body = save_results_to_record.s(depid=zenodo_depid, recid=recid)

    chord(header, body).delay()
32 | 54 |
|
33 | 55 |
|
@shared_task(autoretry_for=(Exception, ),
             retry_kwargs={
                 'max_retries': 5,
                 'countdown': 10
             })
def upload(filename, recid, token, zenodo_bucket_url):
    """Upload a single file of a record to a Zenodo bucket.

    The file is looked up on the record returned by
    ``CAPDeposit.get_record(recid)``, opened from the ``uri`` of its
    ``FileInstance`` and sent with an HTTP PUT to
    ``{zenodo_bucket_url}/{filename}``.

    :param filename: key of the file in ``record.files``; also used as the
        last path segment of the upload URL.
    :param recid: identifier passed to ``CAPDeposit.get_record``.
    :param token: Zenodo access token, sent as the ``access_token`` query
        parameter.
    :param zenodo_bucket_url: base URL of the target Zenodo bucket.
    :returns: ``{'task_id': <id of this task>, 'result': {'file': ...,
        'status': ..., 'message': ...}}`` where ``status`` is the HTTP status
        code and ``message`` the decoded JSON body, or ``'FAILED'`` and the
        error text when a ``ValueError`` / ``ExternalAPIException`` was
        raised.
    :raises Exception: any other error (e.g. a connection failure or a
        missing file) propagates unchanged so that ``autoretry_for`` can
        retry the task with the real cause attached.
    """
    from cap.modules.deposit.api import CAPDeposit
    record = CAPDeposit.get_record(recid)

    file_obj = record.files[filename]
    file_ins = FileInstance.get(file_obj.file_id)
    task_id = current_task.request.id

    with open(file_ins.uri, 'rb') as fp:
        try:
            resp = requests.put(
                url=f'{zenodo_bucket_url}/{filename}',
                data=fp,
                params=dict(access_token=token),
            )

            # Only non-2xx/3xx answers are errors; successful uploads are
            # reported at info level instead of polluting the error log.
            log = (current_app.logger.info if resp.ok
                   else current_app.logger.error)
            log(f'{task_id}: Zenodo upload of file `{filename}`: {resp.status_code}.')  # noqa

            status = resp.status_code
            msg = resp.json()
        except (ValueError, ExternalAPIException) as err:
            status = 'FAILED'
            msg = str(err)

            current_app.logger.error(
                f'{task_id}: Something went wrong with the task:\n{msg}')

    # The result is built after the try block rather than in a `finally:
    # return`, which would swallow in-flight exceptions and, when `status`
    # and `msg` were never assigned, replace them with an UnboundLocalError.
    return {
        'task_id': task_id,
        'result': {'file': filename, 'status': status, 'message': msg}
    }
| 94 | + |
| 95 | + |
@shared_task(autoretry_for=(Exception, ),
             retry_kwargs={
                 'max_retries': 5,
                 'countdown': 10
             })
def save_results_to_record(tasks, depid, recid):
    """Store the outcome of each upload task on the record.

    Each entry of ``tasks`` is written under its ``task_id`` into the
    ``tasks`` mapping of the Zenodo deposit entry that
    ``get_zenodo_deposit_from_record`` returns for ``depid``; the record and
    the database session are then committed.

    :param tasks: list of dicts, each with a ``task_id`` and a ``result``
        key.
    :param depid: Zenodo deposit id whose entry on the record is updated.
    :param recid: identifier passed to ``CAPDeposit.get_record``.
    """
    from cap.modules.deposit.api import CAPDeposit

    deposit_record = CAPDeposit.get_record(recid)

    # Results are keyed per Zenodo deposit, so several deposits can be
    # attached to one record without overwriting each other's task results.
    # NOTE(review): assumes the helper returns a mutable mapping that lives
    # inside the record - confirm against its implementation.
    zenodo = get_zenodo_deposit_from_record(deposit_record, depid)
    for outcome in tasks:
        finished_id = outcome['task_id']
        zenodo['tasks'][finished_id] = outcome['result']

    deposit_record.commit()
    db.session.commit()

    summary = f'COMPLETED: Zenodo {depid} uploads:\n{tasks}'
    current_app.logger.info(summary)
0 commit comments