Skip to content

Commit 1b0315a

Browse files
committed
include optional arg to allow non-default resource scaling (which is 20%)
1 parent 6734967 commit 1b0315a

File tree

1 file changed

+20
-0
lines changed

1 file changed

+20
-0
lines changed

src/rra_tools/jobmon.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ def build_parallel_task_graph( # type: ignore[no-untyped-def] # noqa: PLR0913
8585
task_args: dict[str, Any] | None = None,
8686
op_args: dict[str, Any] | None = None,
8787
max_attempts: int | None = None,
88+
resource_scales: dict[str, Any] | None = None,
8889
) -> list[Any]:
8990
"""Build a parallel task graph for jobmon.
9091
@@ -118,6 +119,14 @@ def build_parallel_task_graph( # type: ignore[no-untyped-def] # noqa: PLR0913
118119
The resources to allocate to the task.
119120
max_attempts
120121
The maximum number of attempts to make for each task.
122+
resource_scales
123+
How much users want to scale their resource request if the
124+
the initial request fails. Scale factor can be a numeric value, a Callable
125+
that will be applied to the existing resources, or an Iterator. Any Callable
126+
should take a single numeric value as its sole argument. Any Iterator should
127+
only yield numeric values. Any Iterable can be easily converted to an
128+
Iterator by using the iter() built-in (e.g. iter([80, 160, 190])).
129+
121130
122131
Returns
123132
-------
@@ -170,6 +179,7 @@ def build_parallel_task_graph( # type: ignore[no-untyped-def] # noqa: PLR0913
170179
task = task_template.create_task(
171180
**task_args,
172181
max_attempts=max_attempts,
182+
resource_scales=resource_scales,
173183
)
174184
tasks.append(task)
175185
else:
@@ -178,6 +188,7 @@ def build_parallel_task_graph( # type: ignore[no-untyped-def] # noqa: PLR0913
178188
**clean_task_args,
179189
**clean_op_args,
180190
max_attempts=max_attempts,
191+
resource_scales=resource_scales,
181192
)
182193
return tasks
183194

@@ -231,6 +242,7 @@ def run_parallel( # noqa: PLR0913
231242
op_args: dict[str, Any] | None = None,
232243
concurrency_limit: int = 10000,
233244
max_attempts: int | None = None,
245+
resource_scales: dict[str, Any] | None = None,
234246
log_root: str | Path | None = None,
235247
log_method: Callable[[str], None] = print,
236248
) -> str:
@@ -271,6 +283,13 @@ def run_parallel( # noqa: PLR0913
271283
The maximum number of tasks to run concurrently. Default is 10000.
272284
max_attempts
273285
The maximum number of attempts to make for each task.
286+
resource_scales
287+
How much users want to scale their resource request if the
288+
the initial request fails. Scale factor can be a numeric value, a Callable
289+
that will be applied to the existing resources, or an Iterator. Any Callable
290+
should take a single numeric value as its sole argument. Any Iterator should
291+
only yield numeric values. Any Iterable can be easily converted to an
292+
Iterator by using the iter() built-in (e.g. iter([80, 160, 190])).
274293
log_root
275294
The root directory for the logs. Default is None.
276295
log_method
@@ -315,6 +334,7 @@ def run_parallel( # noqa: PLR0913
315334
task_resources=task_resources,
316335
runner=runner,
317336
max_attempts=max_attempts,
337+
resource_scales=resource_scales
318338
)
319339

320340
workflow.add_tasks(tasks)

0 commit comments

Comments
 (0)