
Conversation

@cphyc (Member) commented Jul 18, 2025

PR Summary

This allows iterating in parallel over data containers.

The syntax is:

ds = yt.load(...)
ad = ds.all_data()

# Gathering everything in parallel
my_storage = {}
for sto, chunk in ad.piter(storage=my_storage, reduction="cat"):
    sto.result = {}
    sto.result["gas", "density"] = chunk["gas", "density"]

my_storage["gas", "density"]  # exists for all processes and contains everything


# Gathering everything on root
my_storage = {}
for sto, chunk in ad.piter(storage=my_storage, reduction="cat_on_root"):
    sto.result = {}
    sto.result["gas", "density"] = chunk["gas", "density"]

my_storage["gas", "density"]  # None for all processes but root


# Reduce everything
my_storage = {}
for sto, chunk in ad.piter(storage=my_storage, reduction="sum"):
    sto.result = {}
    sto.result["gas", "density"] = chunk["gas", "density"]

my_storage["gas", "density"]  # contains the sum of all the densities

This makes #4730 obsolete.

PR Checklist

  • New features are documented, with docstrings and narrative docs
  • Adds a test for any bugs fixed. Adds tests for new features.

@cphyc cphyc force-pushed the piter-on-data-containers branch from ad921de to 1c2649d on July 18, 2025 15:53
@cphyc cphyc added enhancement Making something better workshop-2025 labels Jul 18, 2025
@cphyc cphyc marked this pull request as ready for review July 18, 2025 16:09
@neutrinoceros (Member)

about type-checking errors: #5195

@cphyc cphyc force-pushed the piter-on-data-containers branch from bea773f to 6c307f3 on August 5, 2025 12:11
@cphyc cphyc requested review from brittonsmith, chrishavlin, chummels, Copilot and matthewturk and removed request for chummels August 5, 2025 12:12

Copilot AI left a comment


Pull Request Overview

This PR adds parallel iteration capabilities to data containers in yt, allowing users to iterate over data chunks in parallel with optional reduction operations. The implementation introduces a new piter method on data containers that supports various reduction strategies (concatenation, sum, min, max) either on all processors or just the root processor.

  • Added a piter method to YTSelectionContainer for parallel iteration over data chunks
  • Enhanced the parallel_objects function with reduction operations and improved type annotations
  • Implemented new reduction methods (reduce, all_concat, concat) in the communication system
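
A hedged sketch of how these pieces could fit together (illustrative only, not the PR's actual diff; parallel_objects is yt's existing helper, and the reduction keyword is what this PR adds):

from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects

def piter(container, storage=None, reduction=None):
    # Conceptually: distribute the container's IO chunks across MPI ranks;
    # with storage set, parallel_objects yields (sto, chunk) pairs and,
    # with this PR, applies the requested reduction to storage at the end.
    chunks = list(container.chunks([], "io"))
    yield from parallel_objects(chunks, storage=storage, reduction=reduction)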

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

File | Description
yt/utilities/parallel_tools/parallel_analysis_interface.py | Enhanced parallel-processing infrastructure with reduction operations and a modernized ResultsStorage class
yt/data_objects/selection_objects/data_selection_objects.py | Added a piter method to data containers for parallel iteration, with comprehensive documentation

Comment on lines 520 to 522
- concat: the storage object will contain a flattened list of
each results.
- cat_on_root: same as concat, but only the root processor will

Copilot AI Aug 5, 2025


The documentation mentions 'concat' as a reduction option, but the actual parameter value is 'cat'. This inconsistency could confuse users.

Suggested change
- concat: the storage object will contain a flattened list of
each results.
- cat_on_root: same as concat, but only the root processor will
- cat: the storage object will contain a flattened list of
each result.
- cat_on_root: same as cat, but only the root processor will

case "cat_on_root":
new_storage = my_communicator.concat(to_share, root=0)
case "sum" | "max" | "min":
new_storage = my_communicator.reduce(to_share, op=reduction, root=0)

Copilot AI Aug 5, 2025


The reduce method is called with root=0, but when the operation is not distributed, the result should be available on all processors. This inconsistency with the documented behavior could cause issues.

Suggested change
new_storage = my_communicator.reduce(to_share, op=reduction, root=0)
if getattr(my_communicator, "distributed", False):
    new_storage = my_communicator.reduce(to_share, op=reduction, root=0)
else:
    new_storage = my_communicator.reduce(to_share, op=reduction)

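For context on the root-vs-all distinction raised here, a minimal generic-MPI illustration (plain mpi4py, not yt's Communicator API):

from mpi4py import MPI

comm = MPI.COMM_WORLD
local = comm.rank + 1.0
on_root = comm.reduce(local, op=MPI.SUM, root=0)   # None on every non-root rank
everywhere = comm.allreduce(local, op=MPI.SUM)     # same value on all ranks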
Comment on lines 539 to 541
- concat: the storage object will contain a flattened list of
each results.
- cat_on_root: same as concat, but only the root processor will

Copilot AI Aug 5, 2025


Same documentation inconsistency as in the other file - 'concat' is documented but 'cat' is the actual parameter value.

Suggested change
- concat: the storage object will contain a flattened list of
each results.
- cat_on_root: same as concat, but only the root processor will
- cat: the storage object will contain a flattened list of
each result.
- cat_on_root: same as cat, but only the root processor will

brittonsmith previously approved these changes Aug 6, 2025
@brittonsmith (Member) left a comment


This looks really great. Very concise; happy to see this merged. We can use this to refactor the loop over all halos in the HaloCatalog.

@cphyc cphyc force-pushed the piter-on-data-containers branch from e4447c3 to 53c52f2 on August 7, 2025 10:03
@cphyc (Member, Author) commented Aug 7, 2025

I need to fix the typing issues (which I'm not convinced are real) and write the docs. Please hold off on merging :)

@neutrinoceros (Member)

Yes, mypy infers reachability in a slightly broken way that can produce false negatives; maybe this lint should actually be turned off entirely.

@cphyc (Member, Author) commented Sep 4, 2025

In the meantime, I've added # type: ignore comments to make mypy happy.
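
For reference, such a suppression looks like this (illustrative line, not taken from the diff):

new_storage = my_communicator.reduce(to_share, op=reduction)  # type: ignore[unreachable]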

@yt-project yt-project deleted a comment from Copilot AI Sep 11, 2025
brittonsmith previously approved these changes Sep 12, 2025
matthewturk previously approved these changes Sep 12, 2025
@matthewturk (Member) left a comment


lgtm!

@chummels (Member)

Great addition to the codebase! Will this apply to all data objects, not just ds.all_data()? Have you checked whether this works with particle-based frontends? It probably does, but I don't know. If it doesn't, it might be worth adding a note in the docs, or an explicit error message for particle-based frontends saying that this is not implemented. Lastly, it might be worth including a simple unit test to ensure that the code can run (without bothering with answer testing), so that we're aware if something breaks it in the future. Anyway, I'm not trying to create a ton more work, just to make sure this framework remains viable in the future.

@cphyc (Member, Author) commented Nov 8, 2025

I realised I never replied to your message @chummels. I haven't tried with particle data, but there's no reason for it not to work as long as the frontend supports chunking.

It also works with any selection, not only all_data().
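
For instance, following the same pattern as the PR summary (a sketch; the sphere selection here is just an example, not from the PR):

sp = ds.sphere("c", (100, "kpc"))
my_storage = {}
for sto, chunk in sp.piter(storage=my_storage, reduction="cat"):
    sto.result = {}
    sto.result["gas", "density"] = chunk["gas", "density"]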

@cphyc (Member, Author) commented Nov 8, 2025

aaaaand I just found a bug - let's not merge this just yet. The issue is with the RAMSES reader, where we use progressive indexing (a spatial region isn't fully indexed until it's actually read). The problem is that different MPI tasks end up reading different regions, so which chunks may overlap a given region depends on what has been read previously.

If you use this PR's parallel reader several times in a row, you may end up in a situation where, e.g., MPI task 1 thinks it needs to read CPUs #1, #2, and #3, but MPI task 2 has already read and indexed #3, so it knows #3 won't overlap with the region, leaving only #1 and #2 to be read. Since the MPI tasks aren't communicating this information with each other, this leads to undefined behavior.

@cphyc cphyc dismissed stale reviews from matthewturk and brittonsmith via 101c22e November 10, 2025 09:23
For RAMSES datasets, each MPI rank may have already fully indexed part of the domains, leading to different chunks across MPI ranks.

This means we need an MPI reduction over which domains should be read, both to get the most specific subset of domains and to ensure that all ranks have the same list of domains when chunking.
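
A hedged sketch of what such a reduction could look like (illustrative mpi4py with invented names, not the actual commit):

import numpy as np
from mpi4py import MPI

def synchronize_domains(candidate):
    # candidate: boolean mask with one entry per RAMSES domain (CPU file);
    # True means this rank cannot yet rule the domain out. Ranks that have
    # fully indexed a domain report a precise answer, others a conservative
    # True, so a logical AND keeps only the domains no rank has excluded
    # and guarantees every rank chunks over the same list.
    comm = MPI.COMM_WORLD
    agreed = np.empty_like(candidate)
    comm.Allreduce(candidate, agreed, op=MPI.LAND)
    return agreed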
@cphyc cphyc force-pushed the piter-on-data-containers branch from 101c22e to 8779485 on November 10, 2025 09:28
@chrishavlin chrishavlin added this to the 4.5.0 milestone Nov 14, 2025
@cphyc cphyc force-pushed the piter-on-data-containers branch from 947320e to 39777f6 on December 1, 2025 09:44
@cphyc (Member, Author) commented Dec 1, 2025

I fixed the bug thanks to @AnatoleStorck (see 39777f6)! The test failure is unrelated, however.
