
Conversation

tomvothecoder (Collaborator) commented on Mar 12, 2025

Description

Changes so far

  • Add the ability to configure a Dask client via CoreParameter attributes: dask_scheduler_type and dask_memory_limit (a minimal usage sketch follows this list)
    • dask_scheduler_type -- "processes" (the default, kept for backwards compatibility) or "distributed"
    • dask_memory_limit -- "auto" (the default) or a specific memory limit such as "2GB", "512MB", etc.
  • Automatic chunking using Dask's "auto" chunk setting to allow a generalized chunking scheme regardless of data size. This is necessary because E3SM Diagnostics receives input data that can vary widely in size and shape.
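
A minimal run-script sketch of how these parameters might be used, assuming the standard e3sm_diags CoreParameter/runner workflow; the dask_scheduler_type and dask_memory_limit attributes come from this PR, while the paths, diagnostic set, and worker count are placeholders:

```python
# Hedged sketch (assumed workflow): configure the Dask-related CoreParameter
# attributes introduced in this PR from an ordinary e3sm_diags run script.
from e3sm_diags.parameter.core_parameter import CoreParameter
from e3sm_diags.run import runner

param = CoreParameter()
param.test_data_path = "/path/to/test_data"       # placeholder paths
param.reference_data_path = "/path/to/ref_data"
param.results_dir = "/path/to/results"
param.num_workers = 4

# New in this PR: pick the Dask scheduler and per-worker memory limit.
param.dask_scheduler_type = "distributed"  # or "processes" (default)
param.dask_memory_limit = "auto"           # or a fixed limit such as "2GB"

runner.sets_to_run = ["lat_lon"]           # illustrative diagnostic set
runner.run_diags([param])
```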

TODO:

  • Full run successful with Dask chunking
    • Processes
    • Distributed
      • Fix ESMF issue when running >1 variable at a time -- IN PROGRESS
  • Benchmark v2 data performance with different Dask configurations
    • Dask processes (bag) with 4 workers -- legacy method, default
    • Dask distributed (map) with 4 workers
  • Compare performance across configurations with plots

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • My changes generate no new warnings
  • Any dependent changes have been merged and published in downstream modules

If applicable:

  • New and existing unit tests pass with my changes (locally and CI/CD build)
  • I have added tests that prove my fix is effective or that my feature works
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have noted that this is a breaking change for a major release (fix or feature that would cause existing functionality to not work as expected)

tomvothecoder force-pushed the feature/949-dask-dist branch from 1e3f6b3 to 250271d on Apr 16, 2025
tomvothecoder changed the title from "Initial prototype of dask distributed to replace dask.bag" to "Add support for large scale analysis on ultra-high res data" on Apr 16, 2025
tomvothecoder changed the title to "Add support for large scale analysis on ultra-high res data via Dask" on Apr 16, 2025
tomvothecoder changed the title to "Support large scale analysis on ultra-high res data via Dask" on Apr 16, 2025
tomvothecoder force-pushed the feature/949-dask-dist branch from 6531fec to fa5d910 on Jul 21, 2025
Add `cluster` parameter to enable custom Dask cluster configs

Add `chunks="auto"` arg to `open_mfdataset()`
- Update `convert_units()` to make the udunits conversion Dask-compatible via `xr.apply_ufunc` (a sketch of this pattern follows the commit list)

Add old multiprocessing code for backwards compatibility
- Add `dask_scheduler_type` attribute to `CoreParameter`
- Update multiprocessing logic to take `dask_scheduler_type` into account

Update comment in `run_diags()` for clarity

Update run scripts to run all variables
- Update `_cleanup_dask_resources()` to wait for workers to close before closing the client and cluster
- Add data loading for `lat_lon_driver.py` metrics
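
A minimal sketch of the `xr.apply_ufunc` pattern referenced in the commit notes above; the conversion function and factor here are illustrative placeholders, not the project's actual udunits-based `convert_units` implementation:

```python
# Hedged sketch: wrap a NumPy-level unit conversion so it also runs lazily,
# block by block, on Dask-backed DataArrays via xr.apply_ufunc.
import xarray as xr


def _scale(data, factor):
    # Plain ufunc-style conversion, applied per Dask chunk.
    return data * factor


def convert_units(da: xr.DataArray, factor: float, new_units: str) -> xr.DataArray:
    out = xr.apply_ufunc(
        _scale,
        da,
        kwargs={"factor": factor},
        dask="parallelized",         # operate lazily on each chunk
        output_dtypes=[da.dtype],    # required when dask="parallelized"
        keep_attrs=True,
    )
    out.attrs["units"] = new_units
    return out
```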
tomvothecoder force-pushed the feature/949-dask-dist branch from fa5d910 to ba70873 on Jul 21, 2025
tomvothecoder (Collaborator, Author) commented on Jul 24, 2025

ESMF + xESMF failure when using Dask Distributed

I am running into an issue where xESMF/ESMF breaks when using the Dask Distributed scheduler and num_workers is less than the number of regridding tasks (which is based on the number of variables). This might also be related to #988.

ESMCI::VM::getCurrent() Internal error: Bad condition  - Could not determine current VM
ESMF_VMGetCurrent() Internal error: Bad condition
ESMF_GridCreateNoPeriDimR Internal error: Bad condition
...

This is triggered when xESMF.Regridder is initialized inside a Dask worker, which leads to race conditions in the ESMF Fortran backend (e.g., ESMF_GridCreateDistgridReg, VMGetCurrent, etc.). ESMF is not thread- or process-safe during grid/regridder setup.

Avoid initializing any xESMF.Regridder inside a function that is passed to .map() or any parallel task — this breaks ESMF internals.

Attempts to fix

  1. Wrapping regridding in a subprocess call -- doesn't work; it fails with AssertionError: daemonic processes are not allowed to have children
  2. Using a custom Dask WorkerPlugin to shut down a worker after each run_diag completes, then start a fresh one for the next run_diags task -- trying it here (a client-side sketch of the same isolation idea follows below)
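
Not the implementation in this branch, but a minimal client-side sketch of the same isolation idea (fresh worker processes between batches of tasks) using only documented dask.distributed calls; `run_diag` and `parameters` are placeholders for the real driver inputs:

```python
# Hedged sketch: give each batch of run_diag tasks brand-new worker processes
# so ESMF/xESMF global state cannot leak from one regridding task to the next.
from dask.distributed import Client


def run_all(parameters, run_diag, n_workers=4):
    # One process per worker, one thread per process.
    client = Client(n_workers=n_workers, threads_per_worker=1, processes=True)
    try:
        results = []
        for i in range(0, len(parameters), n_workers):
            batch = parameters[i : i + n_workers]
            futures = client.map(run_diag, batch, pure=False)
            results.extend(client.gather(futures))
            # Restart all worker processes before the next batch so ESMF state
            # from completed tasks is discarded (restart waits for workers).
            client.restart()
        return results
    finally:
        client.close()
```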

tomvothecoder (Collaborator, Author) commented on Jul 24, 2025

xESMF does not play well with Dask Distributed Scheduler

xESMF relies on ESMF/ESMPy, which is not thread-safe and uses global state, making it sensitive to concurrency issues. When using Dask, especially with client.map, it’s critical to ensure that:

  • Each xESMF task runs in its own dedicated process.
  • Tasks must not share memory or regridder instances.

If multiple xESMF operations run concurrently in the same process (e.g., through shared Dask workers), this can cause segfaults, corrupted results, or silent hangs.

✅ Recommended setup:

Client(n_workers=N, threads_per_worker=1, processes=True)

Then use client.map or client.submit with isolated tasks — ideally one task per worker at a time.

💡 Tips:

  • Precompute weights once and reuse them via filename='weights.nc' and reuse_weights=True to avoid repeated weight-generation overhead.
  • Never share Regridder objects between tasks.
  • Avoid the threaded scheduler (processes=False), which is unsafe for xESMF.

This pattern ensures stable, parallel-safe usage of xESMF in Dask workflows; a minimal sketch of the setup follows below.
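
A minimal sketch of this setup; only the Client arguments and Regridder options are taken from the Dask and xESMF documentation, while the grids, dataset list, and helper names are illustrative placeholders:

```python
# Hedged sketch of the recommended pattern: process-based workers with one
# thread each, one regridding task per process, and weights reused from disk.
import xarray as xr
import xesmf as xe
from dask.distributed import Client


def regrid_one(ds: xr.Dataset, ds_out_grid: xr.Dataset) -> xr.Dataset:
    # Build the Regridder inside the worker process; never share a Regridder
    # object between tasks or ship one through the scheduler.
    regridder = xe.Regridder(
        ds,
        ds_out_grid,
        method="bilinear",
        filename="weights.nc",   # weights precomputed once, e.g. on the client
        reuse_weights=True,      # read weights instead of regenerating them
    )
    return regridder(ds)


if __name__ == "__main__":
    # One process per worker, one thread per process: ESMF never sees
    # concurrent regridding calls within the same process.
    client = Client(n_workers=4, threads_per_worker=1, processes=True)

    ds_out_grid = xe.util.grid_global(1.0, 1.0)  # example 1x1 degree target grid
    datasets = [...]                             # placeholder: one Dataset per task

    futures = client.map(regrid_one, datasets, ds_out_grid=ds_out_grid, pure=False)
    results = client.gather(futures)
    client.close()
```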



Linked issue (may be closed by merging this PR): [Enhancement]: Explore dask distributed and chunking for improved performance and support for ultra high-resolution data
