diff --git a/src/matvis/_utils.py b/src/matvis/_utils.py index 0c8d141..c4c5323 100644 --- a/src/matvis/_utils.py +++ b/src/matvis/_utils.py @@ -3,9 +3,12 @@ import logging import time import tracemalloc as tm +from typing import Union import numpy as np import psutil +from pyuvdata import BeamInterface, UVBeam +from pyuvdata.analytic_beam import AnalyticBeam try: import cupy as cp @@ -207,9 +210,9 @@ def get_required_chunks( def get_desired_chunks( - freemem: int, + freemem: int | float, min_chunks: int, - beam_list: int, + beam_list: list[UVBeam | AnalyticBeam | BeamInterface], nax: int, nfeed: int, nant: int, @@ -222,7 +225,7 @@ def get_desired_chunks( Parameters ---------- - freemem : int + freemem : int | float The amount of free memory in bytes. min_chunks : int The minimum number of chunks desired. diff --git a/src/matvis/core/getz.py b/src/matvis/core/getz.py index e887b10..ae0dc24 100644 --- a/src/matvis/core/getz.py +++ b/src/matvis/core/getz.py @@ -79,7 +79,7 @@ def __call__( """ exptau *= sqrt_flux - self.z.shape = (self.nant, self.nfeed, self.nax, self.nsrc) + self.z = self.z.reshape(self.nant, self.nfeed, self.nax, self.nsrc) for fd in range(self.nfeed): for ax in range(self.nax): @@ -88,11 +88,16 @@ def __call__( if beam_idx is None: self.z *= beam else: - self.z *= beam[beam_idx] + # Since beam_idx is an array of integers, using it as an index into beam + # is "fancy indexing", which causes a memory copy. To avoid this, we loop + # over the indices. While this might be a bit slower, it avoids the memory + # copy and thus is more memory efficient. + for ant, bmidx in enumerate(beam_idx): + self.z[ant] *= beam[bmidx] # Here we expand the beam to all ants (from its beams), then broadcast to # the shape of exptau, so we end up with shape (Nant, Nfeed, Nax, Nsources) - self.z.shape = (self.nant * self.nfeed, self.nax * self.nsrc) + self.z = self.z.reshape(self.nant * self.nfeed, self.nax * self.nsrc) if self.gpu: cp.cuda.Device().synchronize() diff --git a/src/matvis/cpu/cpu.py b/src/matvis/cpu/cpu.py index 354b0e8..3a78b38 100644 --- a/src/matvis/cpu/cpu.py +++ b/src/matvis/cpu/cpu.py @@ -148,9 +148,14 @@ def simulate( shape (NTIMES, NBLS). """ + if not 0 < source_buffer <= 1: + raise ValueError("source_buffer must satisfy 0 < source_buffer <= 1") + if not 0 < memory_buffer <= 1: + raise ValueError("memory_buffer must satisfy 0 < memory_buffer <= 1") + init_time = time.time() - if not tm.is_tracing() and logger.isEnabledFor(logging.INFO): + if not tm.is_tracing(): tm.start() highest_peak = memtrace(0) @@ -161,8 +166,10 @@ def simulate( rtype, ctype = get_dtypes(precision) + current_memory = tm.get_traced_memory()[0] + nchunks, npixc = get_desired_chunks( - min(max_memory, psutil.virtual_memory().available), + min(max_memory - current_memory, psutil.virtual_memory().available), min_chunks, beam_list, nax, diff --git a/src/matvis/gpu/gpu.py b/src/matvis/gpu/gpu.py index e2cf1b8..d2d773a 100644 --- a/src/matvis/gpu/gpu.py +++ b/src/matvis/gpu/gpu.py @@ -84,8 +84,10 @@ def simulate( if not HAVE_CUDA: raise ImportError("You need to install the [gpu] extra to use this function!") - if source_buffer > 1.0: - raise ValueError("source_buffer must be <=1.0") + if not 0 < source_buffer <= 1: + raise ValueError("source_buffer must satisfy 0 < source_buffer <= 1") + if not 0 < memory_buffer <= 1: + raise ValueError("memory_buffer must satisfy 0 < memory_buffer <= 1") pr = psutil.Process() nax, nfeed, nant, ntimes = _validate_inputs( diff --git a/tests/test_matvis_cpu.py b/tests/test_matvis_cpu.py index a2d82e7..4e9ab21 100644 --- a/tests/test_matvis_cpu.py +++ b/tests/test_matvis_cpu.py @@ -4,9 +4,11 @@ import pytest from astropy.time import Time from pyuvdata.analytic_beam import GaussianBeam +from pyuvdata.beam_interface import BeamInterface from pyuvdata.telescopes import Telescope from matvis import simulate_vis +from matvis._test_utils import get_standard_sim_params NTIMES = 10 NFREQ = 5 @@ -54,3 +56,69 @@ def test_simulate_vis(polarized): max_progress_reports=2, ) assert np.all(~np.isnan(vis)) # check that there are no NaN values + + +def test_multibeam_matches_single_beam_cpu(): + """Ensure per-antenna identical beams match a single shared beam on CPU.""" + kw, *_ = get_standard_sim_params( + use_analytic_beam=True, polarized=False, nfreq=1, nsource=2, ntime=1 + ) + kw |= {"precision": 2, "use_gpu": False} + + beams = kw.pop("beams") + beam_idx = np.zeros(len(kw["ants"]), dtype=int) + + vis_multi = simulate_vis(beams=beams * len(kw["ants"]), beam_idx=beam_idx, **kw) + vis_single = simulate_vis(beams=beams, **kw) + + np.testing.assert_allclose(vis_multi, vis_single, atol=0, rtol=0) + + +def test_multibeam_permutation_invariant_cpu(): + """Ensure beam-list reordering with remapped beam_idx is invariant on CPU.""" + kw, *_ = get_standard_sim_params( + use_analytic_beam=True, polarized=False, nfreq=1, nsource=2, ntime=1 + ) + kw |= {"precision": 2, "use_gpu": False} + kw.pop("beams") + + beam_a = BeamInterface(GaussianBeam(diameter=14.0), beam_type="power") + beam_b = BeamInterface(GaussianBeam(diameter=8.0), beam_type="power") + + vis_ab = simulate_vis( + beams=[beam_a, beam_b], + beam_idx=np.array([0, 1, 0], dtype=int), + **kw, + ) + vis_ba = simulate_vis( + beams=[beam_b, beam_a], + beam_idx=np.array([1, 0, 1], dtype=int), + **kw, + ) + + np.testing.assert_allclose(vis_ab, vis_ba, atol=0, rtol=0) + + +def test_multibeam_assignment_change_affects_output_cpu(): + """Ensure changing beam_idx assignment changes visibilities on CPU.""" + kw, *_ = get_standard_sim_params( + use_analytic_beam=True, polarized=False, nfreq=1, nsource=2, ntime=1 + ) + kw |= {"precision": 2, "use_gpu": False} + kw.pop("beams") + + beam_a = BeamInterface(GaussianBeam(diameter=14.0), beam_type="power") + beam_b = BeamInterface(GaussianBeam(diameter=8.0), beam_type="power") + + vis_010 = simulate_vis( + beams=[beam_a, beam_b], + beam_idx=np.array([0, 1, 0], dtype=int), + **kw, + ) + vis_101 = simulate_vis( + beams=[beam_a, beam_b], + beam_idx=np.array([1, 0, 1], dtype=int), + **kw, + ) + + assert not np.allclose(vis_010, vis_101, rtol=0, atol=1e-12) diff --git a/tests/test_matvis_gpu.py b/tests/test_matvis_gpu.py index db6e780..4b2eb71 100644 --- a/tests/test_matvis_gpu.py +++ b/tests/test_matvis_gpu.py @@ -6,18 +6,12 @@ import numpy as np from pyuvdata.analytic_beam import GaussianBeam +from pyuvdata.beam_interface import BeamInterface from matvis import simulate_vis from matvis._test_utils import get_standard_sim_params -def test_source_buffer_validation(): - """Ensure that source_buffer > 1.0 raises a ValueError.""" - kw, *_ = get_standard_sim_params(False, False) - with pytest.raises(ValueError, match="source_buffer must be <=1.0"): - simulate_vis(use_gpu=True, source_buffer=1.5, **kw) - - def test_antizenith(): """Ensure that a single source at anti-zenith produces zero visibilities.""" kw, *_ = get_standard_sim_params( @@ -50,6 +44,72 @@ def test_multibeam(): assert np.all(vis1 == vis2) +def test_multibeam_matches_single_beam_gpu(): + """Ensure per-antenna identical beams match a single shared beam on GPU.""" + kw, *_ = get_standard_sim_params( + use_analytic_beam=True, polarized=False, nfreq=1, nsource=2, ntime=1 + ) + kw |= {"precision": 2, "use_gpu": True} + + beams = kw.pop("beams") + beam_idx = np.zeros(len(kw["ants"]), dtype=int) + + vis_multi = simulate_vis(beams=beams * len(kw["ants"]), beam_idx=beam_idx, **kw) + vis_single = simulate_vis(beams=beams, **kw) + + np.testing.assert_allclose(vis_multi, vis_single, atol=1e-12, rtol=0) + + +def test_multibeam_permutation_invariant_gpu(): + """Ensure beam-list reordering with remapped beam_idx is invariant on GPU.""" + kw, *_ = get_standard_sim_params( + use_analytic_beam=True, polarized=False, nfreq=1, nsource=2, ntime=1 + ) + kw |= {"precision": 2, "use_gpu": True} + kw.pop("beams") + + beam_a = BeamInterface(GaussianBeam(diameter=14.0), beam_type="power") + beam_b = BeamInterface(GaussianBeam(diameter=8.0), beam_type="power") + + vis_ab = simulate_vis( + beams=[beam_a, beam_b], + beam_idx=np.array([0, 1, 0], dtype=int), + **kw, + ) + vis_ba = simulate_vis( + beams=[beam_b, beam_a], + beam_idx=np.array([1, 0, 1], dtype=int), + **kw, + ) + + np.testing.assert_allclose(vis_ab, vis_ba, atol=1e-12, rtol=0) + + +def test_multibeam_assignment_change_affects_output_gpu(): + """Ensure changing beam_idx assignment changes visibilities on GPU.""" + kw, *_ = get_standard_sim_params( + use_analytic_beam=True, polarized=False, nfreq=1, nsource=2, ntime=1 + ) + kw |= {"precision": 2, "use_gpu": True} + kw.pop("beams") + + beam_a = BeamInterface(GaussianBeam(diameter=14.0), beam_type="power") + beam_b = BeamInterface(GaussianBeam(diameter=8.0), beam_type="power") + + vis_010 = simulate_vis( + beams=[beam_a, beam_b], + beam_idx=np.array([0, 1, 0], dtype=int), + **kw, + ) + vis_101 = simulate_vis( + beams=[beam_a, beam_b], + beam_idx=np.array([1, 0, 1], dtype=int), + **kw, + ) + + assert not np.allclose(vis_010, vis_101, rtol=0, atol=1e-12) + + def test_mixed_beams(uvbeam): """Test that error is raised when using a mixed beam list.""" kw, *_ = get_standard_sim_params(use_analytic_beam=True, polarized=False) diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index e2f719c..2e0d01f 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -5,11 +5,14 @@ """ import numpy as np +import pytest from astropy.coordinates import EarthLocation from astropy.time import Time from pyuvdata.analytic_beam import GaussianBeam from matvis import simulate_vis +from matvis._test_utils import get_standard_sim_params +from matvis.gpu.gpu import HAVE_CUDA def test_passing_matprod_method_with_prefix(): @@ -31,3 +34,37 @@ def test_passing_matprod_method_with_prefix(): assert np.iscomplexobj( vis ) # check that the output is complex, as expected for CPUMatMul + + +@pytest.mark.parametrize( + "use_gpu", + [ + pytest.param(False, id="cpu"), + pytest.param( + True, + id="gpu", + marks=pytest.mark.skipif(not HAVE_CUDA, reason="GPU is not available"), + ), + ], +) +@pytest.mark.parametrize( + ("kwarg", "value", "errmsg"), + [ + ( + "source_buffer", + 0.0, + "source_buffer must satisfy 0 < source_buffer <= 1", + ), + ( + "memory_buffer", + 1.1, + "memory_buffer must satisfy 0 < memory_buffer <= 1", + ), + ], +) +def test_buffer_validation(use_gpu, kwarg, value, errmsg): + """Ensure buffer validation is enforced for both CPU and GPU wrapper calls.""" + kw, *_ = get_standard_sim_params(False, False) + + with pytest.raises(ValueError, match=errmsg): + simulate_vis(use_gpu=use_gpu, **{kwarg: value}, **kw)