forked from DIRACGrid/dirac-cwl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_manager.py
More file actions
100 lines (86 loc) · 3.87 KB
/
data_manager.py
File metadata and controls
100 lines (86 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from pathlib import Path
from DIRAC.DataManagementSystem.Client.DataManager import DataManager # type: ignore[import-untyped]
from DIRAC.Resources.Storage.FileStorage import FileStorage # type: ignore[import-untyped]
from DIRACCommon.Core.Utilities.ReturnValues import S_ERROR, S_OK, returnSingleResult # type: ignore[import-untyped]
from dirac_cwl_proto.data_management_mocks.file_catalog import LocalFileCatalog
class MockDataManager(DataManager):
"""
Mock DIRAC DataManager for local file storage
"""
def __init__(self):
self.base_storage_path = "filecatalog"
self.storage_element = FileStorage("local", {"Path": self.base_storage_path})
self.fileCatalog = LocalFileCatalog()
def getFile(self, lfn, destinationDir=".", sourceSE=None, diskOnly=False):
"""Get local copy of LFN(s) from Storage Elements.
:param mixed lfn: a single LFN or list of LFNs.
:param str destinationDir: directory to which the file(s) will be
downloaded. (Default: current working directory).
:param str sourceSE: source SE from which to download (Default: all replicas will be attempted).
:param bool diskOnly: chooses the disk ONLY replica(s). (Default: False)
:return: S_OK({"Successful": {}, "Failed": {}})/S_ERROR(errMessage).
"""
if isinstance(lfn, list):
lfns = lfn
elif isinstance(lfn, str):
lfns = [lfn]
else:
return S_ERROR(f"wrong type for lfn: {lfn}, expected str or list[str]")
if not sourceSE:
sourceSE = self.storage_element
success = {}
fail = {}
for lfn in lfns:
res = sourceSE.getFile(
str(
Path(self.base_storage_path)
/ str(lfn)
.removeprefix("lfn:")
.removeprefix("LFN:")
.removeprefix("/")
),
destinationDir,
)
if not res["OK"]:
fail[lfn] = res["Message"]
else:
success[lfn] = Path(lfn).name
return S_OK({"Successful": success, "Failed": fail})
def putAndRegister(
self,
lfn,
fileName,
diracSE,
guid=None,
path=None,
checksum=None,
overwrite=None,
):
"""Put a local file to a Storage Element and register in the File Catalogues
'lfn' is the file LFN
'file' is the full path to the local file
'diracSE' is the Storage Element to which to put the file
'guid' is the guid with which the file is to be registered (if not provided will be generated)
'path' is the path on the storage where the file will be put (if not provided the LFN will be used)
'overwrite' removes file from the file catalogue and SE before attempting upload
"""
self.fileCatalog.addFile(lfn)
return self.put(lfn, fileName, diracSE, path)
def put(self, lfn, fileName, diracSE, path=None):
"""Put a local file to a Storage Element
:param self: self reference
:param str lfn: LFN
:param str fileName: the full path to the local file
:param str diracSE: the Storage Element to which to put the file
:param str path: the path on the storage where the file will be put (if not provided the LFN will be used)
"""
se = self.storage_element
if not se:
return S_ERROR("No Storage Element defined")
if not path:
path = str(lfn).removeprefix("lfn:").removeprefix("LFN:").removeprefix("/")
dest = str(Path(self.base_storage_path) / Path(path))
res = returnSingleResult(se.putFile({dest: fileName}))
if not res["OK"]:
return S_OK({"Successful": {}, "Failed": {lfn: res["Message"]}})
return S_OK({"Successful": {lfn: res["Value"]}, "Failed": {}})