Skip to content

Commit b4b1244

Browse files
committed
First version of the cold storage interface. New actions to request files, see the requests and subscribe
1 parent 97c9c31 commit b4b1244

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2529
-97
lines changed

Dockerfile

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ FROM --platform=$BUILDPLATFORM registry.cern.ch/inveniosoftware/almalinux:1
2929
# Use XRootD 5.8.1
3030
ENV XROOTD_VERSION=5.8.1
3131

32+
# Install the CERN CA
33+
COPY docker/carepo.repo /etc/yum.repos.d/
34+
35+
RUN yum install -y ca_CERN-Root-2 && yum clean -y all
36+
3237
# Install CERN Open Data Portal web node pre-requisites
3338
# hadolint ignore=DL3033
3439
RUN yum install -y \
@@ -42,8 +47,9 @@ RUN yum install -y \
4247
yum groupinstall -y "Development Tools" && \
4348
yum clean -y all
4449

50+
# hadolint ignore=DL3033
4551
RUN echo "Will install xrootd version: $XROOTD_VERSION (latest if empty)" && \
46-
yum install -y xrootd-"$XROOTD_VERSION" python3-xrootd-"$XROOTD_VERSION" && \
52+
yum install -y xrootd-"$XROOTD_VERSION" python3-xrootd-"$XROOTD_VERSION" swig python3-gfal2-util gfal2-plugin-http python3-gfal2 && \
4753
yum clean -y all
4854

4955
RUN pip uninstall pipenv -y && pip install --no-cache-dir --upgrade pip==24.3.1 setuptools==70.0.0 wheel==0.45.1 && \
@@ -72,13 +78,16 @@ ENV PATH=$PATH:${INVENIO_INSTANCE_PATH}/python/bin
7278

7379
# Add CERN Open Data Portal sources to `code` and work there
7480
WORKDIR ${CODE_DIR}
81+
COPY . ${CODE_DIR}
82+
USER root
83+
RUN chown -R "${INVENIO_USER_ID}":root "${CODE_DIR}"
84+
USER ${INVENIO_USER_ID}
7585

7686
# Debug off by default
7787
ARG DEBUG=""
7888
ENV DEBUG=${DEBUG:-""}
7989

8090
# Install CERN Open Data Portal sources
81-
COPY . ${CODE_DIR}
8291
# hadolint ignore=DL3013,SC2086
8392
RUN git config --global url.https://github.com/.insteadOf git://github.com/ && if [ "$DEBUG" ]; then FLAGS="-e"; fi && \
8493
pip install --no-cache-dir --user ${FLAGS} ".[all]" && pip check

MANIFEST.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,4 @@ recursive-include sentry *.json
4545
recursive-include sentry *.py
4646
recursive-include sentry *.sh
4747
recursive-include sentry *.yml
48+
recursive-include docker *.repo

cernopendata/api.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __getitem__(self, key):
4545
"""Get a specific file."""
4646
obj = FileIndexMetadata.get(self.record, key)
4747
if obj:
48-
return self.file_cls(obj, self.filesmap.get(obj.key, {}))
48+
return self.file_cls(obj, self.file_indices.get(obj.key, {}))
4949
raise KeyError(key)
5050

5151
def flush(self):
@@ -98,6 +98,17 @@ def check_availability(self):
9898
else:
9999
self["availability"] = "sample files"
100100

101+
def flush_indices(self):
102+
"""Updates the _file_indices information based on what exists on the database."""
103+
print("Updating the record with file indices")
104+
self["_file_indices"] = []
105+
# First, let's get all the file indices that this record has
106+
for elem in BucketTag.query.filter_by(value=str(self.id), key="record").all():
107+
self["_file_indices"].append(
108+
FileIndexMetadata.get(None, str(elem.bucket)).dumps()
109+
)
110+
self.check_availability()
111+
101112

102113
class FileIndexMetadata:
103114
"""Class for the FileIndexMetadata."""
@@ -109,6 +120,7 @@ def __init__(self):
109120
self._size = 0
110121
self._files = []
111122
self._description = ""
123+
self._bucket = ""
112124

113125
def __repr__(self):
114126
"""Representation of the object."""
@@ -131,6 +143,7 @@ def create(cls, record, file_object, description=""):
131143
rb._description = description
132144
BucketTag.create(rb._bucket, "index_name", index_file_name)
133145
BucketTag.create(rb._bucket, "record", record.model.id)
146+
BucketTag.create(rb._bucket, "description", description)
134147
print(f"The file index contains {len(index_content)} entries.")
135148
for entry in index_content:
136149
entry_file = FileInstance.create()
@@ -162,9 +175,13 @@ def get(cls, record_id, bucket_id):
162175
.one()
163176
.value
164177
)
165-
bucket = Bucket.get(bucket_id)
166-
for o in ObjectVersion.get_by_bucket(bucket).all():
167-
f = FileObject(o, {})
178+
tag = BucketTag.query.filter_by(
179+
bucket_id=str(bucket_id), key="description"
180+
).first()
181+
obj._description = tag.value if tag else obj._index_file_name
182+
obj._bucket = Bucket.get(bucket_id)
183+
for o in ObjectVersion.get_by_bucket(obj._bucket).all():
184+
f = MultiURIFileObject(o, {})
168185
# Let's put also the uri
169186
f["uri"] = FileInstance.get(str(o.file_id)).uri
170187
f["filename"] = f["uri"].split("/")[-1]
@@ -196,6 +213,7 @@ def dumps(self):
196213
"size": self._size,
197214
"files": files,
198215
"description": self._description,
216+
"bucket": str(self._bucket),
199217
}
200218

201219
def flush(self):

cernopendata/cold_storage/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of CERN Open Data Portal.
4+
# Copyright (C) 2017-2025 CERN.
5+
#
6+
# CERN Open Data Portal is free software; you can redistribute it
7+
# and/or modify it under the terms of the GNU General Public License as
8+
# published by the Free Software Foundation; either version 2 of the
9+
# License, or (at your option) any later version.
10+
#
11+
# CERN Open Data Portal is distributed in the hope that it will be
12+
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
# General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU General Public License
17+
# along with CERN Open Data Portal; if not, write to the
18+
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19+
# MA 02111-1307, USA.
20+
#
21+
# In applying this license, CERN does not
22+
# waive the privileges and immunities granted to it by virtue of its status
23+
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24+
"""Cold Storage."""

cernopendata/cold_storage/api.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of CERN Open Data Portal.
4+
# Copyright (C) 2017-2025 CERN.
5+
#
6+
# CERN Open Data Portal is free software; you can redistribute it
7+
# and/or modify it under the terms of the GNU General Public License as
8+
# published by the Free Software Foundation; either version 2 of the
9+
# License, or (at your option) any later version.
10+
#
11+
# CERN Open Data Portal is distributed in the hope that it will be
12+
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
# General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU General Public License
17+
# along with CERN Open Data Portal; if not, write to the
18+
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19+
# MA 02111-1307, USA.
20+
#
21+
# In applying this license, CERN does not
22+
# waive the privileges and immunities granted to it by virtue of its status
23+
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24+
25+
"""Cold Storage API."""
26+
import json
27+
from collections import OrderedDict
28+
29+
from invenio_files_rest.models import (
30+
Bucket,
31+
BucketTag,
32+
FileInstance,
33+
ObjectVersion,
34+
ObjectVersionTag,
35+
)
36+
from invenio_records_files.api import FileObject, FilesIterator, Record
37+
from invenio_records_files.models import RecordsBuckets
38+
39+
40+
class FileObjectCold(FileObject):
41+
"""Overwrite the fileobject to get multiple URI."""
42+
43+
@classmethod
44+
def create(self, bucket, filename, file_id):
45+
"""Create a cold file object."""
46+
return ObjectVersion.create(bucket, filename, file_id)
47+
48+
def dumps(self):
49+
"""This one has the information about the cold URI stored in a ObjectVersionTag."""
50+
info = super(FileObjectCold, self).dumps()
51+
info["tags"] = {}
52+
for tagName in ("uri_cold", "hot_deleted"):
53+
tag = ObjectVersionTag.get(str(self.obj.version_id), tagName)
54+
if tag:
55+
info["tags"][tagName] = tag.value
56+
if "availability" in self.data:
57+
del self.data["availability"]
58+
info["availability"] = self.availability
59+
if "uri" not in info:
60+
file = FileInstance.get(str(self.obj.file_id))
61+
info["uri"] = file.uri
62+
return info
63+
64+
@property
65+
def availability(self):
66+
"""Describe the QoS of the file."""
67+
if "availability" not in self.data:
68+
avl = "ready"
69+
for t in self.obj.tags:
70+
if t.key == "hot_deleted":
71+
avl = "needs request"
72+
self.data["availability"] = avl
73+
return self.data["availability"]

cernopendata/cold_storage/catalog.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of CERN Open Data Portal.
4+
# Copyright (C) 2017-2025 CERN.
5+
#
6+
# CERN Open Data Portal is free software; you can redistribute it
7+
# and/or modify it under the terms of the GNU General Public License as
8+
# published by the Free Software Foundation; either version 2 of the
9+
# License, or (at your option) any later version.
10+
#
11+
# CERN Open Data Portal is distributed in the hope that it will be
12+
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
# General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU General Public License
17+
# along with CERN Open Data Portal; if not, write to the
18+
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19+
# MA 02111-1307, USA.
20+
#
21+
# In applying this license, CERN does not
22+
# waive the privileges and immunities granted to it by virtue of its status
23+
# as an Intergovernmental Organization or submit itself to any jurisdiction.
24+
25+
"""Cold Storage Catalog."""
26+
27+
from datetime import datetime
28+
29+
from invenio_db import db
30+
from invenio_files_rest.models import FileInstance, ObjectVersion, ObjectVersionTag
31+
from invenio_indexer.api import RecordIndexer
32+
from invenio_pidstore.models import PersistentIdentifier
33+
34+
from cernopendata.api import RecordFilesWithIndex
35+
36+
37+
class Catalog:
38+
"""Class to interact with the repository."""
39+
40+
def __init__(self, debug=False):
41+
"""Initialize the catalog."""
42+
self._indexer = RecordIndexer()
43+
self._debug = debug
44+
self._reindex_queue = []
45+
46+
def get_record(self, record_uuid):
47+
"""First, lets get the record."""
48+
try:
49+
record = RecordFilesWithIndex.get_record(record_uuid)
50+
except Exception as e:
51+
print(f"Couldn't find a record with the id '{record_uuid}': {e}")
52+
return
53+
return record
54+
55+
def get_files_from_record(self, record, limit=None):
56+
"""Getting the files from a record."""
57+
files = []
58+
if self._debug:
59+
print("DEBUG: The catalog got the record:", record.to_dict())
60+
if record:
61+
if "_files" in record:
62+
files += record["_files"]
63+
if "_file_indices" in record:
64+
for f in record["_file_indices"]:
65+
files += f["files"]
66+
if limit:
67+
start = 0
68+
end = len(files)
69+
if limit < 0:
70+
start = -limit
71+
else:
72+
end = limit
73+
files = files[start:end]
74+
if self._debug:
75+
print("DEBUG: And the list of files are:", files)
76+
return files
77+
78+
def clear_hot(self, record, file_id):
79+
"""Marking the hot copy as deleted."""
80+
81+
def _clear_hot_function(version_id):
82+
"""Create a tag for the file identifying that the copy is not available."""
83+
ObjectVersionTag.create(version_id, "hot_deleted", str(datetime.now()))
84+
85+
return self._update_file_and_reindex(record.id, file_id, _clear_hot_function)
86+
87+
def _update_file_and_reindex(self, record_uuid, file_id, update_function):
88+
"""Function to update the repository."""
89+
f = FileInstance.get(file_id)
90+
if not f:
91+
print(f"Can't find that file :( {file_id}")
92+
return False
93+
objectVersion = ObjectVersion.query.filter_by(file_id=f.id).one_or_none()
94+
if not objectVersion:
95+
print(f"Can't find the object associated to that file :( {file_id}")
96+
return False
97+
update_function(objectVersion.version_id)
98+
db.session.commit()
99+
if record_uuid not in self._reindex_queue:
100+
print(f"Record {record_uuid} will be reindexed")
101+
self._reindex_queue += [record_uuid]
102+
return True
103+
104+
def reindex_entries(self):
105+
"""Reindexes all the entries that have been modified."""
106+
while len(self._reindex_queue) > 0:
107+
record_uuid = self._reindex_queue.pop(0)
108+
print(f"Ready to reindex {record_uuid}")
109+
record = RecordFilesWithIndex.get_record(record_uuid)
110+
if not record:
111+
print(f"Couldn't find that record '{record_uuid}'")
112+
continue
113+
print("Got the object from the database")
114+
record.files.flush()
115+
record.flush_indices()
116+
record.commit()
117+
db.session.commit()
118+
try:
119+
self._indexer.index(record)
120+
except Exception as e:
121+
print(f"Error during the reindex {e}")
122+
try:
123+
record.commit()
124+
self._indexer.index(record)
125+
print("The second time worked!")
126+
except Exception as e:
127+
print(f"Doing it again did not help :( {e}")
128+
129+
def add_copy(self, record_uuid, file_id, new_qos, new_filename):
130+
"""Adds a copy to a particular file. It reindexes the record."""
131+
132+
def _add_copy_function(version_id):
133+
"""Function to add a file tag with a new uri for the file."""
134+
if new_qos == "cold":
135+
ObjectVersionTag.create_or_update(version_id, "uri_cold", new_filename)
136+
elif new_qos == "hot":
137+
ObjectVersionTag.delete(version_id, "hot_deleted")
138+
139+
return self._update_file_and_reindex(record_uuid, file_id, _add_copy_function)

0 commit comments

Comments
 (0)