Skip to content

Commit beb71c1

Browse files
committed
add constructor
1 parent c96dfbe commit beb71c1

File tree

1 file changed

+18
-22
lines changed

1 file changed

+18
-22
lines changed

src/dirac_cwl_proto/job/job_wrapper.py

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
class JobWrapper:
3636
"""Job Wrapper for the execution hook."""
3737

38+
def __init__(self) -> None:
39+
self.runtime_metadata: ExecutionHooksBasePlugin | None = None
40+
self.job_path: Path = Path()
41+
3842
def __download_input_sandbox(
3943
self, arguments: JobInputModel, job_path: Path
4044
) -> None:
@@ -82,8 +86,6 @@ def _pre_process(
8286
self,
8387
executable: CommandLineTool | Workflow | ExpressionTool,
8488
arguments: JobInputModel | None,
85-
runtime_metadata: ExecutionHooksBasePlugin | None,
86-
job_path: Path,
8789
) -> list[str]:
8890
"""
8991
Pre-process the job before execution.
@@ -97,7 +99,7 @@ def _pre_process(
9799
command = ["cwltool", "--parallel"]
98100

99101
task_dict = save(executable)
100-
task_path = job_path / "task.cwl"
102+
task_path = self.job_path / "task.cwl"
101103
with open(task_path, "w") as task_file:
102104
YAML().dump(task_dict, task_file)
103105
command.append(str(task_path.name))
@@ -106,23 +108,23 @@ def _pre_process(
106108
if arguments.sandbox:
107109
# Download the files from the sandbox store
108110
logger.info("Downloading the files from the sandbox store...")
109-
self.__download_input_sandbox(arguments, job_path)
111+
self.__download_input_sandbox(arguments, self.job_path)
110112
logger.info("Files downloaded successfully!")
111113

112114
# Download input data from the file catalog
113115
logger.info("Downloading input data from the file catalog...")
114-
self.__download_input_data(arguments, job_path)
116+
self.__download_input_data(arguments, self.job_path)
115117
logger.info("Input data downloaded successfully!")
116118

117119
# Prepare the parameters for cwltool
118120
logger.info("Preparing the parameters for cwltool...")
119121
parameter_dict = save(cast(Saveable, arguments.cwl))
120-
parameter_path = job_path / "parameter.cwl"
122+
parameter_path = self.job_path / "parameter.cwl"
121123
with open(parameter_path, "w") as parameter_file:
122124
YAML().dump(parameter_dict, parameter_file)
123125
command.append(str(parameter_path.name))
124-
if runtime_metadata:
125-
return runtime_metadata.pre_process(job_path, command)
126+
if self.runtime_metadata:
127+
return self.runtime_metadata.pre_process(self.job_path, command)
126128

127129
return command
128130

@@ -131,8 +133,6 @@ def _post_process(
131133
status: int,
132134
stdout: str,
133135
stderr: str,
134-
job_path: Path,
135-
runtime_metadata: ExecutionHooksBasePlugin | None,
136136
):
137137
"""
138138
Post-process the job after execution.
@@ -146,8 +146,8 @@ def _post_process(
146146
logger.info(stdout)
147147
logger.info(stderr)
148148

149-
if runtime_metadata:
150-
return runtime_metadata.post_process(job_path)
149+
if self.runtime_metadata:
150+
return self.runtime_metadata.post_process(self.job_path)
151151

152152
return True
153153

@@ -161,29 +161,27 @@ def run_job(self, job: JobSubmissionModel) -> bool:
161161
logger = logging.getLogger("JobWrapper")
162162
# Instantiate runtime metadata from the serializable descriptor and
163163
# the job context so implementations can access task inputs/overrides.
164-
runtime_metadata = (
164+
self.runtime_metadata = (
165165
job.execution_hooks.to_runtime(job) if job.execution_hooks else None
166166
)
167167

168168
# Isolate the job in a specific directory
169-
job_path = Path(".") / "workernode" / f"{random.randint(1000, 9999)}"
170-
job_path.mkdir(parents=True, exist_ok=True)
169+
self.job_path = Path(".") / "workernode" / f"{random.randint(1000, 9999)}"
170+
self.job_path.mkdir(parents=True, exist_ok=True)
171171

172172
try:
173173
# Pre-process the job
174174
logger.info("Pre-processing Task...")
175175
command = self._pre_process(
176176
job.task,
177177
job.parameters[0] if job.parameters else None,
178-
runtime_metadata,
179-
job_path,
180178
)
181179
logger.info("Task pre-processed successfully!")
182180

183181
# Execute the task
184182
logger.info(f"Executing Task: {command}")
185183
result = subprocess.run(
186-
command, capture_output=True, text=True, cwd=job_path
184+
command, capture_output=True, text=True, cwd=self.job_path
187185
)
188186

189187
if result.returncode != 0:
@@ -199,8 +197,6 @@ def run_job(self, job: JobSubmissionModel) -> bool:
199197
result.returncode,
200198
result.stdout,
201199
result.stderr,
202-
job_path,
203-
runtime_metadata,
204200
):
205201
logger.info("Task post-processed successfully!")
206202
return True
@@ -212,5 +208,5 @@ def run_job(self, job: JobSubmissionModel) -> bool:
212208
return False
213209
finally:
214210
# Clean up
215-
if job_path.exists():
216-
shutil.rmtree(job_path)
211+
if self.job_path.exists():
212+
shutil.rmtree(self.job_path)

0 commit comments

Comments
 (0)