Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions .github/workflows/testsuite.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,54 @@ jobs:
env_vars: OS,PYTHON
fail_ci_if_error: true

only_progsteps:
env:
ENV_NAME: only_progsteps
PYTHON: "3.12"
name: No Tqdm Tests
defaults:
run:
# Adding -l {0} helps ensure conda can be found properly.
shell: bash -l {0}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
with:
fetch-depth: 1
- name: Setup Miniconda
uses: conda-incubator/[email protected]
with:
auto-update-conda: true
miniconda-version: "latest"
python-version: ${{ env.PYTHON }}
environment-file: ci/${{ env.ENV_NAME }}.yaml
activate-environment: ${{ env.ENV_NAME }}

- name: Conda Info
run: |
conda info -a
conda list
PYVER=`python -c "import sys; print('{:d}.{:d}'.format(sys.version_info.major, sys.version_info.minor))"`
if [[ $PYVER != ${{ env.PYTHON }} ]]; then
exit 1;
fi

- name: Install
run: |
pip install .

- name: Run Tests
run: |
python -m pytest --cov=pyuvsim --cov-config=.coveragerc --cov-report xml:./coverage.xml --junitxml=test-reports/xunit.xml

- uses: codecov/[email protected]
if: success()
with:
token: ${{secrets.CODECOV_TOKEN}} #required
file: ./coverage.xml #optional
env_vars: OS,PYTHON
fail_ci_if_error: true

warning_test:
env:
ENV_NAME: pyuvsim_tests_mpich
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Optional:
* lunarsky>=0.2.5 (for simulating telescopes on the moon)
* python-casacore>=3.5.2 (for writing CASA measurement sets, not available on Windows)
* matplotlib>=3.6 (for plotting functions)

* tqdm

### Developer Installation
If you are developing `pyuvsim`, you will need to download and install the
Expand Down
26 changes: 26 additions & 0 deletions ci/only_progsteps.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: only_progsteps
channels:
- conda-forge
- defaults
dependencies:
- astropy>=5.2
- astropy-healpix>=0.6
- coverage
- line_profiler
- mpi4py>=3.0.0
- numpy>=1.20
- openmpi
- pip
- psutil
- python-casacore>=3.3
- pytest
- pytest-cov
- pytest-xdist
- pyuvdata>=2.2.10
- pyyaml>=5.1
- scipy>=1.3
- setuptools_scm>=7.0.3
- pip:
- pyradiosky>=0.2
- lunarsky>=0.2.1
- git+https://github.com/aelanman/analytic_diffuse
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ plot = ["matplotlib>=3.6"]
sim = ["mpi4py>=3.1.3", "psutil"]
sim-test = ["pyuvsim[sim,test]", "mpi-pytest>=2025.7.0"]
all = ["pyuvsim[casa,healpix,moon,plot,sim]"]
test = ["coverage", "pooch>=1.8", "pre-commit", "pytest", "pytest-cov>=5.0"]
tqdm = [ "tqdm" ]
test = ["coverage", "pooch>=1.8", "pre-commit", "pytest", "pytest-cov>=5.0", "pyuvsim[tqdm]"]
doc = ["matplotlib", "pypandoc", "sphinx"]
profiler = ["line-profiler"]
dev = ["pyuvsim[all,test,sim-test,doc,profiler]"]
Expand All @@ -78,7 +79,7 @@ plot_csv_antpos = "pyuvsim.cli:plot_csv_antpos"
[tool.setuptools_scm]

[tool.pytest.ini_options]
addopts = "--ignore=scripts"
addopts = "--ignore=scripts"

[tool.ruff.lint]
select = [
Expand Down
36 changes: 30 additions & 6 deletions src/pyuvsim/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ def run_pyuvsim(argv=None):
help="Also save pickled LineStats data for line profiling.",
action="store_true",
)
parser.add_argument(
"--backend",
type=str,
help="Backend task collection for simulation",
choices=["rma", "send_recv"],
default="rma",
)

parser.add_argument(
"--progbar",
type=str,
help="Monitor and reporting module for simulation progress",
choices=["progsteps", "tqdm"],
default="progsteps",
)

args = parser.parse_args(argv)

Expand Down Expand Up @@ -98,7 +113,11 @@ def run_pyuvsim(argv=None):

if args.param is not None:
uvsim.run_uvsim(
args.param, quiet=args.quiet, block_nonroot_stdout=block_nonroot_stdout
args.param,
quiet=args.quiet,
block_nonroot_stdout=block_nonroot_stdout,
backend=args.backend,
progbar=args.progbar,
)
else:
uvd = UVData.from_file(args.uvdata)
Expand All @@ -113,12 +132,17 @@ def run_pyuvsim(argv=None):
catalog=skymodel,
quiet=args.quiet,
block_nonroot_stdout=block_nonroot_stdout,
backend=args.backend,
progbar=args.progbar,
)
pobj = Path(args.outfile)
utils.write_uvdata(
uvd_out,
param_dict={"outdir": str(pobj.parent), "outfile_name": str(pobj.name)},
)

if mpi.rank != 0:
return

pobj = Path(args.outfile)
utils.write_uvdata(
uvd_out, param_dict={"outdir": str(pobj.parent), "outfile_name": str(pobj.name)}
)

if args.profile:
dt = pytime.time() - t0
Expand Down
36 changes: 30 additions & 6 deletions src/pyuvsim/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""MPI setup."""

import atexit
import enum
import struct as _struct
import sys
from array import array as _array
Expand All @@ -19,14 +20,24 @@
world_comm = None
node_comm = None
rank_comm = None

status = None

# Split serialized objects into chunks of 2 GiB
INT_MAX = 2**31 - 1

shared_window_list = []


class Tags(enum.IntEnum):
"""Tags for MPI computations where worker nodes communicate with a main distribution node."""

READY = enum.auto()
START = enum.auto()
DONE = enum.auto()
EXIT = enum.auto()
ERROR = enum.auto()


def set_mpi_excepthook(mpi_comm):
"""Kill the whole job on an uncaught python exception."""

Expand All @@ -38,7 +49,7 @@ def mpi_excepthook(exctype, value, traceback): # pragma: no cover
sys.excepthook = mpi_excepthook


def start_mpi(block_nonroot_stdout=True):
def start_mpi(block_nonroot_stdout=True, thread_multiple=False):
"""
Initialize MPI if not already initialized and do setup.

Expand All @@ -51,20 +62,27 @@ def start_mpi(block_nonroot_stdout=True):
Redirect stdout on nonzero ranks to /dev/null, for cleaner output.

"""
global world_comm, node_comm, rank_comm, rank, Npus, status

do_once = False
global world_comm, node_comm, rank_comm, rank, Npus
if not MPI.Is_initialized(): # pragma: no cover
MPI.Init_thread(
MPI.THREAD_SERIALIZED
) # RMA is incompatible with THREAD_MULTIPLE.
do_once = True

if not thread_multiple:
MPI.Init_thread(
MPI.THREAD_SERIALIZED
) # RMA is incompatible with THREAD_MULTIPLE.
else:
MPI.Init_thread(MPI.THREAD_MULTIPLE)
atexit.register(MPI.Finalize)

world_comm = MPI.COMM_WORLD
node_comm = world_comm.Split_type(MPI.COMM_TYPE_SHARED)
rank_comm = world_comm.Split(color=node_comm.rank)

Npus = world_comm.Get_size()
rank = world_comm.Get_rank()
status = MPI.Status()
set_mpi_excepthook(world_comm)

world_comm.Barrier()
Expand All @@ -78,6 +96,7 @@ def start_mpi(block_nonroot_stdout=True):
with open("/dev/null", "w") as devnull:
sys.stdout = devnull
atexit.register(sys.stdout.close)

atexit.register(free_shared)


Expand Down Expand Up @@ -487,3 +506,8 @@ def get_comm():
def get_node_comm():
"""Get node_comm, the Communicator for all PUs on current node."""
return node_comm


def get_status():
"""status, the status of an mpi message."""
return status
55 changes: 55 additions & 0 deletions src/pyuvsim/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the 3-clause BSD License
"""Define various utility functions."""

import itertools
import os
import sys
import time as pytime
Expand Down Expand Up @@ -42,6 +43,14 @@ def __init__(self, maxval=None):
self.curval = 0
self.remain = None

def __enter__(self):
"""Enter a context manager."""
return self

def __exit__(self, exc_type, exc_value, traceback):
"""Exit a context manager."""
self.finish()

def update(self, count):
"""
Update the progress bar.
Expand Down Expand Up @@ -370,3 +379,49 @@ class in simulation, accounting for its attributes as well as
[sys.getsizeof(v) * Ncomponents * Nfreqs for k, v in Ncomp_Nfreq_attrs.items()]
)
return mem_est


def _grouper_it(iterable, chunksize=1):
"""Chunk an iterator and return an iterator.

Parameters
----------
iterable : Iterable
The iterable object to chunk
chunksize : int
size of chunks desired

Returns
-------
iterable chunked into sizes
"""
it = iter(iterable)
while True:
chunk = list(itertools.islice(it, chunksize))
if not chunk:
return
yield chunk


def _chunked_iterator_product(iter1, iter2, chunksize1, chunksize2):
"""Iterate over the product of two chunked iterators.

Parameters
----------
iter1 : Iterable
One iterator to chunk through.
iter2 : Iterable
The second iterator to chunk through.
chunksize1 : int
Chunk size for iter1
chunksize2 : int
Chunk size for iter2

Returns
-------
An iterator over the chunked product of all combinations of iter1 and iter2

"""
for i1 in _grouper_it(iter1, chunksize1):
for i2 in _grouper_it(iter2, chunksize2):
yield i1, i2
Loading
Loading