Allow parallel I/O for hydro and particle reading #4730
ce2aa15 to
a8d7767
Compare
| roff = [off * dtr for off in offsets] | ||
| rsize = [siz * dtr for siz in sizes] | ||
| tmp_recv = recv.view(self.__tocast) | ||
| tmp_recv = recv.view(transport_dtype) |
The motivation for getting rid of the cast to CHAR is to avoid hitting MPI's limit of 2**31 elements per message as quickly when communicating arrays: with a CHAR cast, every byte counts as one element.
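To make the element-count argument concrete, here is a minimal sketch of the arithmetic. It assumes the count passed to MPI is a signed 32-bit C `int`; the helper `message_count` and the array size are hypothetical and not part of the PR.

```python
import numpy as np

# Largest value of a signed 32-bit C int, the type of classic MPI count arguments.
MPI_COUNT_LIMIT = 2**31 - 1


def message_count(n_elements, dtype, as_char):
    """Return the element count MPI would see for an array of n_elements items.

    Hypothetical helper for illustration only: when the buffer is cast to
    CHAR, every byte is one element, otherwise every item is one element.
    """
    itemsize = np.dtype(dtype).itemsize
    return n_elements * itemsize if as_char else n_elements


n = 300_000_000  # hypothetical number of float64 values to communicate
count_as_char = message_count(n, "float64", as_char=True)
count_native = message_count(n, "float64", as_char=False)

print(count_as_char > MPI_COUNT_LIMIT)  # the CHAR view exceeds the limit
print(count_native > MPI_COUNT_LIMIT)   # the native dtype does not
```

For an 8-byte dtype, sending in the native type raises the largest array that fits in one message by a factor of eight.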
| self.comm.send( | ||
| (arr.dtype.str, arr.shape, transport_dtype) + unit_metadata, | ||
| dest=dest, | ||
| tag=tag, | ||
| ) | ||
| self.comm.Send([arr, mpi_type], dest=dest, tag=tag) |
Isn't there a bug here? I think this should be
| self.comm.send( | |
| (arr.dtype.str, arr.shape, transport_dtype) + unit_metadata, | |
| dest=dest, | |
| tag=tag, | |
| ) | |
| self.comm.Send([arr, mpi_type], dest=dest, tag=tag) | |
| self.comm.send( | |
| (arr.dtype.str, arr.shape, transport_dtype) + unit_metadata, | |
| dest=dest, | |
| tag=tag, | |
| ) | |
| self.comm.Send([tmp, mpi_type], dest=dest, tag=tag) |
I think it should be tmp, yeah, but since it has been working I think it's possible that mpi4py was fixing it implicitly.
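One possible explanation for why sending `arr` instead of `tmp` went unnoticed, assuming `tmp` is a reinterpreting NumPy view of `arr` (an assumption, since the definition of `tmp` is not shown here): both objects expose the same memory, so a buffer-based send would transmit the same bytes. A small NumPy-only sketch, with a hypothetical `transport_dtype`:

```python
import numpy as np

arr = np.arange(6, dtype="float32").reshape(2, 3)

# Assumption: the transport dtype is a plain byte type, and tmp is a view.
transport_dtype = "uint8"
tmp = arr.view(transport_dtype)

# A view does not copy: both names refer to the same underlying buffer.
shares = np.shares_memory(arr, tmp)
same_bytes = arr.tobytes() == tmp.tobytes()

print(shares, same_bytes)
```

If `tmp` were instead a converted copy (for example from `astype`), the two buffers would differ and the choice between `arr` and `tmp` would matter.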
|
@yt-fido test this please |
53f24bc to
c9fc44d
Compare
c9fc44d to
3097fde
Compare
|
After discussion with @matthewturk and @chrishavlin, we're proposing to implement this:

import matplotlib.pyplot as plt
import numpy as np
import yt

ds = yt.load(...)
yt.enable_parallelism()
ad = ds.all_data()

def expensive_function(chunk):
    import time
    time.sleep(np.random.rand() * 3600)
    return 42

sto = {}
for chunk in ad.piter(storage=sto, reduction="min/max/sum/cat/cat_on_root"):
    sto.result["gas", "density"] = chunk["gas", "density"]
    sto.result["gas", "temperature"] = chunk["gas", "temperature"]
    sto.result["expensive_stuff"] = expensive_function(chunk)

if yt.is_root():
    mean_expensive_stuff = np.mean(sto["expensive_stuff"])
    plt.hist2d(sto["gas", "density"], sto["gas", "temperature"], bins=...)
    plt.title(f"{mean_expensive_stuff=}")
    plt.savefig("...") |
|
Closing since #5218 is much better. |
PR Summary
This allows parallel I/O for the RAMSES dataset. Reading a RAMSES dataset has three steps:
The strategy I have adopted is to parallelize over the first one, so that each MPI task is now in charge of a few domains and reads only those (including hydro and particles).
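As an illustration of the idea of giving each MPI task a few domains, here is a minimal sketch of a round-robin assignment. The function `domains_for_rank` is hypothetical and the scheme is an assumption; the PR may distribute domains differently (and RAMSES domain ids are 1-based, whereas this sketch uses 0-based indices for simplicity).

```python
def domains_for_rank(n_domains, rank, size):
    """Return the 0-based domain indices handled by `rank` out of `size` tasks.

    Hypothetical round-robin scheme: domain i goes to task i % size, so the
    domains are spread as evenly as possible and each one is read once.
    """
    return list(range(rank, n_domains, size))


n_domains, size = 10, 4
assignment = {rank: domains_for_rank(n_domains, rank, size) for rank in range(size)}
for rank, domains in assignment.items():
    print(f"task {rank} reads domains {domains}")
```

With this kind of split, every task opens only the hydro and particle files of its own domains, and no domain is read twice.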