Skip to content

Commit 03ad0a2

Browse files
committed
ExecutionHooks is absorbed by TaskRuntimeBasePlugin and renamed to ExecutionHooksBasePlugin
1 parent dd3ff7e commit 03ad0a2

File tree

13 files changed

+146
-180
lines changed

13 files changed

+146
-180
lines changed

src/dirac_cwl_proto/job/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from ruamel.yaml import YAML
2828
from schema_salad.exceptions import ValidationException
2929

30-
from dirac_cwl_proto.metadata.core import TaskRuntimeBasePlugin
30+
from dirac_cwl_proto.metadata.core import ExecutionHooksBasePlugin
3131
from dirac_cwl_proto.submission_models import (
3232
JobParameterModel,
3333
JobSubmissionModel,
@@ -256,7 +256,7 @@ def submit_job_router(job: JobSubmissionModel) -> bool:
256256
def _pre_process(
257257
executable: CommandLineTool | Workflow | ExpressionTool,
258258
arguments: JobParameterModel | None,
259-
runtime_metadata: TaskRuntimeBasePlugin | None,
259+
runtime_metadata: ExecutionHooksBasePlugin | None,
260260
job_path: Path,
261261
) -> list[str]:
262262
"""
@@ -334,7 +334,7 @@ def _post_process(
334334
stdout: str,
335335
stderr: str,
336336
job_path: Path,
337-
runtime_metadata: TaskRuntimeBasePlugin | None,
337+
runtime_metadata: ExecutionHooksBasePlugin | None,
338338
):
339339
"""
340340
Post-process the job after execution.

src/dirac_cwl_proto/metadata/__init__.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@
1010
from .core import (
1111
DataCatalogInterface,
1212
DataManager,
13-
ExecutionHooks,
13+
ExecutionHooksBasePlugin,
1414
SchedulingHint,
15-
TaskRuntimeBasePlugin,
1615
TransformationDataManager,
1716
)
1817
from .registry import (
19-
MetadataPluginRegistry,
2018
discover_plugins,
2119
get_registry,
2220
)
@@ -32,15 +30,12 @@
3230
pass
3331

3432
__all__ = [
35-
# Core classes
36-
"TaskRuntimeBasePlugin",
37-
"DataCatalogInterface",
33+
# Core metadata and plugins
3834
"DataManager",
39-
"ExecutionHooks",
40-
"SchedulingHint",
4135
"TransformationDataManager",
42-
# Registry functions
43-
"MetadataPluginRegistry",
44-
"discover_plugins",
36+
"ExecutionHooksBasePlugin",
37+
"SchedulingHint",
38+
"DataCatalogInterface",
39+
"MetadataRegistry",
4540
"get_registry",
4641
]

src/dirac_cwl_proto/metadata/core.py

Lines changed: 30 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,6 @@
1919
T = TypeVar("T", bound="SchedulingHint")
2020

2121

22-
class ExecutionHooks(ABC):
23-
"""Abstract base class for execution hooks.
24-
25-
This class defines the interface for pre/post processing operations
26-
that can be performed during job execution.
27-
"""
28-
29-
@abstractmethod
30-
def pre_process(self, job_path: Path, command: List[str]) -> List[str]:
31-
"""Pre-process job inputs and command.
32-
33-
Parameters
34-
----------
35-
job_path : Path
36-
Path to the job working directory.
37-
command : List[str]
38-
The command to be executed.
39-
40-
Returns
41-
-------
42-
List[str]
43-
Modified command list.
44-
"""
45-
pass
46-
47-
@abstractmethod
48-
def post_process(self, job_path: Path) -> bool:
49-
"""Post-process job outputs.
50-
51-
Parameters
52-
----------
53-
job_path : Path
54-
Path to the job working directory.
55-
56-
Returns
57-
-------
58-
bool
59-
True if post-processing succeeded, False otherwise.
60-
"""
61-
pass
62-
63-
6422
class DataCatalogInterface(ABC):
6523
"""Abstract interface for data catalog operations."""
6624

@@ -120,8 +78,8 @@ def store_output(self, output_name: str, src_path: str) -> None:
12078
logger.info(f"Output {output_name} stored in {dest}")
12179

12280

123-
class TaskRuntimeBasePlugin(BaseModel, DataCatalogInterface, ExecutionHooks):
124-
"""Base class for all runtime plugin models.
81+
class ExecutionHooksBasePlugin(BaseModel, DataCatalogInterface):
82+
"""Base class for all runtime plugin models with execution hooks.
12583
12684
This class combines Pydantic validation with the execution hooks
12785
and data catalog interfaces to provide a complete foundation for runtime plugin implementations.
@@ -151,12 +109,33 @@ def get_metadata_class(cls) -> str:
151109
name = name[:-8] # Remove "Metadata" suffix
152110
return name
153111

154-
def pre_process(self, job_path: Path, command: List[str]) -> List[str]:
155-
"""Default pre-processing: return command unchanged."""
112+
def pre_process(
113+
self, job_path: Path, command: List[str], **kwargs: Any
114+
) -> List[str]:
115+
"""Pre-process job inputs and command.
116+
117+
Parameters
118+
----------
119+
job_path : Path
120+
Path to the job working directory.
121+
command : List[str]
122+
The command to be executed.
123+
124+
Returns
125+
-------
126+
List[str]
127+
Modified command list.
128+
"""
156129
return command
157130

158-
def post_process(self, job_path: Path) -> bool:
159-
"""Default post-processing: always succeed."""
131+
def post_process(self, job_path: Path, **kwargs: Any) -> bool:
132+
"""Post-process job outputs.
133+
134+
Parameters
135+
----------
136+
job_path : Path
137+
Path to the job working directory.
138+
"""
160139
return True
161140

162141
def get_input_query(
@@ -284,11 +263,11 @@ def model_copy(
284263

285264
return super().model_copy(update=merged_update, deep=deep)
286265

287-
def to_runtime(self, submitted: Optional[Any] = None) -> "TaskRuntimeBasePlugin":
266+
def to_runtime(self, submitted: Optional[Any] = None) -> "ExecutionHooksBasePlugin":
288267
"""
289268
Build and instantiate the runtime metadata implementation.
290269
291-
The returned object is an instance of :class:`TaskRuntimeBasePlugin` created
270+
The returned object is an instance of :class:`ExecutionHooksBasePlugin` created
292271
by the metadata registry. The instantiation parameters are constructed
293272
by merging, in order:
294273
@@ -308,7 +287,7 @@ def to_runtime(self, submitted: Optional[Any] = None) -> "TaskRuntimeBasePlugin"
308287
309288
Returns
310289
-------
311-
TaskRuntimeBasePlugin
290+
ExecutionHooksBasePlugin
312291
Runtime plugin implementation instantiated from the registry.
313292
"""
314293
# Import here to avoid circular imports

src/dirac_cwl_proto/metadata/plugins/core.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99
from pathlib import Path
1010
from typing import Any, ClassVar, List, Optional, Union
1111

12-
from ..core import TaskRuntimeBasePlugin
12+
from pydantic import Field
1313

14+
from ..core import ExecutionHooksBasePlugin
1415

15-
class UserMetadata(TaskRuntimeBasePlugin):
16+
17+
class UserMetadata(ExecutionHooksBasePlugin):
1618
"""Default user metadata model with no special processing.
1719
1820
This is the simplest metadata model that performs no special input/output
@@ -22,7 +24,7 @@ class UserMetadata(TaskRuntimeBasePlugin):
2224
description: ClassVar[str] = "Basic user metadata with no special processing"
2325

2426

25-
class AdminMetadata(TaskRuntimeBasePlugin):
27+
class AdminMetadata(ExecutionHooksBasePlugin):
2628
"""Administrative metadata model with enhanced logging.
2729
2830
This metadata model provides additional logging and monitoring
@@ -44,34 +46,36 @@ class AdminMetadata(TaskRuntimeBasePlugin):
4446
enable_monitoring: bool = True
4547
admin_level: int = 1
4648

47-
def pre_process(self, job_path: Path, command: List[str]) -> List[str]:
49+
def pre_process(
50+
self, job_path: Path, command: List[str], **kwargs: Any
51+
) -> List[str]:
4852
"""Add logging configuration to command."""
4953
if self.log_level != "INFO":
5054
command.extend(["--log-level", self.log_level])
5155
return command
5256

53-
def post_process(self, job_path: Path) -> bool:
57+
def post_process(self, job_path: Path, **kwargs: Any) -> bool:
5458
"""Enhanced post-processing with monitoring."""
5559
if self.enable_monitoring:
5660
# Could send metrics to monitoring system
5761
pass
5862
return True
5963

6064

61-
class QueryBasedMetadata(TaskRuntimeBasePlugin):
65+
class QueryBasedMetadata(ExecutionHooksBasePlugin):
6266
"""Metadata model that supports query-based input resolution.
6367
6468
This model demonstrates how to implement query-based data discovery
6569
using metadata parameters.
6670
"""
6771

68-
description: ClassVar[str] = "Metadata with query-based input resolution"
72+
description: ClassVar[str] = "Query-based metadata for data discovery"
6973

7074
# Query parameters
71-
query_root: Optional[str] = None
72-
site: Optional[str] = None
73-
campaign: Optional[str] = None
74-
data_type: Optional[str] = None
75+
query_root: str = Field(default="/", description="Root path for queries")
76+
site: Optional[str] = Field(default=None, description="Site to query")
77+
campaign: Optional[str] = Field(default=None, description="Campaign name")
78+
data_type: Optional[str] = Field(default=None, description="Data type")
7579

7680
def get_input_query(
7781
self, input_name: str, **kwargs: Any
@@ -118,7 +122,7 @@ def get_output_query(self, output_name: str) -> Optional[Path]:
118122
return base_path / "default"
119123

120124

121-
class TaskWithMetadataQueryPlugin(TaskRuntimeBasePlugin):
125+
class TaskWithMetadataQueryPlugin(ExecutionHooksBasePlugin):
122126
"""Metadata plugin that demonstrates query-based input resolution.
123127
124128
This class provides methods to query metadata and generate input paths

src/dirac_cwl_proto/metadata/plugins/gaussian.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@
1010
from pathlib import Path
1111
from typing import Any, ClassVar, List, Optional, Union
1212

13-
from ..core import TaskRuntimeBasePlugin
13+
from ..core import ExecutionHooksBasePlugin
1414

1515

16-
class DataGenerationMetadata(TaskRuntimeBasePlugin):
16+
class DataGenerationMetadata(ExecutionHooksBasePlugin):
1717
"""Data generation metadata model for Gaussian fitting.
1818
19-
This model handles generation of test data for Gaussian fitting algorithms.
19+
This model handles generation of test data for Gaussian fitting algorithms.:w
20+
2021
It supports generating multiple output files with configurable names.
2122
2223
Parameters
@@ -43,7 +44,7 @@ def get_output_query(self, output_name: str) -> Optional[Path]:
4344

4445
return base_path / "data-generation"
4546

46-
def post_process(self, job_path: Path) -> bool:
47+
def post_process(self, job_path: Path, **kwargs: Any) -> bool:
4748
"""Post process the generated data files."""
4849
success = False
4950

@@ -71,7 +72,7 @@ def post_process(self, job_path: Path) -> bool:
7172
return success
7273

7374

74-
class GaussianFitMetadata(TaskRuntimeBasePlugin):
75+
class GaussianFitMetadata(ExecutionHooksBasePlugin):
7576
"""Gaussian fitting metadata model.
7677
7778
This model handles Gaussian fitting analysis on generated data sets.
@@ -85,7 +86,7 @@ class GaussianFitMetadata(TaskRuntimeBasePlugin):
8586
Second set of input data files.
8687
"""
8788

88-
description: ClassVar[str] = "Gaussian fitting analysis on data sets"
89+
description: ClassVar[str] = "Gaussian fitting analysis for generated data"
8990

9091
# Input data
9192
data1: Optional[List] = None
@@ -110,7 +111,7 @@ def get_output_query(self, output_name: str) -> Optional[Path]:
110111
return Path("filecatalog") / "gaussian_fit" / "fit"
111112
return None
112113

113-
def post_process(self, job_path: Path) -> bool:
114+
def post_process(self, job_path: Path, **kwargs: Any) -> bool:
114115
"""Post process the fitting results."""
115116
outputs = glob.glob(str(job_path / "fit*"))
116117
if outputs:

src/dirac_cwl_proto/metadata/plugins/lhcb.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
from pydantic import Field
1818
from ruamel.yaml import YAML
1919

20-
from ..core import TaskRuntimeBasePlugin
20+
from ..core import ExecutionHooksBasePlugin
2121

2222

23-
class LHCbMetadata(TaskRuntimeBasePlugin):
23+
class LHCbMetadata(ExecutionHooksBasePlugin):
2424
"""Base metadata model for LHCb experiment.
2525
2626
This class provides common functionality for all LHCb metadata models,
@@ -84,7 +84,9 @@ def get_output_query(self, output_name: str) -> Optional[Path]:
8484

8585
return base / "outputs"
8686

87-
def pre_process(self, job_path: Path, command: List[str]) -> List[str]:
87+
def pre_process(
88+
self, job_path: Path, command: List[str], **kwargs: Any
89+
) -> List[str]:
8890
"""Pre-process LHCb simulation job.
8991
9092
This method calculates the optimal number of events to simulate
@@ -170,7 +172,7 @@ def _update_job_parameters(self, job_path: Path, command: List[str]) -> None:
170172
with open(parameters_path, "w") as f:
171173
YAML().dump(parameter_dict, f)
172174

173-
def post_process(self, job_path: Path) -> bool:
175+
def post_process(self, job_path: Path, **kwargs: Any) -> bool:
174176
"""Post-process LHCb simulation outputs."""
175177
success = True
176178

@@ -248,7 +250,9 @@ def get_output_query(self, output_name: str) -> Optional[Path]:
248250

249251
return base / "outputs"
250252

251-
def pre_process(self, job_dir: Path, command: List[str]) -> List[str]:
253+
def pre_process(
254+
self, job_path: Path, command: List[str], **kwargs: Any
255+
) -> List[str]:
252256
"""Pre-process the job command for reconstruction."""
253257
# Only add LHCb-specific arguments if this is not a CWL workflow execution
254258
# (i.e., when running LHCb applications directly, not cwltool)
@@ -263,7 +267,7 @@ def pre_process(self, job_dir: Path, command: List[str]) -> List[str]:
263267

264268
return command
265269

266-
def post_process(self, job_path: Path) -> bool:
270+
def post_process(self, job_path: Path, **kwargs: Any) -> bool:
267271
"""Post-process LHCb reconstruction outputs."""
268272
success = True
269273

@@ -355,7 +359,9 @@ def get_output_query(self, output_name: str) -> Optional[Path]:
355359

356360
return base / "results"
357361

358-
def pre_process(self, job_path: Path, command: List[str]) -> List[str]:
362+
def pre_process(
363+
self, job_path: Path, command: List[str], **kwargs: Any
364+
) -> List[str]:
359365
"""Pre-process LHCb analysis job."""
360366
# Add analysis-specific parameters
361367
command.extend(["--analysis", self.analysis_name])
@@ -369,7 +375,7 @@ def pre_process(self, job_path: Path, command: List[str]) -> List[str]:
369375

370376
return command
371377

372-
def post_process(self, job_path: Path) -> bool:
378+
def post_process(self, job_path: Path, **kwargs: Any) -> bool:
373379
"""Post-process LHCb analysis outputs."""
374380
success = True
375381

0 commit comments

Comments
 (0)