Skip to content

Commit c2677c0

Browse files
committed
Issue #27 : Added ResourceRequirement validator (draft/idea)
1 parent 1ba6194 commit c2677c0

File tree

2 files changed

+134
-2
lines changed

2 files changed

+134
-2
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
from abc import ABC, abstractmethod
2+
from pydantic import BaseModel, ConfigDict
3+
from cwl_utils.parser.cwl_v1_2 import Workflow, CommandLineTool, WorkflowStep
4+
from typing import ClassVar
5+
6+
class RequirementValidator(BaseModel, ABC):
    """
    Base class for validating one kind of CWL requirement on a CWL object.

    Subclasses set ``requirement_class`` to the CWL ``class`` name of the
    requirement they handle and implement ``validate_requirement``.
    """

    model_config = ConfigDict(validate_assignment=True, arbitrary_types_allowed=True)

    # CWL ``class`` name of the requirement handled by the subclass.
    requirement_class: ClassVar[str]
    # TODO: might need to add ExpressionTool later? and NestedWorkflow?
    cwl_object: Workflow | CommandLineTool | WorkflowStep

    def get_requirement(self, cwl_object: Workflow | CommandLineTool | WorkflowStep):
        """
        Extract the requirement from the given cwl_object,
        based on the requirement class we want.

        :param cwl_object: The cwl_object to extract the requirement from.
        :return: The first requirement whose ``class_`` equals
            ``requirement_class``, or None if not found.
        """
        # ``requirements`` may be missing or None; both mean "nothing declared".
        requirements = getattr(cwl_object, "requirements", None) or []
        for requirement in requirements:
            # Entries without a ``class_`` attribute are skipped, not fatal.
            if getattr(requirement, "class_", None) == self.requirement_class:
                return requirement
        return None

    @abstractmethod
    def validate_requirement(self, requirement, context: str = "", global_requirement=None):
        """
        Validate a requirement, specific for each requirement class.

        :param requirement: The current requirement to validate.
        :param context: A context string describing the validation context. Ex: "Step requirement", "Global requirement", etc.
        :param global_requirement: The global Workflow requirement, if any.
        """

    # NOTE(review): this name shadows pydantic's deprecated ``BaseModel.validate``
    # classmethod -- confirm that this is acceptable for the project.
    def validate(self):
        """
        Validate requirements in Workflow, WorkflowStep and WorkflowStep.run.
        # TODO: might need to add ExpressionTool later and also, find a way to validate NestedWorkflow?

        The requirement found on ``cwl_object`` itself is validated first with
        the context "global requirements". If ``cwl_object`` exposes steps,
        each step's requirement and the requirement of each step's ``run``
        target are then validated against that global requirement.

        Any exception raised by ``validate_requirement`` propagates to the caller.
        """
        global_requirement = self.get_requirement(self.cwl_object)

        # Validate Workflow/CommandLineTool global requirements.
        if global_requirement is not None:
            self.validate_requirement(global_requirement, context="global requirements")

        # Validate WorkflowStep requirements, if any.
        # ``steps`` is read defensively: ``cwl_object`` may be an object without
        # that attribute (e.g. a WorkflowStep), which must not crash here.
        for step in getattr(self.cwl_object, "steps", None) or []:
            step_requirement = self.get_requirement(step)
            if step_requirement is not None:
                self.validate_requirement(
                    step_requirement,
                    context="step requirements",
                    global_requirement=global_requirement,
                )

            # Validate WorkflowStep.run, if any.
            step_run = getattr(step, "run", None)
            if step_run:
                step_run_requirement = self.get_requirement(step_run)
                if step_run_requirement is not None:
                    self.validate_requirement(
                        step_run_requirement,
                        context="step run requirements",
                        global_requirement=global_requirement,
                    )
63+
64+
65+
class ResourceRequirementValidator(RequirementValidator):
    """
    Validator for the CWL ``ResourceRequirement`` class.

    Checks that minimum resource values do not exceed the matching maximum,
    both within one requirement and against the global requirement.
    """

    requirement_class = "ResourceRequirement"

    @staticmethod
    def exceeds(value, limit) -> bool:
        """
        Tell whether ``value`` is strictly greater than ``limit``.

        Only real numbers are compared. None, booleans and any other type
        (for instance a string holding an expression) cannot be compared
        statically, so the answer is False for them.

        :param value: The candidate lower bound.
        :param limit: The candidate upper bound.
        :return: True only if both are numbers and ``value > limit``.
        """
        for candidate in (value, limit):
            # bool is a subclass of int but is never a meaningful resource amount.
            if isinstance(candidate, bool) or not isinstance(candidate, (int, float)):
                return False
        return value > limit

    @staticmethod
    def validate_minmax(min_value, max_value, resource, context):
        """
        Check if the resource min_value is higher than the resource max_value.
        If so, raise a ValueError.

        A value of 0 is a real bound and is compared like any other number;
        missing (None) or non-numeric values are skipped.

        :param min_value: The current resource min_value.
        :param max_value: The current resource max_value.
        :param resource: The resource name.
        :param context: A context string describing the validation context. Ex: "Step requirement", "Global requirement", etc.
        :raises ValueError: If min_value is higher than max_value.
        """
        if ResourceRequirementValidator.exceeds(min_value, max_value):
            raise ValueError(f"{resource}Min is higher than {resource}Max in {context}")

    @staticmethod
    def validate_conflict(min_value, global_max_value, resource, context):
        """
        Check if the resource min_value is higher than the global resource max_value.
        If so, raise a ValueError.

        A value of 0 is a real bound and is compared like any other number;
        missing (None) or non-numeric values are skipped.

        :param min_value: The current resource min_value.
        :param global_max_value: The global resource max_value.
        :param resource: The resource name.
        :param context: A context string describing the validation context. Ex: "Step requirement", "Global requirement", etc.
        :raises ValueError: If min_value is higher than global_max_value.
        """
        if ResourceRequirementValidator.exceeds(min_value, global_max_value):
            raise ValueError(f"{resource}Min is higher than global {resource}Max in {context}")

    def validate_requirement(self, requirement, context: str = "", global_requirement=None):
        """
        Validate a ResourceRequirement.
        Verify:
         - that resourceMin is not higher than resourceMax (CommandLineTool, Workflow, WorkflowStep, WorkflowStep.run)
         - that resourceMin (WorkflowStep, WorkflowStep.run) is not higher than global (Workflow) resourceMax.

        Only the ``ram`` and ``cores`` resources are checked.

        :param requirement: The current ResourceRequirement to validate.
        :param context: A context string describing the validation context. Ex: "Step requirement", "Global requirement", etc.
        :param global_requirement: The global Workflow requirement, if any.
        :raises ValueError: If one of the checks above fails.
        """
        self.validate_minmax(requirement.ramMin, requirement.ramMax, "ram", context)
        self.validate_minmax(requirement.coresMin, requirement.coresMax, "cores", context)

        if global_requirement is not None:
            # Only WorkflowStep and WorkflowStep.run cases
            self.validate_conflict(requirement.ramMin, global_requirement.ramMax, "ram", context)
            self.validate_conflict(requirement.coresMin, global_requirement.coresMax, "cores", context)

src/dirac_cwl_proto/job/__init__.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@
2727
from ruamel.yaml import YAML
2828
from schema_salad.exceptions import ValidationException
2929

30-
from dirac_cwl_proto.execution_hooks.core import ExecutionHooksBasePlugin
30+
from dirac_cwl_proto.execution_hooks import (
31+
ExecutionHooksBasePlugin,
32+
)
33+
from dirac_cwl_proto.execution_hooks.validator import (
34+
ResourceRequirementValidator,
35+
)
3136
from dirac_cwl_proto.submission_models import (
3237
JobInputModel,
3338
JobSubmissionModel,
@@ -220,12 +225,27 @@ def submit_job_router(job: JobSubmissionModel) -> bool:
220225

221226
# Validate the jobs
222227
logger.info("Validating the job(s)...")
228+
229+
try:
230+
#TODO: I don't know if it's the best way to do that ?
231+
# If we keep this class-idea, maybe later we could do a list of RequirementClass for each one we want to validate for the workflow
232+
# and loop on it to call validate() on each one? Without calling them manually like this:
233+
ResourceRequirementValidator(cwl_object=job.task).validate()
234+
except ValueError as ex:
235+
#TODO: I don't really know how to handle this to match with the current test?
236+
# because it seems that I need to raise a ValidationException for 'test_run_job_validation_failure' (maybe I'm wrong on that)
237+
# the way I did it is kinda bad here because it raises a ValueError and a ValidationException for the same exception...
238+
# we get about 3 tracebacks when trying to submit a bad job using the CLI
239+
logger.exception(f"RequirementValidationError: {ex}")
240+
raise ValidationException(f"RequirementValidationError: {ex}")
241+
223242
# Initiate 1 job per parameter
224243
jobs = []
225244
if not job.parameters:
226245
jobs.append(job)
227246
else:
228247
for parameter in job.parameters:
248+
print(parameter)
229249
jobs.append(
230250
JobSubmissionModel(
231251
task=job.task,
@@ -247,7 +267,6 @@ def submit_job_router(job: JobSubmissionModel) -> bool:
247267

248268
return all(results)
249269

250-
251270
# -----------------------------------------------------------------------------
252271
# JobWrapper
253272
# -----------------------------------------------------------------------------

0 commit comments

Comments
 (0)