Skip to content

Commit f2f269a

Browse files
authored
Merge pull request #682 from gerlero/cases
Add run_all and run_all_wait static methods to AsyncFoamCase
2 parents da8de8b + 2f3c5ae commit f2f269a

File tree

2 files changed

+102
-21
lines changed

2 files changed

+102
-21
lines changed

README.md

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -150,28 +150,28 @@ with my_pitz.fv_schemes as f:
150150
f["snGradSchemes"]["default"] = "uncorrected"
151151
```
152152

153-
### ⏳ Asynchronous execution
153+
### ⏳ Run multiple cases in parallel
154+
155+
Outside an [asyncio](https://docs.python.org/3/library/asyncio.html) context:
154156

155157
```python
156-
import asyncio
157158
from foamlib import AsyncFoamCase
158159

159-
async def run_multiple_cases():
160-
"""Run multiple cases concurrently."""
161-
base_case = AsyncFoamCase(my_pitz)
162-
163-
# Create and run multiple cases with different parameters
164-
tasks = []
165-
for i, velocity in enumerate([1, 2, 3]):
166-
case = base_case.clone(f"case_{i}")
167-
case[0]["U"].boundary_field["inlet"].value = [velocity, 0, 0]
168-
tasks.append(case.run())
169-
170-
# Wait for all cases to complete
171-
await asyncio.gather(*tasks)
172-
173-
# Run the async function
174-
asyncio.run(run_multiple_cases())
160+
case1 = AsyncFoamCase("path/to/case1")
161+
case2 = AsyncFoamCase("path/to/case2")
162+
163+
AsyncFoamCase.run_all_wait([case1, case2])
164+
```
165+
166+
Within an asyncio context (e.g. in a [Jupyter](https://jupyter.org/) notebook):
167+
168+
```python
169+
from foamlib import AsyncFoamCase
170+
171+
case1 = AsyncFoamCase("path/to/case1")
172+
case2 = AsyncFoamCase("path/to/case2")
173+
174+
await AsyncFoamCase.run_all([case1, case2])
175175
```
176176

177177
### 🔢 Direct field file access
@@ -213,7 +213,7 @@ async def objective_function(x):
213213
result = differential_evolution(
214214
objective_function,
215215
bounds=[(-1, 1)],
216-
workers=AsyncSlurmFoamCase.map,
216+
workers=AsyncSlurmFoamCase.map, # Enables concurrent evaluations
217217
polish=False
218218
)
219219
print(f"Optimal inlet velocity: {result.x[0]}")

src/foamlib/_cases/async_.py

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,9 +364,90 @@ async def clone(
364364

365365
@staticmethod
366366
def map(
367-
coro: Callable[[_X], Awaitable[_Y]], iterable: Iterable[_X]
367+
coro: Callable[[_X], Awaitable[_Y]], iterable: Iterable[_X], /
368368
) -> Iterable[_Y]:
369-
"""Run an async function on each element of an iterable concurrently."""
369+
"""
370+
Run an async function on each element of an iterable concurrently.
371+
372+
:param coro: An async function to run on each element.
373+
:param iterable: An iterable of arguments to pass to the function.
374+
375+
:return: An iterable of results from the function.
376+
377+
Example usage: ::
378+
379+
import os
380+
from pathlib import Path
381+
from foamlib import AsyncSlurmFoamCase
382+
from scipy.optimize import differential_evolution
383+
384+
# Set up base case for optimization
385+
base = AsyncSlurmFoamCase(Path(os.environ["FOAM_TUTORIALS"]) / "incompressible/simpleFoam/pitzDaily")
386+
387+
async def objective_function(x):
388+
async with base.clone() as case:
389+
# Set inlet velocity based on optimization parameters
390+
case[0]["U"].boundary_field["inlet"].value = [x[0], 0, 0]
391+
392+
# Run with fallback to local execution if Slurm unavailable
393+
await case.run(fallback=True)
394+
395+
# Return objective (minimize velocity magnitude at outlet)
396+
return abs(case[-1]["U"].internal_field[0][0])
397+
398+
# Run optimization with parallel jobs
399+
result = differential_evolution(
400+
objective_function,
401+
bounds=[(-1, 1)],
402+
workers=AsyncSlurmFoamCase.map,
403+
polish=False
404+
)
405+
print(f"Optimal inlet velocity: {result.x[0]}")
406+
"""
370407
return asyncio.get_event_loop().run_until_complete(
371408
asyncio.gather(*(coro(arg) for arg in iterable))
372409
)
410+
411+
@staticmethod
412+
async def run_all(cases: Iterable["AsyncFoamCase | Awaitable[object]"], /) -> None:
413+
"""
414+
Run multiple cases concurrently.
415+
416+
Note that maximum parallelism for :class:`AsyncFoamCase` is limited by the :attr:`max_cpus`
417+
attribute in order to avoid overloading the system.
418+
419+
:param cases: Instances of :class:`AsyncFoamCase` (:meth:`run` will be called) or arbitrary
420+
awaitables (will be awaited as-is).
421+
422+
Example usage: ::
423+
from foamlib import AsyncFoamCase
424+
425+
case1 = AsyncFoamCase("path/to/case1")
426+
case2 = AsyncFoamCase("path/to/case2")
427+
428+
await AsyncFoamCase.run_all([case1, case2]) # Cases will run in parallel (as much the :attr:`max_cpus` allows)
429+
"""
430+
await asyncio.gather(
431+
*(case.run() if isinstance(case, AsyncFoamCase) else case for case in cases)
432+
)
433+
434+
@staticmethod
435+
def run_all_wait(cases: Iterable["AsyncFoamCase | Awaitable[object]"], /) -> None:
436+
"""
437+
Run multiple cases concurrently, blocking until all are complete.
438+
439+
Note that maximum parallelism for :class:`AsyncFoamCase` is limited by the :attr:`max_cpus`
440+
attribute in order to avoid overloading the system.
441+
442+
:param cases: Instances of :class:`AsyncFoamCase` (:meth:`run` will be called) or arbitrary
443+
awaitables (will be awaited as-is).
444+
445+
Example usage: ::
446+
from foamlib import AsyncFoamCase
447+
448+
case1 = AsyncFoamCase("path/to/case1")
449+
case2 = AsyncFoamCase("path/to/case2")
450+
451+
AsyncFoamCase.run_all_wait([case1, case2]) # Cases will run in parallel (as much the :attr:`max_cpus` allows)
452+
"""
453+
asyncio.get_event_loop().run_until_complete(AsyncFoamCase.run_all(cases))

0 commit comments

Comments
 (0)