|
4 | 4 | import uuid |
5 | 5 | from collections import defaultdict |
6 | 6 | from contextlib import contextmanager |
| 7 | +from typing import Literal |
7 | 8 |
|
8 | 9 | import numpy as np |
9 | 10 | from more_itertools import always_iterable |
@@ -505,10 +506,105 @@ def min_level(self, value): |
505 | 506 |
|
506 | 507 | def piter( |
507 | 508 | self, |
508 | | - *args, |
509 | | - **kwargs, |
| 509 | + njobs: int = 0, |
| 510 | + storage: dict | None = None, |
| 511 | + barrier: bool = True, |
| 512 | + dynamic: bool = False, |
| 513 | + reduction: Literal[None, "sum", "max", "min", "cat", "cat_on_root"] = None, |
510 | 514 | ): |
511 | | - yield from parallel_objects(self.chunks([], "io"), *args, **kwargs) |
| 515 | + """Iterate in parallel over data in the container. |
| 516 | +
|
| 517 | + Parameters |
| 518 | + ---------- |
| 519 | + njobs : int |
| 520 | +            How many jobs to spawn. The default (0) dispatches one job for |
| 521 | +            each available processor. |
| 522 | + storage : dict |
| 523 | +            This is a dictionary which will be filled with results during the |
| 524 | +            course of the iteration. The keys will be the chunk |
| 525 | +            indices and the values will be whatever is assigned to the *result* |
| 526 | +            attribute of the yielded storage object during iteration. |
| 527 | + barrier : bool |
| 528 | + Should a barrier be placed at the end of iteration? |
| 529 | + dynamic : bool |
| 530 | +            This governs whether or not dynamic load balancing will be enabled. |
| 531 | +            It requires one dedicated processor: with 128 processors available, |
| 532 | +            only 127 will iterate over objects, as the remaining one will be |
| 533 | +            load balancing the rest. |
| 534 | + reduction : Literal[None, "sum", "max", "min", "cat", "cat_on_root"] |
| 535 | + This specifies the reduction operation to be applied to the results |
| 536 | + from each processor. |
| 537 | + - None: no reduction will be applied and the storage object will |
| 538 | + contain one result per chunk in the container. |
| 539 | +            - cat: the storage object will contain the per-chunk results, |
| 540 | +              concatenated (flattened) together. |
| 541 | +            - cat_on_root: same as cat, but only the root processor's storage |
| 542 | +              will contain anything. |
| 543 | +            - sum, min, max: the storage object will contain the single value |
| 544 | +              obtained by reducing all results with the given operation. |
| 545 | +
|
| 546 | + Important limitation |
| 547 | + -------------------- |
| 548 | + When using `storage`, the result *must* be a dictionary. See the |
| 549 | + examples below. |
| 550 | +
|
| 551 | +        Examples |
| 552 | +        -------- |
| 553 | +
|
| 554 | + Here is an example of how to gather all data on root, reading in |
| 555 | + parallel. Other MPI tasks will have nothing in `my_storage`. |
| 556 | +
|
| 557 | + >>> import yt |
| 558 | + >>> ds = yt.load("output_00080") |
| 559 | + ... ad = ds.all_data() |
| 560 | + >>> my_storage = {} |
| 561 | + ... for sto, chunk in ad.piter(storage=my_storage, reduction="cat_on_root"): |
| 562 | + ... sto.result = { |
| 563 | + ... ("gas", "density"): chunk["gas", "density"], |
| 564 | + ... ("gas", "temperature"): chunk["gas", "temperature"], |
| 565 | + ... } |
| 566 | + ... if yt.is_root(): |
| 567 | + ... # Contains *all* the gas densities |
| 568 | + ... my_storage["gas", "density"] |
| 569 | + ... # Contains *all* the gas temperatures |
| 570 | + ... my_storage["gas", "temperature"] |
| 571 | +
|
| 572 | + Here is an example of how to sum the total mass of all gas cells in |
| 573 | + the dataset, storing the result in `my_storage` on all processors. |
| 574 | +
|
| 575 | + >>> my_storage = {} |
| 576 | + ... for sto, chunk in ad.piter(storage=my_storage, reduction="sum"): |
| 577 | + ... sto.result = { |
| 578 | + ... "total_mass": chunk["gas", "cell_mass"].sum(), |
| 579 | + ... } |
| 580 | + ... print("Total mass: ", my_storage["total_mass"]) |
| 581 | +
|
| 582 | + Here is an example of how to read all data in parallel and |
| 583 | + have the results available on all processors. |
| 584 | +
|
| 585 | + >>> my_storage = {} |
| 586 | + ... for sto, chunk in ad.piter(storage=my_storage, reduction="cat"): |
| 587 | + ... sto.result = {("gas", "density"): chunk["gas", "density"]} |
| 588 | + ... print(my_storage["gas", "density"]) |
| 589 | +
|
| 590 | + This is equivalent (but faster, since reading is parallelized) to the |
| 591 | + following |
| 592 | +
|
| 593 | + >>> my_storage = {("gas", "density"): ad["gas", "density"]} |
| 594 | + """ |
| 595 | + if reduction is not None and storage is None: |
| 596 | + raise ValueError( |
| 597 | + "If reduction is specified, you must pass in a storage dictionary." |
| 598 | + ) |
| 599 | + |
| 600 | + yield from parallel_objects( |
| 601 | + self.chunks([], "io"), |
| 602 | + njobs=njobs, |
| 603 | + storage=storage, |
| 604 | + barrier=barrier, |
| 605 | + dynamic=dynamic, |
| 606 | + reduction=reduction, |
| 607 | + ) |
512 | 608 |
|
513 | 609 |
|
514 | 610 | class YTSelectionContainer0D(YTSelectionContainer): |
|
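
For the "sum"/"min"/"max" reductions described in the docstring above, here is a minimal sketch of the "max" path, which the examples above do not cover. It assumes the same hypothetical "output_00080" dataset, an MPI launch with yt.enable_parallelism(), and mirrors the structure of the "sum" example; names are illustrative only.

    import yt

    # Assumes an MPI launch, e.g. `mpirun -np 4 python max_density.py`;
    # enable_parallelism() switches yt into its MPI-parallel mode.
    yt.enable_parallelism()

    ds = yt.load("output_00080")  # hypothetical dataset, as in the docstring
    ad = ds.all_data()

    my_storage = {}
    for sto, chunk in ad.piter(storage=my_storage, reduction="max"):
        # Each chunk contributes its own maximum density; the "max" reduction
        # then collapses the per-chunk results into a single global value.
        sto.result = {"max_density": chunk["gas", "density"].max()}

    # As with the "sum" example in the docstring, the reduced value should be
    # available in my_storage on all processors.
    print("Max density:", my_storage["max_density"])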
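Similarly, for the njobs and dynamic parameters, here is a sketch of plain iteration without a storage dictionary, under the same assumed MPI setup. When no storage is passed, parallel_objects (and hence piter) yields the chunks themselves rather than (storage, chunk) pairs.

    import yt

    yt.enable_parallelism()  # assumes an MPI launch, as above

    ds = yt.load("output_00080")  # hypothetical dataset name
    ad = ds.all_data()

    # njobs=0 (the default) dispatches one job per available processor;
    # dynamic=True dedicates one rank to load balancing, handing chunks to
    # the remaining ranks as they finish their previous work.
    for chunk in ad.piter(njobs=0, dynamic=True):
        print("This rank read", chunk["gas", "density"].size, "cells")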