Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/dirac_cwl_proto/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def _pre_process(

# Prepare the task for cwltool
logger.info("Preparing the task for cwltool...")
command = ["cwltool"]
command = ["cwltool", "--parallel"]

task_dict = save(executable)
task_path = job_path / "task.cwl"
Expand Down
45 changes: 45 additions & 0 deletions test/test_workflows.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
import shutil
import subprocess
import threading
import time
from pathlib import Path
Expand Down Expand Up @@ -269,6 +270,50 @@ def test_run_job_validation_failure(
)


def test_run_job_parallely():
error_margin_percentage = 0.10

# This command forces the process 'dirac-cwl' to execute ONLY in
# one core of the machine, independently of how many there are
# phisically available.
# This simulates a sequential execution of the worklflow.
command = [
"taskset",
"-c",
"0",
"dirac-cwl",
"job",
"submit",
"test/workflows/parallel/description.cwl",
]

start = time.time()
subprocess.run(command)
end = time.time()
sequential_time = end - start

command = [
"dirac-cwl",
"job",
"submit",
"test/workflows/parallel/description.cwl",
]

start = time.time()
subprocess.run(command)
end = time.time()
parallel_time = end - start

min_time = (1 - error_margin_percentage) * sequential_time / 2
max_time = (1 + error_margin_percentage) * sequential_time / 2
# Parallel time should be aproximately half the time.
assert (parallel_time > min_time) and (parallel_time < max_time), (
"Difference between parallel and sequential time is too large",
f"Sequential: {sequential_time} # Parallel: {parallel_time}",
f"Sequential time should be twice the parallel time with an error of {int(error_margin_percentage*100)}%",
)


# -----------------------------------------------------------------------------
# Transformation tests
# -----------------------------------------------------------------------------
Expand Down
61 changes: 61 additions & 0 deletions test/workflows/parallel/description.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
cwlVersion: v1.2
class: Workflow
label: "Monte Carlo Pi Approximation Workflow"
doc: >
This workflow approximates the value of Pi using the Monte Carlo method.
It generates random points in a square and calculates how many fall within
a unit circle inscribed in the square.

# Define the inputs of the workflow
inputs:
num-points:
type: int
doc: "Number of random points to generate for the simulation"
# This number must be big to force the whole simulation+gather take
# a fair amount of time.
#
# - If it's too little, it would be difficult to differentiate between
# a parallel and a sequetial workflow.
# - If it's too large, the test will take too long to complete.
#
# 5.000.000 points returned reasonable results, having a good balance
# between the previously mentioned issues
default: 5000000

# Outputs for this test are not necessary
outputs: []

# Define the steps of the workflow
# Two independent groups of steps (simulate+gather) to force parallel
# execution. Sequential execution time should take aproximately twice
# the time of a parallelly executed one.
steps:
# Group 1
#
simulate1:
in:
num-points: num-points
out: [result_sim]
run: ../pi/pisimulate.cwl

gathering1:
in:
input-data:
source: simulate1/result_sim
out: [pi_result]
run: ../pi/pigather.cwl

# Group 2
#
simulate2:
in:
num-points: num-points
out: [result_sim]
run: ../pi/pisimulate.cwl

gathering2:
in:
input-data:
source: simulate2/result_sim
out: [pi_result]
run: ../pi/pigather.cwl
Loading