|
21 | 21 | # In applying this license, CERN does not
|
22 | 22 | # waive the privileges and immunities granted to it by virtue of its status
|
23 | 23 | # as an Intergovernmental Organization or submit itself to any jurisdiction.
|
24 |
| -"""Tasks.""" |
| 24 | +"""Zenodo Upload Tasks.""" |
25 | 25 |
|
26 | 26 | from __future__ import absolute_import, print_function
|
27 | 27 |
|
28 | 28 | import requests
|
29 | 29 | from flask import current_app
|
30 |
| -from celery import shared_task |
| 30 | +from celery import chord, group, current_task, shared_task |
| 31 | + |
31 | 32 | from invenio_files_rest.models import FileInstance
|
| 33 | +from invenio_db import db |
| 34 | + |
| 35 | +from .utils import get_zenodo_deposit_from_record |
| 36 | + |
| 37 | + |
| 38 | +def create_zenodo_upload_tasks(files, recid, token, |
| 39 | + zenodo_depid, zenodo_bucket_url): |
| 40 | + """Create the upload tasks and get the results.""" |
| 41 | + current_app.logger.info( |
| 42 | + f'Uploading files to Zenodo {zenodo_depid}: {files}.') |
| 43 | + |
| 44 | + # the only way to have a task that waits for |
| 45 | + # other tasks to finish is the `chord` structure |
| 46 | + upload_callback = save_results_to_record.s(depid=zenodo_depid, recid=recid) |
| 47 | + upload_tasks = group( |
| 48 | + upload.s(filename, recid, token, zenodo_bucket_url) |
| 49 | + for filename in files |
| 50 | + ) |
| 51 | + |
| 52 | + chord(upload_tasks, upload_callback).delay() |
| 53 | + |
| 54 | + |
| 55 | +@shared_task(autoretry_for=(Exception, ), |
| 56 | + retry_kwargs={ |
| 57 | + 'max_retries': 5, |
| 58 | + 'countdown': 10 |
| 59 | + }) |
| 60 | +def upload(filename, recid, token, zenodo_bucket_url): |
| 61 | + """Upload file to Zenodo.""" |
| 62 | + from cap.modules.deposit.api import CAPDeposit |
| 63 | + record = CAPDeposit.get_record(recid) |
| 64 | + |
| 65 | + file_obj = record.files[filename] |
| 66 | + file_ins = FileInstance.get(file_obj.file_id) |
| 67 | + task_id = current_task.request.id |
| 68 | + |
| 69 | + with open(file_ins.uri, 'rb') as fp: |
| 70 | + resp = requests.put( |
| 71 | + url=f'{zenodo_bucket_url}/{filename}', |
| 72 | + data=fp, |
| 73 | + params=dict(access_token=token), |
| 74 | + ) |
| 75 | + |
| 76 | + current_app.logger.error( |
| 77 | + f'{task_id}: Zenodo upload of file `{filename}`: {resp.status_code}.') # noqa |
| 78 | + return task_id, {'file': filename, 'result': resp.status_code} |
32 | 79 |
|
33 | 80 |
|
34 | 81 | @shared_task(autoretry_for=(Exception, ),
|
35 | 82 | retry_kwargs={
|
36 | 83 | 'max_retries': 5,
|
37 | 84 | 'countdown': 10
|
38 | 85 | })
|
39 |
| -def upload_to_zenodo(files, recid, token, zenodo_depid, zenodo_bucket_url): |
40 |
| - """Upload to Zenodo the files the user selected.""" |
| 86 | +def save_results_to_record(tasks, depid, recid): |
| 87 | + """Save the results of uploading to the record.""" |
41 | 88 | from cap.modules.deposit.api import CAPDeposit
|
42 |
| - rec = CAPDeposit.get_record(recid) |
43 |
| - |
44 |
| - for filename in files: |
45 |
| - file_obj = rec.files[filename] |
46 |
| - file_ins = FileInstance.get(file_obj.file_id) |
47 |
| - |
48 |
| - with open(file_ins.uri, 'rb') as fp: |
49 |
| - resp = requests.put( |
50 |
| - url=f'{zenodo_bucket_url}/{filename}', |
51 |
| - data=fp, |
52 |
| - params=dict(access_token=token), |
53 |
| - ) |
54 |
| - |
55 |
| - if not resp.ok: |
56 |
| - current_app.logger.error( |
57 |
| - f'Uploading file {filename} to deposit {zenodo_depid} ' |
58 |
| - f'failed with {resp.status_code}.') |
| 89 | + record = CAPDeposit.get_record(recid) |
| 90 | + |
| 91 | + # update the tasks of the specified zenodo deposit (filename: status) |
| 92 | + # this way we can attach multiple deposits |
| 93 | + zenodo = get_zenodo_deposit_from_record(record, depid) |
| 94 | + for task in tasks: |
| 95 | + zenodo['tasks'][task[0]] = task[1] |
| 96 | + |
| 97 | + record.commit() |
| 98 | + db.session.commit() |
| 99 | + |
| 100 | + current_app.logger.info( |
| 101 | + f'COMPLETED: Zenodo {depid} uploads:\n{tasks}') |
0 commit comments