Skip to content

Commit 68cf5fd

Browse files
authored
Merge pull request #48 from AcquaDiGiorgio/issue-43-parallel-worflows
Parallel workflow execution
2 parents a83df69 + 1f4bc59 commit 68cf5fd

File tree

3 files changed

+107
-1
lines changed

3 files changed

+107
-1
lines changed

src/dirac_cwl_proto/job/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ def _pre_process(
268268

269269
# Prepare the task for cwltool
270270
logger.info("Preparing the task for cwltool...")
271-
command = ["cwltool"]
271+
command = ["cwltool", "--parallel"]
272272

273273
task_dict = save(executable)
274274
task_path = job_path / "task.cwl"

test/test_workflows.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import re
22
import shutil
3+
import subprocess
34
import threading
45
import time
56
from pathlib import Path
@@ -242,6 +243,50 @@ def test_run_job_validation_failure(
242243
)
243244

244245

246+
def test_run_job_parallely():
247+
error_margin_percentage = 0.10
248+
249+
# This command forces the process 'dirac-cwl' to execute ONLY in
250+
# one core of the machine, independently of how many there are
251+
# phisically available.
252+
# This simulates a sequential execution of the worklflow.
253+
command = [
254+
"taskset",
255+
"-c",
256+
"0",
257+
"dirac-cwl",
258+
"job",
259+
"submit",
260+
"test/workflows/parallel/description.cwl",
261+
]
262+
263+
start = time.time()
264+
subprocess.run(command)
265+
end = time.time()
266+
sequential_time = end - start
267+
268+
command = [
269+
"dirac-cwl",
270+
"job",
271+
"submit",
272+
"test/workflows/parallel/description.cwl",
273+
]
274+
275+
start = time.time()
276+
subprocess.run(command)
277+
end = time.time()
278+
parallel_time = end - start
279+
280+
min_time = (1 - error_margin_percentage) * sequential_time / 2
281+
max_time = (1 + error_margin_percentage) * sequential_time / 2
282+
# Parallel time should be aproximately half the time.
283+
assert (parallel_time > min_time) and (parallel_time < max_time), (
284+
"Difference between parallel and sequential time is too large",
285+
f"Sequential: {sequential_time} # Parallel: {parallel_time}",
286+
f"Sequential time should be twice the parallel time with an error of {int(error_margin_percentage*100)}%",
287+
)
288+
289+
245290
# -----------------------------------------------------------------------------
246291
# Transformation tests
247292
# -----------------------------------------------------------------------------
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
cwlVersion: v1.2
2+
class: Workflow
3+
label: "Monte Carlo Pi Approximation Workflow"
4+
doc: >
5+
This workflow approximates the value of Pi using the Monte Carlo method.
6+
It generates random points in a square and calculates how many fall within
7+
a unit circle inscribed in the square.
8+
9+
# Define the inputs of the workflow
10+
inputs:
11+
num-points:
12+
type: int
13+
doc: "Number of random points to generate for the simulation"
14+
# This number must be big to force the whole simulation+gather take
15+
# a fair amount of time.
16+
#
17+
# - If it's too little, it would be difficult to differentiate between
18+
# a parallel and a sequetial workflow.
19+
# - If it's too large, the test will take too long to complete.
20+
#
21+
# 5.000.000 points returned reasonable results, having a good balance
22+
# between the previously mentioned issues
23+
default: 5000000
24+
25+
# Outputs for this test are not necessary
26+
outputs: []
27+
28+
# Define the steps of the workflow
29+
# Two independent groups of steps (simulate+gather) to force parallel
30+
# execution. Sequential execution time should take aproximately twice
31+
# the time of a parallelly executed one.
32+
steps:
33+
# Group 1
34+
#
35+
simulate1:
36+
in:
37+
num-points: num-points
38+
out: [result_sim]
39+
run: ../pi/pisimulate.cwl
40+
41+
gathering1:
42+
in:
43+
input-data:
44+
source: simulate1/result_sim
45+
out: [pi_result]
46+
run: ../pi/pigather.cwl
47+
48+
# Group 2
49+
#
50+
simulate2:
51+
in:
52+
num-points: num-points
53+
out: [result_sim]
54+
run: ../pi/pisimulate.cwl
55+
56+
gathering2:
57+
in:
58+
input-data:
59+
source: simulate2/result_sim
60+
out: [pi_result]
61+
run: ../pi/pigather.cwl

0 commit comments

Comments
 (0)