Skip to content

Commit 59a85e1

Browse files
committed
refactoring
1 parent a362f95 commit 59a85e1

File tree

1 file changed

+65
-46
lines changed

1 file changed

+65
-46
lines changed

src/dirac_cwl_proto/job/__init__.py

Lines changed: 65 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import random
77
import tarfile
88
from pathlib import Path
9-
from typing import Any, Optional
9+
from typing import Any
1010

1111
import typer
1212
from cwl_utils.pack import pack
@@ -130,17 +130,67 @@ def submit_job_client(
130130
"[green]:heavy_check_mark:[/green] [bold]CLI:[/bold] Job(s) validated."
131131
)
132132

133+
jobs = validate_jobs(job)
134+
135+
for job in jobs:
136+
# Dump the job model to a file
137+
with open("job.json", "w") as f:
138+
f.write(job.model_dump_json())
139+
140+
# TODO add call to create_sandbox router adding files from parameter and the job.json file
141+
# For now just set hardcoded sandbox_id
142+
sandbox_id = "SB:SandboxSE|/S3/diracx-sandbox-store/isb.tar.bz2"
143+
144+
# Convert job.jspn to jdl
145+
console.print(
146+
"[blue]:information_source:[/blue] [bold]CLI:[/bold] Converting job model to jdl..."
147+
)
148+
convert_to_jdl(job, sandbox_id)
149+
133150
# Submit the job
134151
console.print(
135-
"[blue]:information_source:[/blue] [bold]CLI:[/bold] Submitting the job(s) to service..."
152+
"[blue]:information_source:[/blue] [bold]CLI:[/bold] Submitting the job(s)..."
136153
)
137154
print_json(job.model_dump_json(indent=4))
138-
if not submit_job_router(job, local):
155+
156+
if local:
157+
if not submit_job_router(job):
158+
console.print(
159+
"[red]:heavy_multiplication_x:[/red] [bold]CLI:[/bold] Failed to run job(s)."
160+
)
161+
return typer.Exit(code=1)
139162
console.print(
140-
"[red]:heavy_multiplication_x:[/red] [bold]CLI:[/bold] Failed to run job(s)."
163+
"[green]:heavy_check_mark:[/green] [bold]CLI:[/bold] Job(s) done."
164+
)
165+
else:
166+
# TODO call job/jdl router
167+
console.print(
168+
"[blue]:information_source:[/blue] [bold]CLI:[/bold] Call diracx: jobs/jdl router..."
141169
)
142-
return typer.Exit(code=1)
143-
console.print("[green]:heavy_check_mark:[/green] [bold]CLI:[/bold] Job(s) done.")
170+
171+
172+
def validate_jobs(job: JobSubmissionModel) -> list[JobSubmissionModel]:
173+
console.print(
174+
"[blue]:information_source:[/blue] [bold]CLI:[/bold] Validating the job(s)..."
175+
)
176+
# Initiate 1 job per parameter
177+
jobs = []
178+
if not job.parameters:
179+
jobs.append(job)
180+
else:
181+
for parameter in job.parameters:
182+
jobs.append(
183+
JobSubmissionModel(
184+
task=job.task,
185+
parameters=[parameter],
186+
scheduling=job.scheduling,
187+
execution_hooks=job.execution_hooks,
188+
)
189+
)
190+
console.print(
191+
"[green]:information_source:[/green] [bold]CLI:[/bold] Job(s) validated!"
192+
)
193+
return jobs
144194

145195

146196
def convert_to_jdl(job: JobSubmissionModel, sandbox_id: str) -> None:
@@ -230,7 +280,7 @@ def upload_local_input_files(input_data: dict[str, Any]) -> str | None:
230280
# -----------------------------------------------------------------------------
231281

232282

233-
def submit_job_router(job: JobSubmissionModel, local: Optional[bool] = True) -> bool:
283+
def submit_job_router(job: JobSubmissionModel) -> bool:
234284
"""
235285
Execute a job using the router.
236286
@@ -241,48 +291,17 @@ def submit_job_router(job: JobSubmissionModel, local: Optional[bool] = True) ->
241291
logger = logging.getLogger("JobRouter")
242292

243293
# Validate the jobs
244-
logger.info("Validating the job(s)...")
245-
# Initiate 1 job per parameter
246-
jobs = []
247-
if not job.parameters:
248-
jobs.append(job)
249-
else:
250-
for parameter in job.parameters:
251-
jobs.append(
252-
JobSubmissionModel(
253-
task=job.task,
254-
parameters=[parameter],
255-
scheduling=job.scheduling,
256-
execution_hooks=job.execution_hooks,
257-
)
258-
)
259-
logger.info("Job(s) validated!")
294+
jobs = validate_jobs(job)
260295

261-
# Simulate the submission of the job (just execute the job locally)
262-
logger.info("Submitting jobs...")
296+
# Execute the job locally
297+
logger.info("Executing jobs locally...")
263298
results = []
264299

265300
for job in jobs:
266-
if local:
267-
job_wrapper = JobWrapper()
268-
logger.info("Running job locally:\n")
269-
print_json(job.model_dump_json(indent=4))
270-
results.append(job_wrapper.run_job(job))
271-
logger.info("Jobs done.")
272-
else:
273-
# Dump the job model to a file
274-
with open("job.json", "w") as f:
275-
f.write(job.model_dump_json())
276-
277-
# TODO add call to create_sandbox router adding files from parameter and the job.json file
278-
# For now just set hardcoded sandbox_id
279-
sandbox_id = "SB:SandboxSE|/S3/diracx-sandbox-store/isb.tar.bz2"
280-
281-
# Convert job.jspn to jdl
282-
logger.info("Converting job model to jdl:\n")
283-
convert_to_jdl(job, sandbox_id)
284-
# TODO call job/jdl router
285-
logger.info("Submitting job to jobs/jdl router:\n")
286-
print_json(job.model_dump_json(indent=4))
301+
job_wrapper = JobWrapper()
302+
logger.info("Executing job locally:\n")
303+
print_json(job.model_dump_json(indent=4))
304+
results.append(job_wrapper.run_job(job))
305+
logger.info("Jobs done.")
287306

288307
return all(results)

0 commit comments

Comments
 (0)