Skip to content

Commit df03608

Browse files
committed
services: add handling of async results of upload
* adds the task results to the record * addresses #1970 Signed-off-by: Ilias Koutsakis <[email protected]>
1 parent a4fd8be commit df03608

File tree

6 files changed

+178
-20
lines changed

6 files changed

+178
-20
lines changed

cap/modules/deposit/api.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
UpdateDepositPermission)
7979

8080
from .review import Reviewable
81-
from .tasks import upload_to_zenodo
81+
from .tasks import create_zenodo_upload_tasks
8282
from .utils import create_zenodo_deposit
8383

8484
_datastore = LocalProxy(lambda: current_app.extensions['security'].datastore)
@@ -287,9 +287,9 @@ def upload(self, pid, *args, **kwargs):
287287
self.commit()
288288

289289
# upload files to zenodo deposit
290-
upload_to_zenodo.delay(files, recid, token,
291-
deposit['id'],
292-
deposit['links']['bucket'])
290+
create_zenodo_upload_tasks(files, recid, token,
291+
deposit['id'],
292+
deposit['links']['bucket'])
293293
else:
294294
raise FileUploadError(
295295
'You cannot create an empty Zenodo deposit. '

cap/modules/deposit/tasks.py

+71-13
Original file line numberDiff line numberDiff line change
@@ -21,38 +21,96 @@
2121
# In applying this license, CERN does not
2222
# waive the privileges and immunities granted to it by virtue of its status
2323
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24-
"""Tasks."""
24+
"""Zenodo Upload Tasks."""
2525

2626
from __future__ import absolute_import, print_function
2727

2828
import requests
2929
from flask import current_app
30-
from celery import shared_task
30+
from celery import chord, group, current_task, shared_task
31+
3132
from invenio_files_rest.models import FileInstance
33+
from invenio_db import db
34+
35+
from cap.modules.experiments.errors import ExternalAPIException
36+
from .utils import get_zenodo_deposit_from_record
37+
38+
39+
def create_zenodo_upload_tasks(files, recid, token,
40+
zenodo_depid, zenodo_bucket_url):
41+
"""Create the upload tasks and get the results."""
42+
current_app.logger.info(
43+
f'Uploading files to Zenodo {zenodo_depid}: {files}.')
44+
45+
# the only way to have a task that waits for
46+
# other tasks to finish is the `chord` structure
47+
upload_callback = save_results_to_record.s(depid=zenodo_depid, recid=recid)
48+
upload_tasks = group(
49+
upload.s(filename, recid, token, zenodo_bucket_url)
50+
for filename in files
51+
)
52+
53+
chord(upload_tasks, upload_callback).delay()
3254

3355

3456
@shared_task(autoretry_for=(Exception, ),
3557
retry_kwargs={
3658
'max_retries': 5,
3759
'countdown': 10
3860
})
39-
def upload_to_zenodo(files, recid, token, zenodo_depid, zenodo_bucket_url):
40-
"""Upload to Zenodo the files the user selected."""
61+
def upload(filename, recid, token, zenodo_bucket_url):
62+
"""Upload file to Zenodo."""
4163
from cap.modules.deposit.api import CAPDeposit
42-
rec = CAPDeposit.get_record(recid)
64+
record = CAPDeposit.get_record(recid)
4365

44-
for filename in files:
45-
file_obj = rec.files[filename]
46-
file_ins = FileInstance.get(file_obj.file_id)
66+
file_obj = record.files[filename]
67+
file_ins = FileInstance.get(file_obj.file_id)
68+
task_id = current_task.request.id
4769

48-
with open(file_ins.uri, 'rb') as fp:
70+
with open(file_ins.uri, 'rb') as fp:
71+
try:
4972
resp = requests.put(
5073
url=f'{zenodo_bucket_url}/{filename}',
5174
data=fp,
5275
params=dict(access_token=token),
5376
)
5477

55-
if not resp.ok:
56-
current_app.logger.error(
57-
f'Uploading file {filename} to deposit {zenodo_depid} '
58-
f'failed with {resp.status_code}.')
78+
current_app.logger.error(
79+
f'{task_id}: Zenodo upload of file `{filename}`: {resp.status_code}.') # noqa
80+
81+
status = resp.status_code
82+
msg = resp.json()
83+
except (ValueError, ExternalAPIException) as err:
84+
status = 'FAILED'
85+
msg = str(err)
86+
87+
current_app.logger.error(
88+
f'{task_id}: Something went wrong with the task:\n{msg}')
89+
finally:
90+
return {
91+
'task_id': task_id,
92+
'result': {'file': filename, 'status': status, 'message': msg}
93+
}
94+
95+
96+
@shared_task(autoretry_for=(Exception, ),
97+
retry_kwargs={
98+
'max_retries': 5,
99+
'countdown': 10
100+
})
101+
def save_results_to_record(tasks, depid, recid):
102+
"""Save the results of uploading to the record."""
103+
from cap.modules.deposit.api import CAPDeposit
104+
record = CAPDeposit.get_record(recid)
105+
106+
# update the tasks of the specified zenodo deposit (filename: status)
107+
# this way we can attach multiple deposits
108+
zenodo = get_zenodo_deposit_from_record(record, depid)
109+
for task in tasks:
110+
zenodo['tasks'][task['task_id']] = task['result']
111+
112+
record.commit()
113+
db.session.commit()
114+
115+
current_app.logger.info(
116+
f'COMPLETED: Zenodo {depid} uploads:\n{tasks}')

cap/modules/deposit/utils.py

+14
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,20 @@ def add_api_to_links(links):
8282
return response
8383

8484

85+
def get_zenodo_deposit_from_record(record, pid):
86+
"""Get the related Zenodo information from a record."""
87+
try:
88+
index = [idx for idx, deposit in enumerate(record['_zenodo'])
89+
if deposit['id'] == pid][0]
90+
91+
# set an empty dict as tasks if there is none
92+
record['_zenodo'][index].setdefault('tasks', {})
93+
return record['_zenodo'][index]
94+
except IndexError:
95+
raise FileUploadError(
96+
'The Zenodo pid you provided is not associated with this record.')
97+
98+
8599
def create_zenodo_deposit(token, data=None):
86100
"""Create a Zenodo deposit using the logged in user's credentials."""
87101
zenodo_url = current_app.config.get("ZENODO_SERVER_URL")
+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
{
2+
"name": "zenodo",
3+
"version": "0.0.1",
4+
"fullname": "",
5+
"experiment": null,
6+
"is_indexed": false,
7+
"use_deposit_as_record": true,
8+
"allow_all": true,
9+
"deposit_schema": {
10+
"additionalProperties": "False",
11+
"type": "array",
12+
"items": {
13+
"type": "object",
14+
"properties": {
15+
"id": {
16+
"type": "number"
17+
},
18+
"created": {
19+
"type": "string"
20+
},
21+
"title": {
22+
"type": "string"
23+
},
24+
"creator": {
25+
"type": "string"
26+
},
27+
"links": {
28+
"type": "object",
29+
"properties": {
30+
"self": {
31+
"type": "string"
32+
},
33+
"html": {
34+
"type": "string"
35+
},
36+
"publish": {
37+
"type": "string"
38+
},
39+
"bucket": {
40+
"type": "string"
41+
}
42+
}
43+
},
44+
"tasks": {
45+
"type": "object",
46+
"patternProperties": {
47+
"^[0-F]{8}-([0-F]{4}-){3}[0-F]{12}$": {
48+
"type": "object",
49+
"properties": {
50+
"file": {
51+
"type": "string"
52+
},
53+
"status": {
54+
"type": "string"
55+
},
56+
"message": {
57+
"type": "string"
58+
}
59+
}
60+
}
61+
}
62+
}
63+
},
64+
"title": "Zenodo Deposit"
65+
}
66+
},
67+
"deposit_mapping": {},
68+
"deposit_options": {}
69+
}

cap/modules/services/views/zenodo.py

+18
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727

2828
import requests
2929
from flask import current_app, jsonify
30+
from invenio_pidstore.resolver import Resolver
3031

3132
from . import blueprint
33+
from cap.modules.access.utils import login_required
34+
from cap.modules.deposit.api import CAPDeposit
3235

3336

3437
def _get_zenodo_record(zenodo_id):
@@ -47,3 +50,18 @@ def get_zenodo_record(zenodo_id):
4750
"""Get record from zenodo (route)."""
4851
resp, status = _get_zenodo_record(zenodo_id)
4952
return jsonify(resp), status
53+
54+
55+
@blueprint.route('/zenodo/tasks/<depid>')
56+
@login_required
57+
def get_zenodo_tasks(depid):
58+
"""Get record from zenodo (route)."""
59+
resolver = Resolver(pid_type='depid',
60+
object_type='rec',
61+
getter=lambda x: x)
62+
63+
_, uuid = resolver.resolve(depid)
64+
record = CAPDeposit.get_record(uuid)
65+
tasks = record.get('_zenodo', {}).get('tasks', [])
66+
67+
return jsonify(tasks), 200

tests/integration/test_zenodo_upload.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def test_create_and_upload_to_zenodo(mock_token, app, users, deposit_with_file,
104104

105105
assert len(record['_zenodo']) == 1
106106
assert record['_zenodo'][0]['id'] == 111
107-
assert record['_zenodo'][0]['title'] == None
107+
assert record['_zenodo'][0]['title'] is None
108108
assert record['_zenodo'][0]['created'] == '2020-11-20T11:49:39.147767+00:00'
109109

110110

@@ -382,8 +382,7 @@ def test_zenodo_upload_file_not_uploaded_error(mock_token, app, users, deposit_w
382382
assert resp.status_code == 201
383383

384384
captured = capsys.readouterr()
385-
assert 'Uploading file test-file.txt to deposit 111 failed with 500' \
386-
in captured.err
385+
assert 'Zenodo upload of file `test-file.txt`: 500.' in captured.err
387386

388387

389388
@patch('cap.modules.deposit.api._fetch_token', return_value='test-token')

0 commit comments

Comments
 (0)