Skip to content
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
fc60245
feat(Job Endpoint): add job submission with input data using LFN prefix
Loxeris Sep 24, 2025
d1fd45f
test(Job Endpoint): add test for job submission with input data
Loxeris Sep 24, 2025
240960a
fix: change prefix
Loxeris Sep 29, 2025
5c8ba41
feat: add lfns_input to job input model
Loxeris Sep 30, 2025
edc2182
feat: download lfns in the execution hook
Loxeris Oct 17, 2025
aa06d3c
feat: use location instead of path for lfns
Loxeris Oct 17, 2025
d74c47b
docs: update docstring
Loxeris Oct 20, 2025
05bd754
feat(Job Endpoint): add job output sandboxes
Loxeris Sep 24, 2025
effcc9a
feat(Job Endpoint): add job output data
Loxeris Sep 24, 2025
82de6e7
feat(Job Endpoint): move job output management to Execution Hooks
Loxeris Sep 24, 2025
112980c
fix: fix lint issues
Loxeris Sep 24, 2025
6edc889
style: code cleaning
Loxeris Sep 30, 2025
2f9df75
feat: lfns_output_overrides field
Loxeris Oct 7, 2025
499a27d
style: code cleanup
Loxeris Oct 7, 2025
957af98
test: test outputs
Loxeris Oct 8, 2025
e8acb1a
feat: use DIRAC FileStorage
Loxeris Oct 29, 2025
6020378
chore: ignore DIRAC FileStorage being untyped
Loxeris Oct 29, 2025
f1c1143
feat: add a SandboxStore like client
Loxeris Oct 30, 2025
e599f9b
feat: use DataManager and SandboxStore in job pre-process
Loxeris Oct 30, 2025
07fe25d
feat: add output_paths and output_sandbox hints
Loxeris Nov 3, 2025
da8fa3b
chore: regenerate schemas
Loxeris Nov 3, 2025
924420a
fix: apply pr suggestions and add some docstring
Loxeris Nov 3, 2025
38e20ae
test: test if output hints are used correctly
Loxeris Nov 3, 2025
da46de9
feat: move output_query and input_query to QueryBasedPlugin and clean…
Loxeris Nov 18, 2025
3965240
fix: fix parameter file being set twice
Loxeris Nov 19, 2025
e6c84e8
test: job success for output tests
Loxeris Nov 19, 2025
556e3c5
feat: "mock" DIRAC DataManager
Loxeris Nov 25, 2025
44ae43a
feat: mocks following DIRAC's definition
Loxeris Nov 28, 2025
712f880
fix: lfns start with a `/` and handle uppercase prefix
Loxeris Dec 3, 2025
a3517ce
fix: update output workflows in tests
Loxeris Dec 5, 2025
6d2ad79
feat: add output_se hint
Loxeris Dec 5, 2025
d14ddd8
fix: fix output_se in hook instanciation
Loxeris Dec 5, 2025
49f4ce0
fix: fix sandbox mock
Loxeris Dec 8, 2025
06dcb11
refactor: refactor runtime_metadata to execution_hooks_plugin
Loxeris Dec 9, 2025
3ef7091
fix: fix lfn path for files of the same cwl output
Loxeris Dec 9, 2025
d6df8f3
docs: cleanup docstring
Loxeris Dec 10, 2025
d4f6eb2
fix: various fixes and add logs info for outputs
Loxeris Dec 10, 2025
fef32e8
refactor: move generic methods to JobWrapper
Loxeris Dec 12, 2025
bbc1200
refactor: change environment variable definition
Loxeris Dec 12, 2025
eef88c9
fix: fix sandboxstoreClient used for sandbox downloads
Loxeris Dec 12, 2025
8b9e320
fix: remove duplicate code
Loxeris Dec 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions src/dirac_cwl_proto/data_management_mocks/data_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from pathlib import Path

from DIRAC.DataManagementSystem.Client.DataManager import DataManager # type: ignore[import-untyped]
from DIRAC.Resources.Storage.FileStorage import FileStorage # type: ignore[import-untyped]
from DIRACCommon.Core.Utilities.ReturnValues import S_ERROR, S_OK, returnSingleResult # type: ignore[import-untyped]

from dirac_cwl_proto.data_management_mocks.file_catalog import LocalFileCatalog


class MockDataManager(DataManager):
"""
Mock DIRAC DataManager for local file storage
"""

def __init__(self):
self.base_storage_path = "filecatalog"
self.storage_element = FileStorage("local", {"Path": self.base_storage_path})
self.fileCatalog = LocalFileCatalog()

def getFile(self, lfn, destinationDir=".", sourceSE=None, diskOnly=False):
"""Get local copy of LFN(s) from Storage Elements.

:param mixed lfn: a single LFN or list of LFNs.
:param str destinationDir: directory to which the file(s) will be
downloaded. (Default: current working directory).
:param str sourceSE: source SE from which to download (Default: all replicas will be attempted).
:param bool diskOnly: chooses the disk ONLY replica(s). (Default: False)
:return: S_OK({"Successful": {}, "Failed": {}})/S_ERROR(errMessage).
"""
if isinstance(lfn, list):
lfns = lfn
elif isinstance(lfn, str):
lfns = [lfn]
else:
return S_ERROR(f"wrong type for lfn: {lfn}, expected str or list[str]")

if not sourceSE:
sourceSE = self.storage_element

success = {}
fail = {}
for lfn in lfns:
res = sourceSE.getFile(
str(
Path(self.base_storage_path)
/ str(lfn)
.removeprefix("lfn:")
.removeprefix("LFN:")
.removeprefix("/")
),
destinationDir,
)
if not res["OK"]:
fail[lfn] = res["Message"]
else:
success[lfn] = Path(lfn).name
return S_OK({"Successful": success, "Failed": fail})

def putAndRegister(
self,
lfn,
fileName,
diracSE,
guid=None,
path=None,
checksum=None,
overwrite=None,
):
"""Put a local file to a Storage Element and register in the File Catalogues

'lfn' is the file LFN
'file' is the full path to the local file
'diracSE' is the Storage Element to which to put the file
'guid' is the guid with which the file is to be registered (if not provided will be generated)
'path' is the path on the storage where the file will be put (if not provided the LFN will be used)
'overwrite' removes file from the file catalogue and SE before attempting upload
"""
self.fileCatalog.addFile(lfn)
return self.put(lfn, fileName, diracSE, path)

def put(self, lfn, fileName, diracSE, path=None):
"""Put a local file to a Storage Element

:param self: self reference
:param str lfn: LFN
:param str fileName: the full path to the local file
:param str diracSE: the Storage Element to which to put the file
:param str path: the path on the storage where the file will be put (if not provided the LFN will be used)

"""
se = self.storage_element
if not se:
return S_ERROR("No Storage Element defined")
if not path:
path = str(lfn).removeprefix("lfn:").removeprefix("LFN:").removeprefix("/")
dest = str(Path(self.base_storage_path) / Path(path))
res = returnSingleResult(se.putFile({dest: fileName}))
if not res["OK"]:
return S_OK({"Successful": {}, "Failed": {lfn: res["Message"]}})
return S_OK({"Successful": {lfn: res["Value"]}, "Failed": {}})
79 changes: 79 additions & 0 deletions src/dirac_cwl_proto/data_management_mocks/file_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import json
import time
from pathlib import Path

from DIRAC import S_ERROR, S_OK # type: ignore[import-untyped]
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog # type: ignore[import-untyped]


class InMemoryFileCatalog(FileCatalog):
"""Minimal in-memory FileCatalog compatible with DIRAC DataManager."""

def __init__(self, catalogs=None, vo=None):
self._eligibleCatalogs = {}
self._files = {} # store metadata and logical file names
super(FileCatalog, self).__init__()

def _getEligibleCatalogs(self):
self._eligibleCatalogs = {
"MyMockCatalog": {"Type": "MockFileCatalog", "Backend": "Memory"}
}
return S_OK(self._eligibleCatalogs)

def findFile(self, lfn):
if lfn in self._files:
return S_OK([self._files[lfn]])
return S_ERROR(f"File {lfn} not found")

def addFile(self, lfn, metadata=None):
if lfn in self._files:
return S_ERROR(f"File {lfn} already exists")
self._files[lfn] = {"LFN": lfn, "Metadata": metadata or {}}
return S_OK(lfn)


class LocalFileCatalog(FileCatalog):
def __init__(self, catalogs=None, vo=None):
self._eligibleCatalogs = {
"MyMockCatalog": {"Type": "MockFileCatalog", "Backend": "LocalFileSystem"}
}
self._metadataPath = "filecatalog/metadata.json"
super(FileCatalog, self).__init__()

def _getEligibleCatalogs(self):
return S_OK(self._eligibleCatalogs)

def getFileMetadata(self, lfn):
metaAll = self._getAllMetadata()
if lfn not in metaAll:
return S_OK({"Successful": {}, "Failed": {lfn: f"File {lfn} not found"}})
return S_OK({"Successful": {lfn: metaAll[lfn]}, "Failed": {}})

def addFile(self, lfn):
if lfn in self._getAllMetadata():
return S_ERROR(f"File {lfn} already exists")
self.setMetadata(lfn, {"CreationDate": time.time()})
return S_OK({"Successful": {lfn: True}, "Failed": {}})

def setMetadata(self, lfn, metadataDict):
meta = self._getAllMetadata()
meta[lfn] = metadataDict

try:
self._setAllMetadata(meta)
except Exception as e:
return S_ERROR(f"Could set metadata: {e}")
return S_OK({"Successful": {lfn: True}, "Failed": {}})

def _getAllMetadata(self):
try:
with open(self._metadataPath, "r") as file:
meta = json.load(file)
except Exception:
meta = {}
return meta

def _setAllMetadata(self, metadata):
Path(self._metadataPath).parent.mkdir(parents=True, exist_ok=True)
with open(self._metadataPath, "w+") as file:
json.dump(metadata, file)
91 changes: 91 additions & 0 deletions src/dirac_cwl_proto/data_management_mocks/sandbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import logging
import random
import tarfile
from pathlib import Path
from typing import Optional, Sequence

from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient # type: ignore[import-untyped]
from DIRACCommon.Core.Utilities.ReturnValues import S_OK # type: ignore[import-untyped]

logger = logging.getLogger(__name__)


class MockSandboxStoreClient(SandboxStoreClient):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried to "mock" the diracx sandbox store client rather than the DIRAC one? I think it would be preferable since it exists and the new JobWrapper is meant to be integrated to diracx eventually.

https://github.com/DIRACGrid/diracx/blob/47c2b296d0b7b98504c8f142cbe5df67a321df83/diracx-api/src/diracx/api/jobs.py#L51-L125

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't try, it does seem preferable indeed.

"""
Local mock for Dirac's SandboxStore Client.
"""

def __init__(self):
pass

def uploadFilesAsSandbox(
self,
fileList: Sequence[Path | str],
sizeLimit: int = 0,
assignTo: Optional[dict] = None,
):
"""Create and upload a sandbox archive from a list of files.

Packages the provided files into a compressed tar archive and stores
it under the local sandbox directory.

:param Sequence[Path | str] fileList: Files to be included in the sandbox.
:param int sizeLimit: Maximum allowed archive size in bytes. Currently unused.
:param Optional[dict] assignTo: Mapping of job identifiers to sandbox types (e.g. { 'Job:<id>': '<type>' }).

:return S_OK(sandbox_path): Path to the created sandbox archive, or `None` if no files were provided.
"""
if len(fileList) == 0:
return S_OK()
sandbox_id = random.randint(1000, 9999)
sandbox_path = Path("sandboxstore") / f"sandbox_{str(sandbox_id)}.tar.gz"
sandbox_path.parent.mkdir(exist_ok=True, parents=True)
with tarfile.open(sandbox_path, "w:gz") as tar:
for file in fileList:
if not file:
break
if isinstance(file, str):
file = Path(file)
tar.add(file, arcname=file.name)
res = S_OK(str(sandbox_path))
res["SandboxFileName"] = f"sandbox_{str(sandbox_id)}.tar.gz"
return res

def downloadSandbox(
self,
sbLocation: str | Path,
destinationDir: str = "",
inMemory: bool = False,
unpack: bool = True,
) -> list[Path]:
"""Download and extract files from a sandbox archive.

Opens the given sandbox archive and extracts its contents to the specified
directory.

:param str|Path sbLocation: Path to the sandbox archive file.
:param str destinationDir: Directory to extract the files into. Defaults to the current directory.
:param bool inMemory: Placeholder for in-memory extraction.
:param bool unpack: Whether to unpack the archive. Only unpacking is currently supported.

:return S_OK({list[Path]}): List of paths to the extracted files.
"""
if not unpack or inMemory:
raise NotImplementedError
else:
sandbox_path = Path("sandboxstore") / f"{sbLocation}.tar.gz"
with tarfile.open(sandbox_path, "r:gz") as tar:
tar.extractall(destinationDir, filter="data")
files = tar.getnames()
logger.info("Files downloaded successfully!")
return S_OK([str(Path(destinationDir) / file) for file in files])

def downloadSandboxForJob(
self, jobId, sbType, destinationPath="", inMemory=False, unpack=True
) -> None:
"""
Download sandbox contents for a specific job.

Placeholder for future implementation of job-based sandbox retrieval.
"""
raise NotImplementedError
2 changes: 0 additions & 2 deletions src/dirac_cwl_proto/execution_hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
"""

from .core import (
DataCatalogInterface,
ExecutionHooksBasePlugin,
ExecutionHooksHint,
SchedulingHint,
Expand Down Expand Up @@ -36,7 +35,6 @@
"TransformationExecutionHooksHint",
"ExecutionHooksBasePlugin",
"SchedulingHint",
"DataCatalogInterface",
"ExecutionHooksPluginRegistry",
"get_registry",
]
Loading
Loading