Skip to content

Commit 2279ec0

Browse files
committed
medn
1 parent a282ece commit 2279ec0

File tree

12 files changed

+400
-342
lines changed

12 files changed

+400
-342
lines changed

cernopendata/cold_storage/api.py

Lines changed: 10 additions & 198 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from datetime import datetime
3030

3131
from flask import current_app
32+
from flask_mail import Message
3233
from invenio_db import db
3334
from invenio_files_rest.models import (
3435
Bucket,
@@ -37,12 +38,7 @@
3738
ObjectVersion,
3839
ObjectVersionTag,
3940
)
40-
from invenio_pidstore.models import PersistentIdentifier
4141
from invenio_records_files.api import FileObject, FilesIterator, Record
42-
from sqlalchemy import func
43-
from flask_mail import Message
44-
45-
from cernopendata.api import RecordFilesWithIndex
4642

4743
from .models import RequestMetadata, TransferMetadata
4844

@@ -91,7 +87,7 @@ class Transfer:
9187
@staticmethod
9288
def create(entry):
9389
"""Create a new transfer entry."""
94-
transfer = Transfer(
90+
transfer = TransferMetadata(
9591
action=entry["action"],
9692
new_filename=entry["new_filename"],
9793
record_uuid=entry["record_uuid"],
@@ -110,22 +106,23 @@ def create(entry):
110106
def _get_ongoing_transfers(last_check):
111107
"""Get transfers that need processing."""
112108
return (
113-
Transfer.query.filter(
114-
Transfer.last_check <= last_check, Transfer.finished.is_(None)
109+
TransferMetadata.query.filter(
110+
TransferMetadata.last_check <= last_check,
111+
TransferMetadata.finished.is_(None),
115112
)
116-
.order_by(Transfer.last_check)
113+
.order_by(TransferMetadata.last_check)
117114
.all()
118115
)
119116

120117
@staticmethod
121118
def is_scheduled(file_id, action):
122119
"""Check if a transfer is already scheduled."""
123120
return (
124-
db.session.query(Transfer.id)
121+
db.session.query(TransferMetadata.id)
125122
.filter(
126-
Transfer.action == action,
127-
Transfer.file_id == file_id,
128-
Transfer.finished.is_(None),
123+
TransferMetadata.action == action,
124+
TransferMetadata.file_id == file_id,
125+
TransferMetadata.finished.is_(None),
129126
)
130127
.first()
131128
is not None
@@ -139,46 +136,6 @@ def _load_class(full_class_path):
139136
cls = getattr(module, class_name)
140137
return cls()
141138

142-
@staticmethod
143-
def check_transfers(catalog):
144-
"""Check all the ongoing transfers."""
145-
logger.info("Checking all the ongoing transfers")
146-
now = datetime.utcnow()
147-
all_status = {}
148-
summary = {}
149-
for transfer in Transfer._get_ongoing_transfers(now):
150-
id = transfer.id
151-
transfer.last_check = datetime.utcnow()
152-
transfer.status, error = Transfer._load_class(
153-
f"{transfer.method}.TransferManager"
154-
).transfer_status(transfer.method_id)
155-
all_status[id] = transfer.status
156-
if transfer.status not in summary:
157-
summary[transfer.status] = 0
158-
summary[transfer.status] += 1
159-
if transfer.status == "DONE":
160-
logger.debug(
161-
f"Transfer {id}: just finished! Let's update the catalog and mark it as done"
162-
)
163-
transfer.finished = datetime.now()
164-
catalog.add_copy(
165-
transfer.record_uuid,
166-
transfer.file_id,
167-
transfer.action,
168-
transfer.new_filename,
169-
)
170-
if transfer.status == "FAILED" or not transfer.status:
171-
logger.error(f"The transfer {id} failed :(")
172-
transfer.reason = error
173-
transfer.finished = datetime.now()
174-
else:
175-
logger.debug(f"Transfer {id} is in status {transfer.status}")
176-
db.session.add(transfer)
177-
db.session.commit()
178-
catalog.reindex_entries()
179-
logger.info(f"Summary: {summary}")
180-
return all_status
181-
182139

183140
class Request:
184141
"""Class to check the cold storage requests."""
@@ -247,148 +204,3 @@ def subscribe(transfer_id, email):
247204
db.session.commit()
248205
return True
249206
return False
250-
251-
def check_requests(self, manager):
252-
"""Check the active requests."""
253-
# The requests would go through these stages
254-
# SUBMITTED -> STARTED -> COMPLETED
255-
self._check_submitted(manager)
256-
257-
# Now, let's look at the ones that 'started'
258-
self._check_running()
259-
260-
@staticmethod
261-
def _check_submitted(manager):
262-
"""Check if there are any new transfers submitted."""
263-
for action in ["stage", "archive"]:
264-
active_transfers_count = Transfer.query.filter(
265-
Transfer.finished.is_(None), Transfer.action == action
266-
).count()
267-
threshold = Request.get_active_transfers_threshold(action)
268-
269-
logger.info(
270-
f"Checking if we can {action} more records: active {active_transfers_count}/{threshold}"
271-
)
272-
submitted = 0
273-
limit = threshold - active_transfers_count
274-
if limit > 0:
275-
transfers = Request.query.filter_by(
276-
status="submitted", action=action
277-
).all()
278-
279-
for transfer in transfers:
280-
info = manager.doOperation(
281-
action,
282-
transfer.record_id,
283-
limit=limit,
284-
register=True,
285-
force=False,
286-
dry=False,
287-
)
288-
logger.debug(f"Got {info}")
289-
if info:
290-
submitted += len(info)
291-
transfer.num_files += len(info)
292-
transfer.size += sum(item.size for item in info)
293-
transfer.started_at = datetime.utcnow()
294-
if limit == submitted:
295-
logger.info(
296-
f"Reached the threshold of {threshold} transfers. There might be more in this record"
297-
f"({submitted + active_transfers_count}). Let's wait before continuing"
298-
)
299-
else:
300-
transfer.status = "started"
301-
db.session.add(transfer)
302-
limit -= submitted
303-
if limit <= 0:
304-
logger.info("We have submitted enough. Stopping")
305-
break
306-
if submitted:
307-
logger.info(f"{submitted} transfers have been submitted!")
308-
db.session.commit()
309-
310-
@staticmethod
311-
def _check_running():
312-
"""Check the records that are being archived."""
313-
for action in ["stage", "archive"]:
314-
315-
requests = Request.query.filter_by(status="started", action=action).all()
316-
logger.debug(f"Checking the {len(requests)} {action} requests")
317-
completed = 0
318-
for request in requests:
319-
record = RecordFilesWithIndex.get_record(request.record_id)
320-
321-
if action == "stage":
322-
if record["availability"] != "online":
323-
continue
324-
elif action == "archive":
325-
files = (f for index in record.file_indices for f in index["files"])
326-
missing = next(
327-
(
328-
f
329-
for f in files
330-
if "tags" not in f or "uri_cold" not in f["tags"]
331-
),
332-
None,
333-
)
334-
if missing:
335-
logger.debug(f"The file {missing['key']} is not in tape yet...")
336-
continue
337-
completed += 1
338-
Request.complete(request)
339-
logger.info(f"{completed}/{len(requests)} requests have finished")
340-
341-
@staticmethod
342-
def get_requests(
343-
status=None,
344-
action=None,
345-
record=None,
346-
summary=False,
347-
sort=None,
348-
direction=None,
349-
page=None,
350-
per_page=None,
351-
):
352-
"""Get the summary of the requests."""
353-
if summary:
354-
query = db.session.query(
355-
Request.status,
356-
Request.action,
357-
func.count().label("count"),
358-
func.sum(Request.num_files).label("files"),
359-
func.sum(Request.size).label("size"),
360-
)
361-
else:
362-
query = Request.query
363-
364-
if status:
365-
status_list = status.split(",")
366-
query = query.filter(Request.status.in_(status_list))
367-
368-
if action:
369-
action_list = action.split(",")
370-
query = query.filter(Request.action.in_(action_list))
371-
372-
if record:
373-
try:
374-
uuid = PersistentIdentifier.get("recid", record).object_uuid
375-
query = query.filter_by(record_id=uuid)
376-
except Exception:
377-
query = query.filter(False)
378-
if summary:
379-
result = query.group_by(Request.status, Request.action).all()
380-
381-
if sort:
382-
column = getattr(Request, sort, None)
383-
if column:
384-
if direction == "desc":
385-
query = query.order_by(column.desc())
386-
else:
387-
query = query.order_by(column.asc())
388-
389-
if page:
390-
result = query.order_by(Request.created_at.desc()).paginate(
391-
page=page, per_page=per_page, error_out=False
392-
)
393-
394-
return result

cernopendata/cold_storage/catalog.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,9 @@ def get_files_from_record(self, record, limit=None):
6464
if "_file_indices" in record:
6565
for f in record["_file_indices"]:
6666
files += f["files"]
67-
if limit:
68-
start = 0
69-
end = len(files)
70-
if limit < 0:
71-
start = -limit
72-
else:
73-
end = limit
74-
files = files[start:end]
75-
logger.debug(f"And the list of files are: f{files}")
67+
if limit and limit < 0:
68+
logger.debug(f"Skipping the first {limit} files")
69+
files = files[-limit:]
7670
return files
7771

7872
def clear_hot(self, record, file_id):

cernopendata/cold_storage/cli.py

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@
2424

2525
"""Cold Storage CLI."""
2626

27-
import json
28-
import logging.config
29-
import sys
30-
from functools import wraps
3127
from math import log2
3228

3329
import click
@@ -37,6 +33,7 @@
3733
from invenio_pidstore.models import PersistentIdentifier
3834

3935
from .manager import ColdStorageManager
36+
from .service import RequestService, TransferService
4037

4138
argument_record = click.argument("record", nargs=-1, required=True, metavar="RECORD")
4239

@@ -47,7 +44,6 @@
4744
+ "with the same file and checksum, import it without issuing the transfer",
4845
)
4946
option_dry = click.option("--dry/--do-it", default=False, help="Do not issue transfers")
50-
option_debug = click.option("--debug/--no-debug", default=False)
5147
option_force = click.option(
5248
"--force/--no-force",
5349
default=False,
@@ -80,16 +76,15 @@ def cold():
8076
@with_appcontext
8177
@argument_record
8278
@option_register
83-
@option_debug
8479
@option_limit
8580
@option_force
8681
@option_dry
87-
def archive(record, register, debug, limit, force, dry):
82+
def archive(record, register, limit, force, dry):
8883
"""Move a record to cold."""
89-
_doOperation("archive", record, register, debug, limit, force, dry)
84+
_doOperation("archive", record, register, limit, force, dry)
9085

9186

92-
def _doOperation(operation, record, register, debug, limit, force, dry):
87+
def _doOperation(operation, record, register, limit, force, dry):
9388
"""Internal function to do the CLI commands."""
9489
m = ColdStorageManager(current_app)
9590
counter = 0
@@ -113,13 +108,12 @@ def _doOperation(operation, record, register, debug, limit, force, dry):
113108
@with_appcontext
114109
@argument_record
115110
@option_register
116-
@option_debug
117111
@option_limit
118112
@option_force
119113
@option_dry
120-
def stage(record, register, debug, limit, force, dry):
114+
def stage(record, register, limit, force, dry):
121115
"""Move a record from cold."""
122-
_doOperation("stage", record, register, debug, limit, force, dry)
116+
_doOperation("stage", record, register, limit, force, dry)
123117

124118

125119
@cold.command()
@@ -133,8 +127,7 @@ def settings():
133127
@cold.command()
134128
@with_appcontext
135129
@argument_record
136-
@option_debug
137-
def list(record, debug):
130+
def list(record):
138131
"""Print the urls for an entry.
139132
140133
By default, it prints the urls for all the files of the entry.
@@ -156,8 +149,6 @@ def list(record, debug):
156149
click.secho(f"The record '{r}' does not exist.", fg="red")
157150
continue
158151
info = m.list(uuid)
159-
if debug:
160-
print("Printing debug info", info)
161152
if not info:
162153
click.secho(f"The record {r} does not exist!")
163154
stats["errors"] += [r]
@@ -191,25 +182,21 @@ def list(record, debug):
191182
@argument_record
192183
@option_limit
193184
@option_dry
194-
@option_debug
195-
def clear_hot(record, limit, dry, debug):
185+
def clear_hot(record, limit, dry):
196186
"""Delete the hot copy of a file that has a cold copy."""
197-
_doOperation("clear_hot", record, None, debug, limit, None, dry)
187+
_doOperation("clear_hot", record, None, limit, None, dry)
198188

199189

200190
@cold.command()
201191
@with_appcontext
202-
@option_debug
203-
def check_transfers(debug):
192+
def process_transfers():
204193
"""Check the status of the transfers."""
205-
m = ColdStorageManager(current_app)
206-
return m.check_current_transfers()
194+
return TransferService.process_transfers()
207195

208196

209197
@cold.command()
210198
@with_appcontext
211-
@option_debug
212-
def check_requests(debug):
199+
def process_requests():
213200
"""Check the status of the requests."""
214-
m = ColdStorageManager(current_app)
215-
return m.check_requests()
201+
202+
return RequestService.process_requests()

0 commit comments

Comments
 (0)