Skip to content

Commit 595a1a1

Browse files
Upgrade terminate function (#715)
* Remove default terminate function * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add function to shutdown tasks * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Only try to delete the job if it was not finished before * validate status * extend flux test * selection function * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix types * return none * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * restructure * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * ignore type * fix mypy --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent a4cac80 commit 595a1a1

8 files changed

Lines changed: 90 additions & 4 deletions

File tree

executorlib/executor/flux.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
check_plot_dependency_graph,
1010
check_pmi,
1111
check_refresh_rate,
12+
check_terminate_tasks_on_shutdown,
1213
validate_number_of_cores,
1314
)
1415
from executorlib.task_scheduler.interactive.blockallocation import (
@@ -63,6 +64,7 @@ class FluxJobExecutor(BaseExecutor):
6364
debugging purposes and to get an overview of the specified dependencies.
6465
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
6566
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
67+
terminate_tasks_on_shutdown (bool): Shut down all tasks when the Executor is shut down. This is the default and the only supported value; passing False raises a ValueError.
6668
6769
Examples:
6870
```
@@ -103,6 +105,7 @@ def __init__(
103105
plot_dependency_graph: bool = False,
104106
plot_dependency_graph_filename: Optional[str] = None,
105107
log_obj_size: bool = False,
108+
terminate_tasks_on_shutdown: bool = True,
106109
):
107110
"""
108111
The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager
@@ -148,6 +151,7 @@ def __init__(
148151
debugging purposes and to get an overview of the specified dependencies.
149152
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
150153
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
154+
terminate_tasks_on_shutdown (bool): Shut down all tasks when the Executor is shut down. This is the default and the only supported value; passing False raises a ValueError.
151155
152156
"""
153157
default_resource_dict: dict = {
@@ -163,6 +167,9 @@ def __init__(
163167
resource_dict.update(
164168
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
165169
)
170+
check_terminate_tasks_on_shutdown(
171+
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown
172+
)
166173
if not disable_dependencies:
167174
super().__init__(
168175
executor=DependencyTaskScheduler(
@@ -248,6 +255,7 @@ class FluxClusterExecutor(BaseExecutor):
248255
debugging purposes and to get an overview of the specified dependencies.
249256
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
250257
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
258+
terminate_tasks_on_shutdown (bool): Shut down all tasks when the Executor is shut down; this is the default.
251259
252260
Examples:
253261
```
@@ -285,6 +293,7 @@ def __init__(
285293
plot_dependency_graph: bool = False,
286294
plot_dependency_graph_filename: Optional[str] = None,
287295
log_obj_size: bool = False,
296+
terminate_tasks_on_shutdown: bool = True,
288297
):
289298
"""
290299
The executorlib.FluxClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -327,6 +336,7 @@ def __init__(
327336
debugging purposes and to get an overview of the specified dependencies.
328337
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
329338
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
339+
terminate_tasks_on_shutdown (bool): Shut down all tasks when the Executor is shut down; this is the default.
330340
331341
"""
332342
default_resource_dict: dict = {
@@ -366,6 +376,7 @@ def __init__(
366376
block_allocation=block_allocation,
367377
init_function=init_function,
368378
disable_dependencies=disable_dependencies,
379+
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown,
369380
)
370381
)
371382
else:

executorlib/executor/slurm.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
check_log_obj_size,
77
check_plot_dependency_graph,
88
check_refresh_rate,
9+
check_terminate_tasks_on_shutdown,
910
validate_number_of_cores,
1011
)
1112
from executorlib.task_scheduler.interactive.blockallocation import (
@@ -60,6 +61,7 @@ class SlurmClusterExecutor(BaseExecutor):
6061
debugging purposes and to get an overview of the specified dependencies.
6162
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
6263
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
64+
terminate_tasks_on_shutdown (bool): Shut down all tasks when the Executor is shut down; this is the default.
6365
6466
Examples:
6567
```
@@ -97,6 +99,7 @@ def __init__(
9799
plot_dependency_graph: bool = False,
98100
plot_dependency_graph_filename: Optional[str] = None,
99101
log_obj_size: bool = False,
102+
terminate_tasks_on_shutdown: bool = True,
100103
):
101104
"""
102105
The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -139,6 +142,7 @@ def __init__(
139142
debugging purposes and to get an overview of the specified dependencies.
140143
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
141144
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
145+
terminate_tasks_on_shutdown (bool): Shut down all tasks when the Executor is shut down; this is the default.
142146
143147
"""
144148
default_resource_dict: dict = {
@@ -178,6 +182,7 @@ def __init__(
178182
block_allocation=block_allocation,
179183
init_function=init_function,
180184
disable_dependencies=disable_dependencies,
185+
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown,
181186
)
182187
)
183188
else:
@@ -244,6 +249,7 @@ class SlurmJobExecutor(BaseExecutor):
244249
debugging purposes and to get an overview of the specified dependencies.
245250
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
246251
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
252+
terminate_tasks_on_shutdown (bool): Shut down all tasks when the Executor is shut down. This is the default and the only supported value; passing False raises a ValueError.
247253
248254
Examples:
249255
```
@@ -280,6 +286,7 @@ def __init__(
280286
plot_dependency_graph: bool = False,
281287
plot_dependency_graph_filename: Optional[str] = None,
282288
log_obj_size: bool = False,
289+
terminate_tasks_on_shutdown: bool = True,
283290
):
284291
"""
285292
The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -325,6 +332,7 @@ def __init__(
325332
debugging purposes and to get an overview of the specified dependencies.
326333
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
327334
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
335+
terminate_tasks_on_shutdown (bool): Shut down all tasks when the Executor is shut down. This is the default and the only supported value; passing False raises a ValueError.
328336
329337
"""
330338
default_resource_dict: dict = {
@@ -340,6 +348,9 @@ def __init__(
340348
resource_dict.update(
341349
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
342350
)
351+
check_terminate_tasks_on_shutdown(
352+
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown
353+
)
343354
if not disable_dependencies:
344355
super().__init__(
345356
executor=DependencyTaskScheduler(

executorlib/standalone/inputcheck.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,3 +212,15 @@ def check_log_obj_size(log_obj_size: bool) -> None:
212212
"log_obj_size is not supported for the executorlib.SlurmClusterExecutor and executorlib.FluxClusterExecutor."
213213
"Please use log_obj_size=False instead of log_obj_size=True."
214214
)
215+
216+
217+
def check_terminate_tasks_on_shutdown(terminate_tasks_on_shutdown: bool) -> None:
    """
    Reject terminate_tasks_on_shutdown=False for executors that cannot keep tasks alive after shutdown.

    Args:
        terminate_tasks_on_shutdown (bool): Value requested by the user; only a truthy value is accepted.

    Raises:
        ValueError: If terminate_tasks_on_shutdown is False (or any other falsy value).
    """
    if not terminate_tasks_on_shutdown:
        # Adjacent string literals are concatenated verbatim, so every fragment
        # has to carry its own trailing space to keep the sentences separated.
        raise ValueError(
            "terminate_tasks_on_shutdown is not supported for the executorlib.SingleNodeExecutor, "
            "executorlib.SlurmJobExecutor and executorlib.FluxJobExecutor. "
            "Please use terminate_tasks_on_shutdown=True instead of terminate_tasks_on_shutdown=False."
        )

executorlib/task_scheduler/file/queue_spawner.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,29 @@ def execute_with_pysqa(
8181
return queue_id
8282

8383

84+
def terminate_with_pysqa(
    queue_id: int,
    config_directory: Optional[str] = None,
    backend: Optional[str] = None,
):
    """
    Remove a job from the queuing system unless it has already completed.

    The job status is queried first; the delete request is only sent when a
    status is reported and that status is neither "finished" nor "error".

    Args:
        queue_id (int): Queuing system ID of the job to delete.
        config_directory (str, optional): path to the config directory.
        backend (str, optional): name of the backend used to spawn tasks.
    """
    adapter = QueueAdapter(
        directory=config_directory,
        queue_type=backend,
        execute_command=_pysqa_execute_command,
    )
    job_status = adapter.get_status_of_job(process_id=queue_id)
    # Nothing to delete when no status is reported or the job already ended.
    if job_status is None or job_status in ["finished", "error"]:
        return
    adapter.delete_job(process_id=queue_id)
106+
84107
def _pysqa_execute_command(
85108
commands: str,
86109
working_directory: Optional[str] = None,

executorlib/task_scheduler/file/shared.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from executorlib.standalone.command import get_command_path
1111
from executorlib.standalone.serialize import serialize_funct_h5
1212
from executorlib.task_scheduler.file.hdf import dump, get_output
13+
from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess
1314

1415

1516
class FutureItem:
@@ -96,9 +97,19 @@ def execute_tasks_h5(
9697
for key, value in memory_dict.items()
9798
if not value.done()
9899
}
99-
if terminate_function is not None:
100+
if (
101+
terminate_function is not None
102+
and terminate_function == terminate_subprocess
103+
):
100104
for task in process_dict.values():
101105
terminate_function(task=task)
106+
elif terminate_function is not None:
107+
for queue_id in process_dict.values():
108+
terminate_function(
109+
queue_id=queue_id,
110+
config_directory=pysqa_config_directory,
111+
backend=backend,
112+
)
102113
future_queue.task_done()
103114
future_queue.join()
104115
break

executorlib/task_scheduler/file/task_scheduler.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,18 @@
1313
from executorlib.task_scheduler.file.shared import execute_tasks_h5
1414
from executorlib.task_scheduler.file.subprocess_spawner import (
1515
execute_in_subprocess,
16+
terminate_subprocess,
1617
)
1718

1819
try:
19-
from executorlib.task_scheduler.file.queue_spawner import execute_with_pysqa
20+
from executorlib.task_scheduler.file.queue_spawner import (
21+
execute_with_pysqa,
22+
terminate_with_pysqa,
23+
)
2024
except ImportError:
2125
# If pysqa is not available fall back to executing tasks in a subprocess
2226
execute_with_pysqa = execute_in_subprocess # type: ignore
27+
terminate_with_pysqa = None # type: ignore
2328

2429

2530
class FileTaskScheduler(TaskSchedulerBase):
@@ -90,7 +95,7 @@ def create_file_executor(
9095
init_function: Optional[Callable] = None,
9196
disable_dependencies: bool = False,
9297
execute_function: Callable = execute_with_pysqa,
93-
terminate_function: Optional[Callable] = None,
98+
terminate_tasks_on_shutdown: bool = True,
9499
):
95100
if block_allocation:
96101
raise ValueError(
@@ -108,6 +113,12 @@ def create_file_executor(
108113
check_executor(executor=flux_executor)
109114
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
110115
check_flux_log_files(flux_log_files=flux_log_files)
116+
if terminate_tasks_on_shutdown and execute_function != execute_in_subprocess:
117+
terminate_function = terminate_with_pysqa # type: ignore
118+
elif terminate_tasks_on_shutdown and execute_function == execute_in_subprocess:
119+
terminate_function = terminate_subprocess # type: ignore
120+
else:
121+
terminate_function = None # type: ignore
111122
return FileTaskScheduler(
112123
resource_dict=resource_dict,
113124
pysqa_config_directory=pysqa_config_directory,

tests/test_fluxclusterexecutor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def test_executor(self):
3737
resource_dict={"cores": 2, "cwd": "executorlib_cache"},
3838
block_allocation=False,
3939
cache_directory="executorlib_cache",
40+
terminate_tasks_on_shutdown=False,
4041
) as exe:
4142
cloudpickle_register(ind=1)
4243
fs1 = exe.submit(mpi_funct, 1)
@@ -50,6 +51,7 @@ def test_executor_no_cwd(self):
5051
resource_dict={"cores": 2},
5152
block_allocation=False,
5253
cache_directory="executorlib_cache",
54+
terminate_tasks_on_shutdown=True,
5355
) as exe:
5456
cloudpickle_register(ind=1)
5557
fs1 = exe.submit(mpi_funct, 1)

tests/test_standalone_inputcheck.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
check_hostname_localhost,
1919
check_pysqa_config_directory,
2020
check_file_exists,
21+
check_terminate_tasks_on_shutdown,
2122
check_log_obj_size,
2223
validate_number_of_cores,
2324
)
@@ -123,4 +124,8 @@ def test_validate_number_of_cores(self):
123124

124125
def test_check_log_obj_size(self):
125126
with self.assertRaises(ValueError):
126-
check_log_obj_size(log_obj_size=True)
127+
check_log_obj_size(log_obj_size=True)
128+
129+
def test_terminate_tasks_on_shutdown(self):
130+
with self.assertRaises(ValueError):
131+
check_terminate_tasks_on_shutdown(terminate_tasks_on_shutdown=False)

0 commit comments

Comments
 (0)