Skip to content

Commit 5e4caab

Browse files
committed
medn
1 parent a282ece commit 5e4caab

File tree

10 files changed

+170
-146
lines changed

10 files changed

+170
-146
lines changed

cernopendata/cold_storage/api.py

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class Transfer:
9191
@staticmethod
9292
def create(entry):
9393
"""Create a new transfer entry."""
94-
transfer = Transfer(
94+
transfer = TransferMetadata(
9595
action=entry["action"],
9696
new_filename=entry["new_filename"],
9797
record_uuid=entry["record_uuid"],
@@ -110,22 +110,23 @@ def create(entry):
110110
def _get_ongoing_transfers(last_check):
111111
"""Get transfers that need processing."""
112112
return (
113-
Transfer.query.filter(
114-
Transfer.last_check <= last_check, Transfer.finished.is_(None)
113+
TransferMetadata.query.filter(
114+
TransferMetadata.last_check <= last_check,
115+
TransferMetadata.finished.is_(None),
115116
)
116-
.order_by(Transfer.last_check)
117+
.order_by(TransferMetadata.last_check)
117118
.all()
118119
)
119120

120121
@staticmethod
121122
def is_scheduled(file_id, action):
122123
"""Check if a transfer is already scheduled."""
123124
return (
124-
db.session.query(Transfer.id)
125+
db.session.query(TransferMetadata.id)
125126
.filter(
126-
Transfer.action == action,
127-
Transfer.file_id == file_id,
128-
Transfer.finished.is_(None),
127+
TransferMetadata.action == action,
128+
TransferMetadata.file_id == file_id,
129+
TransferMetadata.finished.is_(None),
129130
)
130131
.first()
131132
is not None
@@ -261,8 +262,8 @@ def check_requests(self, manager):
261262
def _check_submitted(manager):
262263
"""Check if there are any new transfers submitted."""
263264
for action in ["stage", "archive"]:
264-
active_transfers_count = Transfer.query.filter(
265-
Transfer.finished.is_(None), Transfer.action == action
265+
active_transfers_count = TransferMetadata.query.filter(
266+
TransferMetadata.finished.is_(None), TransferMetadata.action == action
266267
).count()
267268
threshold = Request.get_active_transfers_threshold(action)
268269

@@ -272,7 +273,7 @@ def _check_submitted(manager):
272273
submitted = 0
273274
limit = threshold - active_transfers_count
274275
if limit > 0:
275-
transfers = Request.query.filter_by(
276+
transfers = RequestMetadata.query.filter_by(
276277
status="submitted", action=action
277278
).all()
278279

@@ -291,6 +292,7 @@ def _check_submitted(manager):
291292
transfer.num_files += len(info)
292293
transfer.size += sum(item.size for item in info)
293294
transfer.started_at = datetime.utcnow()
295+
logger.info(f"THE LIMIT WAS {limit}, AND WE SUBMITTED {submitted}")
294296
if limit == submitted:
295297
logger.info(
296298
f"Reached the threshold of {threshold} transfers. There might be more in this record"
@@ -312,7 +314,9 @@ def _check_running():
312314
"""Check the records that are being archived."""
313315
for action in ["stage", "archive"]:
314316

315-
requests = Request.query.filter_by(status="started", action=action).all()
317+
requests = RequestMetadata.query.filter_by(
318+
status="started", action=action
319+
).all()
316320
logger.debug(f"Checking the {len(requests)} {action} requests")
317321
completed = 0
318322
for request in requests:
@@ -352,22 +356,20 @@ def get_requests(
352356
"""Get the summary of the requests."""
353357
if summary:
354358
query = db.session.query(
355-
Request.status,
356-
Request.action,
359+
RequestMetadata.status,
360+
RequestMetadata.action,
357361
func.count().label("count"),
358-
func.sum(Request.num_files).label("files"),
359-
func.sum(Request.size).label("size"),
362+
func.sum(RequestMetadata.num_files).label("files"),
363+
func.sum(RequestMetadata.size).label("size"),
360364
)
361365
else:
362-
query = Request.query
366+
query = RequestMetadata.query
363367

364368
if status:
365-
status_list = status.split(",")
366-
query = query.filter(Request.status.in_(status_list))
369+
query = query.filter(RequestMetadata.status.in_(status))
367370

368371
if action:
369-
action_list = action.split(",")
370-
query = query.filter(Request.action.in_(action_list))
372+
query = query.filter(RequestMetadata.action.in_(action))
371373

372374
if record:
373375
try:
@@ -376,7 +378,9 @@ def get_requests(
376378
except Exception:
377379
query = query.filter(False)
378380
if summary:
379-
result = query.group_by(Request.status, Request.action).all()
381+
result = query.group_by(
382+
RequestMetadata.status, RequestMetadata.action
383+
).all()
380384

381385
if sort:
382386
column = getattr(Request, sort, None)
@@ -387,7 +391,7 @@ def get_requests(
387391
query = query.order_by(column.asc())
388392

389393
if page:
390-
result = query.order_by(Request.created_at.desc()).paginate(
394+
result = query.order_by(RequestMetadata.created_at.desc()).paginate(
391395
page=page, per_page=per_page, error_out=False
392396
)
393397

cernopendata/cold_storage/catalog.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,9 @@ def get_files_from_record(self, record, limit=None):
6464
if "_file_indices" in record:
6565
for f in record["_file_indices"]:
6666
files += f["files"]
67-
if limit:
68-
start = 0
69-
end = len(files)
70-
if limit < 0:
71-
start = -limit
72-
else:
73-
end = limit
74-
files = files[start:end]
75-
logger.debug(f"And the list of files are: f{files}")
67+
if limit and limit < 0:
68+
logger.debug(f"Skipping the first {limit} files")
69+
files = files[-limit:]
7670
return files
7771

7872
def clear_hot(self, record, file_id):

cernopendata/cold_storage/cli.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
+ "with the same file and checksum, import it without issuing the transfer",
4848
)
4949
option_dry = click.option("--dry/--do-it", default=False, help="Do not issue transfers")
50-
option_debug = click.option("--debug/--no-debug", default=False)
5150
option_force = click.option(
5251
"--force/--no-force",
5352
default=False,
@@ -80,16 +79,15 @@ def cold():
8079
@with_appcontext
8180
@argument_record
8281
@option_register
83-
@option_debug
8482
@option_limit
8583
@option_force
8684
@option_dry
87-
def archive(record, register, debug, limit, force, dry):
85+
def archive(record, register, limit, force, dry):
8886
"""Move a record to cold."""
89-
_doOperation("archive", record, register, debug, limit, force, dry)
87+
_doOperation("archive", record, register, limit, force, dry)
9088

9189

92-
def _doOperation(operation, record, register, debug, limit, force, dry):
90+
def _doOperation(operation, record, register, limit, force, dry):
9391
"""Internal function to do the CLI commands."""
9492
m = ColdStorageManager(current_app)
9593
counter = 0
@@ -113,13 +111,12 @@ def _doOperation(operation, record, register, debug, limit, force, dry):
113111
@with_appcontext
114112
@argument_record
115113
@option_register
116-
@option_debug
117114
@option_limit
118115
@option_force
119116
@option_dry
120-
def stage(record, register, debug, limit, force, dry):
117+
def stage(record, register, limit, force, dry):
121118
"""Move a record from cold."""
122-
_doOperation("stage", record, register, debug, limit, force, dry)
119+
_doOperation("stage", record, register, limit, force, dry)
123120

124121

125122
@cold.command()
@@ -133,8 +130,7 @@ def settings():
133130
@cold.command()
134131
@with_appcontext
135132
@argument_record
136-
@option_debug
137-
def list(record, debug):
133+
def list(record):
138134
"""Print the urls for an entry.
139135
140136
By default, it prints the urls for all the files of the entry.
@@ -156,8 +152,6 @@ def list(record, debug):
156152
click.secho(f"The record '{r}' does not exist.", fg="red")
157153
continue
158154
info = m.list(uuid)
159-
if debug:
160-
print("Printing debug info", info)
161155
if not info:
162156
click.secho(f"The record {r} does not exist!")
163157
stats["errors"] += [r]
@@ -191,25 +185,22 @@ def list(record, debug):
191185
@argument_record
192186
@option_limit
193187
@option_dry
194-
@option_debug
195-
def clear_hot(record, limit, dry, debug):
188+
def clear_hot(record, limit, dry):
196189
"""Delete the hot copy of a file that has a cold copy."""
197-
_doOperation("clear_hot", record, None, debug, limit, None, dry)
190+
_doOperation("clear_hot", record, None, limit, None, dry)
198191

199192

200193
@cold.command()
201194
@with_appcontext
202-
@option_debug
203-
def check_transfers(debug):
195+
def check_transfers():
204196
"""Check the status of the transfers."""
205197
m = ColdStorageManager(current_app)
206198
return m.check_current_transfers()
207199

208200

209201
@cold.command()
210202
@with_appcontext
211-
@option_debug
212-
def check_requests(debug):
203+
def check_requests():
213204
"""Check the status of the requests."""
214205
m = ColdStorageManager(current_app)
215206
return m.check_requests()

cernopendata/cold_storage/manager.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,12 @@ def _move_record(
125125
record = self._catalog.get_record(record_uuid)
126126
if not record:
127127
return []
128-
for file in self._catalog.get_files_from_record(record, limit):
128+
for file in self._catalog.get_files_from_record(record):
129129
transfers += self._move_record_file(
130130
record.id, file, action, move_function, register, force, dry
131131
)
132+
if len(transfers) >= limit:
133+
logger.info("Reached the limit. Going back")
132134
logger.info(f"{len(transfers)} transfers have been issued")
133135
return transfers
134136

cernopendata/cold_storage/schemas.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of CERN Open Data Portal.
4+
# Copyright (C) 2017-2025 CERN.
5+
#
6+
# CERN Open Data Portal is free software; you can redistribute it
7+
# and/or modify it under the terms of the GNU General Public License as
8+
# published by the Free Software Foundation; either version 2 of the
9+
# License, or (at your option) any later version.
10+
#
11+
# CERN Open Data Portal is distributed in the hope that it will be
12+
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
# General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU General Public License
17+
# along with CERN Open Data Portal; if not, write to the
18+
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19+
# MA 02111-1307, USA.
20+
#
21+
# In applying this license, CERN does not
22+
# waive the privileges and immunities granted to it by virtue of its status
23+
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24+
25+
"""Schemas used for the transfers."""
26+
27+
from invenio_pidstore.models import PersistentIdentifier
28+
from marshmallow import Schema, fields, validate
29+
30+
31+
class TransferRequestQuerySchema(Schema):
32+
"""Arguments for the transfer search."""
33+
34+
page = fields.Int(missing=1)
35+
per_page = fields.Int(missing=10)
36+
status = fields.List(fields.Str())
37+
action = fields.List(fields.Str())
38+
record = fields.Str()
39+
sort = fields.Str()
40+
direction = fields.Str(validate=validate.OneOf(["asc", "desc"]), missing="asc")
41+
42+
43+
class TransferRequestSchema(Schema):
44+
"""Schema of a transfer."""
45+
46+
id = fields.Str()
47+
recid = fields.Method("get_recid")
48+
action = fields.Str()
49+
num_files = fields.Int()
50+
size = fields.Int()
51+
status = fields.Str()
52+
created_at = fields.DateTime()
53+
started_at = fields.DateTime(allow_none=True)
54+
completed_at = fields.DateTime(allow_none=True)
55+
56+
def get_recid(self, obj):
57+
"""Convert the uuid into the recid."""
58+
pid = PersistentIdentifier.query.filter_by(object_uuid=obj.record_id).first()
59+
return pid.pid_value if pid else None

cernopendata/config.py

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -68,32 +68,10 @@
6868
# Logging - Set up Sentry for Invenio-Logging
6969
SENTRY_DSN = os.environ.get("SENTRY_DSN", None)
7070

71-
LOGGING = {
72-
"version": 1,
73-
"disable_existing_loggers": False,
74-
"formatters": {
75-
"default": {
76-
"format": "[%(asctime)s] %(levelname)s in %(module)s: %(message)s",
77-
},
78-
},
79-
"handlers": {
80-
"console": {
81-
"class": "logging.StreamHandler",
82-
"formatter": "default",
83-
},
84-
},
85-
"root": {
86-
"level": "INFO", # <- Set global level to INFO
87-
"handlers": ["console"],
88-
},
89-
"loggers": {
90-
"invenio": {
91-
"level": "INFO", # <- Set Invenio-specific logging level
92-
"handlers": ["console"],
93-
"propagate": False,
94-
},
95-
},
96-
}
71+
COLD_ACTIVE_ARCHIVING_TRANSFERS_THRESHOLD = 1000
72+
# Maximum number of transfers that should be active at a given moment
73+
COLD_ACTIVE_STAGING_TRANSFERS_THRESHOLD = 50
74+
# Maximum number of transfers that should be active at a given moment
9775

9876
LOGGING_SENTRY_CELERY = os.environ.get("LOGGING_SENTRY_CELERY", False)
9977

cernopendata/modules/pages/schemas.py

Lines changed: 0 additions & 22 deletions
This file was deleted.

0 commit comments

Comments
 (0)