Skip to content

feat: parallelize apply_patch & apply_channels#240

Draft
anas-ibrahem wants to merge 7 commits into
noaa-ocs-modeling:mainfrom
anas-ibrahem:feat/parallize_apply_patch
Draft

feat: parallelize apply_patch & apply_channels#240
anas-ibrahem wants to merge 7 commits into
noaa-ocs-modeling:mainfrom
anas-ibrahem:feat/parallize_apply_patch

Conversation

@anas-ibrahem

Copy link
Copy Markdown
Contributor

No description provided.

@anas-ibrahem anas-ibrahem changed the title feat: parallize apply_patch & apply_channels feat: parallelize apply_patch & apply_channels Jun 15, 2026
@anas-ibrahem

anas-ibrahem commented Jun 15, 2026

Copy link
Copy Markdown
Contributor Author

@SorooshMani-NOAA @felicio93 I've opened this PR as a draft for now, so you can review the pool solution(You can find that in last 3 commits here) , if it's fine then adding tests should be remaining.

I followed the same solution implemented in _apply_contours() passing the shared pool.
So that @add_pool_args decorator take pool=p and reuse it. Only one Pool pass down chain, so no new spawn inside. No nested cascade.

If you can confirm that so I can work on providing suitable tests.
Also If you have any suggestions for suitable tests.

@anas-ibrahem

anas-ibrahem commented Jun 16, 2026

Copy link
Copy Markdown
Contributor Author

The @add_pool_args decorator enforces a single-pool-per-rank rule. The collector dispatch (_apply_patch_apply_channels) creates ONE Pool(N) at the top, then passes pool=p downward. Every inner call (add_patch → add_feature) sees pool=p already exists and reuses it — no new spawn.

Each MPI rank owns its own independent Pool — ranks never share pools with each other, so no cross-rank conflicts. Within a rank, the shared pool goes strictly downward through the call chain, never sideways or nested. This guarantees exactly one level of multiprocessing per rank: MPI Rank → Pool(N) → workers. No Pool-inside-Pool cascade.

In Phase 2, Rank 0 acts as coordinator: it owns self._hfun_list, builds task dicts (plain strings and numbers that serialize trivially), scatters them to worker ranks via comm.scatter(), and gathers results via comm.gather(). Ranks 1–N are stateless workers — they receive task chunks, call the appropriate worker function, and return output paths. Workers have no knowledge of self._hfun_list and no shared state.

Inside each worker rank, the shared-pool infrastructure from this PR is what runs. When a worker rank processes its tile, it calls _apply_patch() or _apply_channels(), which creates ONE Pool locally and shares it downward. So the full architecture becomes: mpirun distributes tiles across ranks (inter-node) → each rank's collector dispatch creates one Pool (intra-node) → that pool is shared down to add_patch → add_feature. Two clean levels of parallelism, zero nesting.

I also found this resource

Another good points I found for Phase 2

For additional MPI safety, two runtime considerations apply: First, multiprocessing.set_start_method('spawn') should be used instead of the default fork on Linux — fork copies the parent's MPI communicator state into child processes, which can cause deadlocks or silent corruption. spawn starts each worker from a clean Python interpreter with no inherited MPI state. Second, OMP_NUM_THREADS=1 should be set so that numerical libraries (NumPy/SciPy via OpenBLAS/MKL) don't spawn their own internal threads inside each Pool worker — otherwise each of the N workers spawns 8+ threads, causing core oversubscription on top of MPI.

Finally, I came into something while searching: how heavy it is to create and destroy a Pool in our code. While it's safe (they never overlap), it can be optimized. Currently a single meshdata() call spawns and destroys up to 8 separate Pools:

The [NERSC Parallel Python guide](https://docs.nersc.gov/development/languages/python/parallel-python/) explicitly warns: "Creating and destroying Pool objects repeatedly is expensive. Create a single Pool at the start and reuse it." A future optimization would lift the Pool to the meshdata() level:

meshdata()
  spawn Pool(N) ─────────────────────────────── ONE creation
  │  _apply_constraints(pool=p)    →  reuse
  │  _apply_contours(pool=p)       →  reuse
  │  _apply_channels(pool=p)       →  reuse
  │  _apply_flow_limiters(pool=p)  →  reuse
  │  _apply_const_val(pool=p)      →  reuse
  │  _apply_patch(pool=p)          →  reuse
  │  _apply_linefeatures(pool=p)   →  reuse
  destroy Pool(N) ───────────────────────────── ONE teardown

@SorooshMani-NOAA

Copy link
Copy Markdown
Collaborator

Thanks @anas-ibrahem it was a very good explainer.

In the end we'll have pool of tasks that are limited to MPI rank's cores, we have to find a way to optimize it from the top when running, e.g. in collector, etc. But at least we can be assured that it's contained within the rank.

I totally agree with the point on creating one pool at the top. add_pool_args addresses this to some degree, but we need to make sure we make full use of it.

@anas-ibrahem

Copy link
Copy Markdown
Contributor Author

Thanks @anas-ibrahem it was a very good explainer.

In the end we'll have pool of tasks that are limited to MPI rank's cores, we have to find a way to optimize it from the top when running, e.g. in collector, etc. But at least we can be assured that it's contained within the rank.

I totally agree with the point on creating one pool at the top. add_pool_args addresses this to some degree, but we need to make sure we make full use of it.

I agree, so I will continue on our approach to comeplete this PR then.
I plan to make it ready for review by tomorrow.
@felicio93 also for testing it using qgis could you provide examples and what to expect so I can test and visualise it like the previous PR

@felicio93

Copy link
Copy Markdown
Collaborator

Hey @anas-ibrahem , here is how you can test it and what to expect:

  • No refinements
    hfun = ocsmesh.Hfun(hfun_rast_list,base_shape_crs=geom.crs,hmin=200, hmax=1000,method='fast')
image
  • add_patch:
from shapely.geometry import box
hfun = ocsmesh.Hfun(hfun_rast_list,base_shape_crs=geom.crs,hmin=200, hmax=1000,method='fast')
hfun.add_patch(shape=box(-62.4, 45.56, -62.38, 45.75),target_size=300,expansion_rate=0.001) # add the min/max lat/lon that make sense to your red sea case
image
  • add_channel:
hfun = ocsmesh.Hfun(hfun_rast_list,base_shape_crs=geom.crs,hmin=200, hmax=1000,method='fast')
hfun.add_channel(level=0, width=2000, target_size=300, expansion_rate=0.001)
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants