From cdd3519ee4f4b213dfc95111e2930ff859fdf3f7 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Fri, 3 Apr 2026 15:42:03 -0700 Subject: [PATCH 01/32] add new beam evaluation functions that use 2 beams --- src/fftvis/cpu/beams.py | 68 +++++++++++++++++++++++++++++++++- src/fftvis/cpu/cpu_simulate.py | 2 + 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/src/fftvis/cpu/beams.py b/src/fftvis/cpu/beams.py index 29c5246..42a2ea6 100644 --- a/src/fftvis/cpu/beams.py +++ b/src/fftvis/cpu/beams.py @@ -139,4 +139,70 @@ def get_apparent_flux_polarized(beam, coherency): beam[0,0,i] = tmp00*a00 + tmp01*a10 beam[0,1,i] = tmp00*a01 + tmp01*a11 beam[1,0,i] = tmp10*a00 + tmp11*a10 - beam[1,1,i] = tmp10*a01 + tmp11*a11 \ No newline at end of file + beam[1,1,i] = tmp10*a01 + tmp11*a11 + + @staticmethod + @nb.jit(nopython=True, parallel=False, nogil=False) + def get_apparent_flux_polarized_beam_pair( + beam_i: np.ndarray, + beam_j: np.ndarray, + flux: np.ndarray, + out: np.ndarray, + ): + """Compute A_i^H * diag(flux) * A_j for an unpolarized sky with two beams. + + Generalises get_apparent_flux_polarized_beam to the case where the two + antennas forming a baseline have different beam patterns. When beam_i is + beam_j this gives the same result as the existing in-place method. + + Unlike the existing method, i10 cannot be inferred from i01 by conjugation + since A_i^H A_j is not Hermitian in general, so all four elements are + computed explicitly. + """ + nax, nfd, nsrc = beam_i.shape + for isrc in range(nsrc): + ci = np.conj(beam_i[:, :, isrc]) # ci[a, p] = conj(A_i[a, p]) + + i00 = ci[0, 0] * beam_j[0, 0, isrc] + ci[1, 0] * beam_j[1, 0, isrc] + i01 = ci[0, 0] * beam_j[0, 1, isrc] + ci[1, 0] * beam_j[1, 1, isrc] + i10 = ci[0, 1] * beam_j[0, 0, isrc] + ci[1, 1] * beam_j[1, 0, isrc] + i11 = ci[0, 1] * beam_j[0, 1, isrc] + ci[1, 1] * beam_j[1, 1, isrc] + + out[0, 0, isrc] = i00 * flux[isrc] + out[0, 1, isrc] = i01 * flux[isrc] + out[1, 0, isrc] = i10 * flux[isrc] + out[1, 1, isrc] = i11 * flux[isrc] + + + @staticmethod + @nb.jit(nopython=True, parallel=False, nogil=False) + def get_apparent_flux_polarized_pair( + beam_i: np.ndarray, + beam_j: np.ndarray, + coherency: np.ndarray, + out: np.ndarray, + ): + """Compute A_i^H @ C @ A_j for a polarized sky with two beams. + + Generalises get_apparent_flux_polarized to the cross-beam case. + The left Jones matrix is A_i (conjugate transpose applied) and the right + is A_j, giving the measurement equation for a mixed-beam baseline. + """ + nsources = beam_i.shape[2] + for i in range(nsources): + ai00, ai01 = beam_i[0, 0, i], beam_i[0, 1, i] + ai10, ai11 = beam_i[1, 0, i], beam_i[1, 1, i] + aj00, aj01 = beam_j[0, 0, i], beam_j[0, 1, i] + aj10, aj11 = beam_j[1, 0, i], beam_j[1, 1, i] + + # A_i^H @ C + tmp00 = np.conj(ai00) * coherency[0, 0, i] + np.conj(ai10) * coherency[1, 0, i] + tmp01 = np.conj(ai00) * coherency[0, 1, i] + np.conj(ai10) * coherency[1, 1, i] + tmp10 = np.conj(ai01) * coherency[0, 0, i] + np.conj(ai11) * coherency[1, 0, i] + tmp11 = np.conj(ai01) * coherency[0, 1, i] + np.conj(ai11) * coherency[1, 1, i] + + # (A_i^H C) @ A_j + out[0, 0, i] = tmp00 * aj00 + tmp01 * aj10 + out[0, 1, i] = tmp00 * aj01 + tmp01 * aj11 + out[1, 0, i] = tmp10 * aj00 + tmp11 * aj10 + out[1, 1, i] = tmp10 * aj01 + tmp11 * aj11 \ No newline at end of file diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index ba07596..f8c863e 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -470,6 +470,8 @@ def _evaluate_vis_chunk( _cpu_beam_evaluator.polarized = polarized _cpu_beam_evaluator.freq = freq + # TODO: Loop over multiple beams if needed, currently we only support one beam for simplicity. + # TODO: Identify the pairs of baselines that have the same set of the beam pairs to avoid redundant beam evaluations. apparent_coherency = _cpu_beam_evaluator.evaluate_beam( beam, az, From 53e233c2c166e448f9d20f0fd26312faa9c42f01 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Wed, 15 Apr 2026 14:22:18 -0700 Subject: [PATCH 02/32] update multi-beam sims --- src/fftvis/cpu/beams.py | 25 ++++ src/fftvis/cpu/cpu_simulate.py | 250 ++++++++++++++++++++------------- src/fftvis/wrapper.py | 47 +++++-- 3 files changed, 208 insertions(+), 114 deletions(-) diff --git a/src/fftvis/cpu/beams.py b/src/fftvis/cpu/beams.py index 42a2ea6..7d21481 100644 --- a/src/fftvis/cpu/beams.py +++ b/src/fftvis/cpu/beams.py @@ -87,6 +87,31 @@ def evaluate_beam( raise ValueError("Beam interpolation resulted in an invalid value") return interp_beam + + @staticmethod + def prepare_beam_evaluation(antnums, baselines, beam_idx): + """ + Compute the minimal flips for the given baselines and beam indices. + """ + # Get number of unique beams + nbeams = len(np.unique(beam_idx)) + + # Get the unique beam pairs that we need to evaluate the apparent flux for + unique_beam_pairs = [(bi, bj) for bi in range(nbeams) for bj in range(bi, nbeams)] + + # Determine which baselines correspond to which beam pairs, and whether they need to be flipped + antnum_to_beam_idx = {ai: bidx for ai, bidx in zip(antnums, beam_idx)} + flipped = [] + + for (ai, aj) in baselines: + if (antnum_to_beam_idx[ai], antnum_to_beam_idx[aj]) in unique_beam_pairs: + flipped.append(False) + elif (antnum_to_beam_idx[aj], antnum_to_beam_idx[ai]) in unique_beam_pairs: + flipped.append(True) + else: + raise ValueError("Beam pair not in beam pair list") + + return unique_beam_pairs, flipped @staticmethod @nb.jit(nopython=True, parallel=False, nogil=False) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index f8c863e..5026f4c 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -40,13 +40,15 @@ def _evaluate_vis_chunk_remote( time_idx: slice, freq_idx: slice, - beam, + beam_list: list[Union[UVBeam, BeamInterface]], coord_mgr, rotation_matrix: np.ndarray, + antnums: list, bls: np.ndarray, freqs: np.ndarray, complex_dtype: np.dtype, nfeeds: int, + beam_idx: list[int] = None, polarized: bool = False, polarized_sky_model: bool = False, eps: float = None, @@ -67,9 +69,11 @@ def _evaluate_vis_chunk_remote( return engine._evaluate_vis_chunk( # pragma: no cover time_idx=time_idx, freq_idx=freq_idx, - beam=beam, + beam_list=beam_list, + beam_idx=beam_idx, coord_mgr=coord_mgr, rotation_matrix=rotation_matrix, + antnums=antnums, bls=bls, freqs=freqs, complex_dtype=complex_dtype, @@ -97,12 +101,13 @@ def simulate( ants: dict, freqs: np.ndarray, fluxes: np.ndarray, - beam, + beam_list: list[Union[UVBeam, BeamInterface]], ra: np.ndarray, dec: np.ndarray, times: Union[np.ndarray, Time], telescope_loc: EarthLocation, baselines: list[tuple] = None, + beam_idx: list[int] = None, precision: int = 2, polarized: bool = False, eps: float = None, @@ -165,6 +170,7 @@ def simulate( coherency = coherency.astype(complex_dtype) # Flatten antenna positions + antnums = list(ants.keys()) antkey_to_idx = dict(zip(ants.keys(), range(len(ants)))) antvecs = np.array([ants[ant] for ant in ants], dtype=real_dtype) @@ -253,8 +259,9 @@ def simulate( if isinstance(val, np.ndarray): required_shm += val.nbytes - if isinstance(beam, UVBeam): - required_shm += beam.data_array.nbytes + for beam in beam_list: + if isinstance(beam, UVBeam): + required_shm += beam.data_array.nbytes # Add visibility memory required_shm += (ntimes * nfreqs * nbls * nax * nfeeds) * np.dtype( @@ -291,10 +298,11 @@ def simulate( os.system("ray memory --units MB > before-puts.txt") # Put data into shared-memory pool + antnums = ray.put(antnums) bls = ray.put(bls) rotation_matrix = ray.put(rotation_matrix) freqs = ray.put(freqs) - beam = ray.put(beam) + beam_list = ray.put(beam_list) coord_mgr = ray.put(coord_mgr) if trace_mem: os.system("ray memory --units MB > after-puts.txt") @@ -333,13 +341,15 @@ def simulate( fnc( time_idx=tc, freq_idx=fc, - beam=beam, + beam_list=beam_list, coord_mgr=coord_mgr, rotation_matrix=rotation_matrix, + antnums=antnums, bls=bls, freqs=freqs, complex_dtype=complex_dtype, nfeeds=nfeeds, + beam_idx=beam_idx, polarized=polarized, polarized_sky_model=polarized_sky_model, eps=eps, @@ -384,13 +394,15 @@ def _evaluate_vis_chunk( self, time_idx: slice, freq_idx: slice, - beam, + beam_list: list[Union[UVBeam, BeamInterface]], coord_mgr: CoordinateRotation, rotation_matrix: np.ndarray, + antnums: list, bls: np.ndarray, freqs: np.ndarray, complex_dtype: np.dtype, nfeeds: int, + beam_idx: list[int] = None, polarized: bool = False, polarized_sky_model: bool = False, eps: float = None, @@ -425,8 +437,16 @@ def _evaluate_vis_chunk( dtype=complex_dtype, shape=(nt_here, nbls, nfeeds, nfeeds, nf_here) ) + # Set up the coordinate manager for this chunk coord_mgr.setup() + # Get beam indices for this chunk + unique_beam_pairs, flipped = _cpu_beam_evaluator.prepare_beam_evaluation( + antnums=antnums, + baselines=bls, + beam_idx=beam_idx, + ) + # Check to see if the rotation matrix is an identity matrix is_rotation_identity = np.allclose(rotation_matrix, np.eye(3)) @@ -464,104 +484,136 @@ def _evaluate_vis_chunk( if not use_type1: uvw = bls * freq - # Update beam evaluator for matvis compatibility - _cpu_beam_evaluator.beam_list = [beam] - _cpu_beam_evaluator.nsrc = len(az) - _cpu_beam_evaluator.polarized = polarized - _cpu_beam_evaluator.freq = freq - - # TODO: Loop over multiple beams if needed, currently we only support one beam for simplicity. - # TODO: Identify the pairs of baselines that have the same set of the beam pairs to avoid redundant beam evaluations. - apparent_coherency = _cpu_beam_evaluator.evaluate_beam( - beam, - az, - za, - polarized, - freq, - spline_opts=beam_spline_opts, - interpolation_function=interpolation_function, - ).astype(complex_dtype) - - if polarized and polarized_sky_model: - logger.info( - "Using polarized sky model. " - "Computing apparent flux for polarized sources." - ) - # Flip to match the expected order - apparent_coherency = np.flip(apparent_coherency, axis=0) - - # Compute the polarized apparent flux - _cpu_beam_evaluator.get_apparent_flux_polarized( - apparent_coherency, np.transpose(flux[:nsim_sources, freqidx], (1, 2, 0)) - ) - elif polarized: - logger.info( - "Using polarized beam. " - "Computing apparent flux for unpolarized sources." - ) - _cpu_beam_evaluator.get_apparent_flux_polarized_beam( - apparent_coherency, flux[:nsim_sources, freqidx] - ) - else: - logger.info( - "Using unpolarized beam. " - "Computing apparent flux for unpolarized sources." - ) - apparent_coherency *= flux[:nsim_sources, freqidx] - - # Try to reshape safely - try: - apparent_coherency = np.reshape( - apparent_coherency, (nfeeds**2, nsim_sources) - ) - pass - except ValueError: # pragma: no cover - logger.error(f"Cannot reshape A_s with shape {apparent_coherency.shape} to {(nfeeds**2, nsim_sources)}") # pragma: no cover - continue # pragma: no cover + # Interpolate each beam at the source positions + beam_evaluations = [] + for beam in beam_list: + apparent_coherency = _cpu_beam_evaluator.evaluate_beam( + beam, + az, + za, + polarized, + freq, + spline_opts=beam_spline_opts, + interpolation_function=interpolation_function, + ).astype(complex_dtype) + beam_evaluations.append(apparent_coherency) + + for (bi, bj) in zip(unique_beam_pairs): + is_cross_pair = bi != bj + if polarized and polarized_sky_model: + logger.info( + "Using polarized sky model. " + "Computing apparent flux for polarized sources." + ) + + if is_cross_pair: + apparent_coherency = np.zeros_like(beam_evaluations[bi]) + + # Compute the polarized apparent flux + _cpu_beam_evaluator.get_apparent_flux_polarized_pair( + beam1=np.flip(beam_evaluations[bi], axis=0), + beam2=np.flip(beam_evaluations[bj], axis=0), + coherency=np.transpose(flux[:nsim_sources, freqidx], (1, 2, 0)), + out=apparent_coherency + ) + else: + # Flip to match the expected order + apparent_coherency = np.flip(np.copy(beam_evaluations[bi]), axis=0) - # Check if the dtype is complex - if apparent_coherency.dtype != complex_dtype: - apparent_coherency = apparent_coherency.astype(complex_dtype) - - # Compute visibilities w/ non-uniform FFT - if use_type1: - _vis_here = cpu_nufft2d_type1( - topo[0] * freq, - topo[1] * freq, - apparent_coherency, - n_modes=type1_n_modes, - index=bls, - eps=eps, - n_threads=n_threads, - upsample_factor=upsample_factor, - ) - else: - if is_coplanar: - _vis_here = cpu_nufft2d( - topo[0], - topo[1], - apparent_coherency, - uvw[0], - uvw[1], - eps=eps, - n_threads=n_threads, - upsample_factor=upsample_factor, + # Compute the polarized apparent flux + _cpu_beam_evaluator.get_apparent_flux_polarized( + apparent_coherency, np.transpose(flux[:nsim_sources, freqidx], (1, 2, 0)) + ) + elif polarized: + logger.info( + "Using polarized beam. " + "Computing apparent flux for unpolarized sources." ) + + if is_cross_pair: + apparent_coherency = np.zeros_like(beam_evaluations[bi]) + + _cpu_beam_evaluator.get_apparent_flux_polarized_beam( + beam1=np.flip(beam_evaluations[bi], axis=0), + beam2=np.flip(np.conj(beam_evaluations[bj]), axis=0), + flux=flux[:nsim_sources, freqidx], + out=apparent_coherency + ) + else: + _cpu_beam_evaluator.get_apparent_flux_polarized_beam( + apparent_coherency, flux[:nsim_sources, freqidx] + ) else: - _vis_here = cpu_nufft3d( - topo[0], - topo[1], - topo[2], + logger.info( + "Using unpolarized beam. " + "Computing apparent flux for unpolarized sources." + ) + if is_cross_pair: + apparent_coherency = beam_evaluations[bi] * np.conj(beam_evaluations[bj]) + apparent_coherency *= flux[:nsim_sources, freqidx] + else: + apparent_coherency = beam_evaluations[bi] * flux[:nsim_sources, freqidx] + + # Try to reshape safely + try: + apparent_coherency = np.reshape( + apparent_coherency, (nfeeds**2, nsim_sources) + ) + pass + except ValueError: # pragma: no cover + logger.error(f"Cannot reshape A_s with shape {apparent_coherency.shape} to {(nfeeds**2, nsim_sources)}") # pragma: no cover + continue # pragma: no cover + + # Check if the dtype is complex + if apparent_coherency.dtype != complex_dtype: + apparent_coherency = apparent_coherency.astype(complex_dtype) + + # Compute visibilities w/ non-uniform FFT + if use_type1: + _vis_here = cpu_nufft2d_type1( + topo[0] * freq, + topo[1] * freq, apparent_coherency, - uvw[0], - uvw[1], - uvw[2], + n_modes=type1_n_modes, + index=bls, eps=eps, n_threads=n_threads, upsample_factor=upsample_factor, ) - vis[time_index, ..., freqidx] = np.swapaxes( - _vis_here.reshape(nfeeds, nfeeds, nbls), 2, 0 - ) + else: + if is_coplanar: + _vis_here = cpu_nufft2d( + topo[0], + topo[1], + apparent_coherency, + np.where(flipped, -uvw[0], uvw[0]), + np.where(flipped, -uvw[1], uvw[1]), + eps=eps, + n_threads=n_threads, + upsample_factor=upsample_factor, + ) + else: + _vis_here = cpu_nufft3d( + topo[0], + topo[1], + topo[2], + apparent_coherency, + np.where(flipped, -uvw[0], uvw[0]), + np.where(flipped, -uvw[1], uvw[1]), + np.where(flipped, -uvw[2], uvw[2]), + eps=eps, + n_threads=n_threads, + upsample_factor=upsample_factor, + ) + + # Flip visibilities for flipped baselines. This is equivalent to conjugating the beam for the flipped antenna, + # which is what we do for the beam evaluation, but since the beam evaluation is done separately for each antenna, + # we have to do the flipping at the end when we combine them. + _vis_here = np.where(flipped, np.conj(_vis_here), _vis_here) + + # TODO: Need to do something like nbls_here + vis[time_index, ..., freqidx] = np.swapaxes( + _vis_here.reshape(nfeeds, nfeeds, nbls), 2, 0 + ) return vis diff --git a/src/fftvis/wrapper.py b/src/fftvis/wrapper.py index c386ae0..d58183d 100644 --- a/src/fftvis/wrapper.py +++ b/src/fftvis/wrapper.py @@ -90,6 +90,7 @@ def simulate_vis( times: Union[np.ndarray, Time], beam, telescope_loc: EarthLocation, + beam_idx: list[int] = None, baselines: list[tuple] = None, precision: int = 2, polarized: bool = False, @@ -131,10 +132,16 @@ def simulate_vis( Frequencies to evaluate visibilities in Hz. times : astropy.Time instance or array_like Times of the observation (can be a numpy array of Julian dates or astropy.Time object). - beam : UVBeam - Beam object to use for the array. Per-antenna beams are not yet supported. + beam : UVBeam | BeamInterface, or list of UVBeam | BeamInterface + Beam object to use for the array. If a single beam object is provided, it will be assumed that all antennas have the same beam. + If a list of beam objects is provided, the length of the list should be equal to the number of unique beams in the array, + and the beam_idx parameter should be used to specify which beam corresponds to each antenna. telescope_loc An EarthLocation object representing the center of the array. + beam_idx : list of int, default = None + An array of integers, of the same length as ``ants``. Each entry is for an antenna of the same index, and + its value should be the index of the beam in the beam list that corresponds to the antenna. If None, all + antennas will be assumed to have the same beam, and the beam list will be ignored. baselines : list of tuples, default = None If provided, only the baselines within the list will be simulated and array of shape (nbls, nfreqs, ntimes) will be returned if polarized is False, and (nbls, nfreqs, ntimes, 2, 2) if polarized is True. @@ -212,20 +219,30 @@ def simulate_vis( # Make sure antpos has the right format ants = {k: np.array(v) for k, v in ants.items()} - # Interpolate the beam to the desired frequencies to avoid redundant - # interpolation in the simulation engine - if isinstance(beam, UVBeam): - if hasattr(beam, "Nfreqs") and beam.Nfreqs > 1: - beam = beam.interp(freq_array=freqs, new_object=True, run_check=False) # pragma: no cover - elif isinstance(beam, BeamInterface) and beam._isuvbeam: - if hasattr(beam.beam, "Nfreqs") and beam.beam.Nfreqs > 1: - beam.beam = beam.beam.interp(freq_array=freqs, new_object=True, run_check=False) + if not isinstance(beam, list): + _beam_list = [beam] + else: + _beam_list = beam + + beam_list = [] + + for beam in _beam_list: + # Interpolate the beam to the desired frequencies to avoid redundant + # interpolation in the simulation engine + if isinstance(beam, UVBeam): + if hasattr(beam, "Nfreqs") and beam.Nfreqs > 1: + beam = beam.interp(freq_array=freqs, new_object=True, run_check=False) # pragma: no cover + elif isinstance(beam, BeamInterface) and beam._isuvbeam: + if hasattr(beam.beam, "Nfreqs") and beam.beam.Nfreqs > 1: + beam.beam = beam.beam.interp(freq_array=freqs, new_object=True, run_check=False) + + beam = BeamInterface(beam) - beam = BeamInterface(beam) + # Prepare the beam + if not polarized: + beam = prepare_beam_unpolarized(beam, use_feed=use_feed) - # Prepare the beam - if not polarized: - beam = prepare_beam_unpolarized(beam, use_feed=use_feed) + beam_list.append(beam) # Create the simulation engine for the desired backend engine = create_simulation_engine(backend=backend) @@ -235,7 +252,7 @@ def simulate_vis( ants=ants, freqs=freqs, fluxes=fluxes, - beam=beam, + beam_list=beam_list, ra=ra, dec=dec, times=times, From 14840c1dff38d02ae353c209328dd510bf527761 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Wed, 15 Apr 2026 14:30:43 -0700 Subject: [PATCH 03/32] update type of beam_idx --- src/fftvis/cpu/cpu_simulate.py | 6 +++--- src/fftvis/wrapper.py | 20 ++++++++++++++++++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index 5026f4c..b9801cf 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -48,7 +48,7 @@ def _evaluate_vis_chunk_remote( freqs: np.ndarray, complex_dtype: np.dtype, nfeeds: int, - beam_idx: list[int] = None, + beam_idx: np.ndarray = None, polarized: bool = False, polarized_sky_model: bool = False, eps: float = None, @@ -107,7 +107,7 @@ def simulate( times: Union[np.ndarray, Time], telescope_loc: EarthLocation, baselines: list[tuple] = None, - beam_idx: list[int] = None, + beam_idx: np.ndarray = None, precision: int = 2, polarized: bool = False, eps: float = None, @@ -402,7 +402,7 @@ def _evaluate_vis_chunk( freqs: np.ndarray, complex_dtype: np.dtype, nfeeds: int, - beam_idx: list[int] = None, + beam_idx: np.ndarray = None, polarized: bool = False, polarized_sky_model: bool = False, eps: float = None, diff --git a/src/fftvis/wrapper.py b/src/fftvis/wrapper.py index d58183d..ad93f81 100644 --- a/src/fftvis/wrapper.py +++ b/src/fftvis/wrapper.py @@ -90,7 +90,7 @@ def simulate_vis( times: Union[np.ndarray, Time], beam, telescope_loc: EarthLocation, - beam_idx: list[int] = None, + beam_idx: np.ndarray = None, baselines: list[tuple] = None, precision: int = 2, polarized: bool = False, @@ -138,7 +138,7 @@ def simulate_vis( and the beam_idx parameter should be used to specify which beam corresponds to each antenna. telescope_loc An EarthLocation object representing the center of the array. - beam_idx : list of int, default = None + beam_idx : np.ndarray, default = None An array of integers, of the same length as ``ants``. Each entry is for an antenna of the same index, and its value should be the index of the beam in the beam list that corresponds to the antenna. If None, all antennas will be assumed to have the same beam, and the beam list will be ignored. @@ -224,6 +224,22 @@ def simulate_vis( else: _beam_list = beam + nbeam = len(_beam_list) + nant = len(ants) + + # Check the beam indices + if beam_idx is None and nbeam not in (1, nant): + raise ValueError( + "If number of beams provided is not 1 or nant, beam_idx must be provided." + ) + if beam_idx is not None: + if beam_idx.shape != (nant,): + raise ValueError("beam_idx must be length nant") + if not all(0 <= i < nbeam for i in beam_idx): + raise ValueError( + "beam_idx contains indices greater than the number of beams" + ) + beam_list = [] for beam in _beam_list: From f284a2463942dc0de7b840590b2949827bf6134e Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Wed, 15 Apr 2026 14:31:48 -0700 Subject: [PATCH 04/32] add beaminterface import to cpu_simulate --- src/fftvis/cpu/cpu_simulate.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index b9801cf..4d26df1 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -18,6 +18,7 @@ from astropy import units as un from astropy.time import Time from pyuvdata import UVBeam +from pyuvdata.beam_interface import BeamInterface from matvis import coordinates from matvis.core.coords import CoordinateRotation From 334aa87bc82070793d08c30f2a77ff94bf45a3c1 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Wed, 15 Apr 2026 16:05:03 -0700 Subject: [PATCH 05/32] properly place visibilities in correct location --- src/fftvis/cpu/beams.py | 24 ++++++++++++------ src/fftvis/cpu/cpu_simulate.py | 45 ++++++++++++++++++++++------------ tests/test_cpu_simulate.py | 14 +++++------ 3 files changed, 53 insertions(+), 30 deletions(-) diff --git a/src/fftvis/cpu/beams.py b/src/fftvis/cpu/beams.py index 7d21481..fa3feb9 100644 --- a/src/fftvis/cpu/beams.py +++ b/src/fftvis/cpu/beams.py @@ -93,6 +93,9 @@ def prepare_beam_evaluation(antnums, baselines, beam_idx): """ Compute the minimal flips for the given baselines and beam indices. """ + if beam_idx is None: + return [(0, 0)], {(0, 0): np.arange(len(baselines))}, {(0, 0): [False] * len(baselines)} + # Get number of unique beams nbeams = len(np.unique(beam_idx)) @@ -101,17 +104,22 @@ def prepare_beam_evaluation(antnums, baselines, beam_idx): # Determine which baselines correspond to which beam pairs, and whether they need to be flipped antnum_to_beam_idx = {ai: bidx for ai, bidx in zip(antnums, beam_idx)} - flipped = [] - - for (ai, aj) in baselines: - if (antnum_to_beam_idx[ai], antnum_to_beam_idx[aj]) in unique_beam_pairs: - flipped.append(False) - elif (antnum_to_beam_idx[aj], antnum_to_beam_idx[ai]) in unique_beam_pairs: - flipped.append(True) + beam_pair_to_bls_idxs = {bp: [] for bp in unique_beam_pairs} + beam_pair_to_flipped = {bp: [] for bp in unique_beam_pairs} + + for idx, (ai, aj) in enumerate(baselines): + bi, bj = antnum_to_beam_idx[ai], antnum_to_beam_idx[aj] + if (bi, bj) in unique_beam_pairs: + bp, flipped = (bi, bj), False + elif (bj, bi) in unique_beam_pairs: + bp, flipped = (bj, bi), True else: raise ValueError("Beam pair not in beam pair list") - return unique_beam_pairs, flipped + beam_pair_to_bls_idxs[bp].append(idx) + beam_pair_to_flipped[bp].append(flipped) + + return unique_beam_pairs, beam_pair_to_bls_idxs, beam_pair_to_flipped @staticmethod @nb.jit(nopython=True, parallel=False, nogil=False) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index 4d26df1..822797b 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -300,6 +300,7 @@ def simulate( # Put data into shared-memory pool antnums = ray.put(antnums) + baselines = ray.put(baselines) bls = ray.put(bls) rotation_matrix = ray.put(rotation_matrix) freqs = ray.put(freqs) @@ -346,6 +347,7 @@ def simulate( coord_mgr=coord_mgr, rotation_matrix=rotation_matrix, antnums=antnums, + baselines=baselines, bls=bls, freqs=freqs, complex_dtype=complex_dtype, @@ -399,6 +401,7 @@ def _evaluate_vis_chunk( coord_mgr: CoordinateRotation, rotation_matrix: np.ndarray, antnums: list, + baselines: list[tuple], bls: np.ndarray, freqs: np.ndarray, complex_dtype: np.dtype, @@ -442,9 +445,13 @@ def _evaluate_vis_chunk( coord_mgr.setup() # Get beam indices for this chunk - unique_beam_pairs, flipped = _cpu_beam_evaluator.prepare_beam_evaluation( + ( + unique_beam_pairs, + beam_pair_to_bls_idxs, + beam_pair_to_flipped, + ) = _cpu_beam_evaluator.prepare_beam_evaluation( antnums=antnums, - baselines=bls, + baselines=baselines, beam_idx=beam_idx, ) @@ -499,8 +506,18 @@ def _evaluate_vis_chunk( ).astype(complex_dtype) beam_evaluations.append(apparent_coherency) - for (bi, bj) in zip(unique_beam_pairs): + for (bi, bj) in unique_beam_pairs: is_cross_pair = bi != bj + bls_idxs = beam_pair_to_bls_idxs[(bi, bj)] + flipped = beam_pair_to_flipped[(bi, bj)] + + # Select the appropriate UVW coordinates for this beam pair. + # If the pair is flipped, we need to flip the UVW coordinates as well to get the correct apparent flux. + if not use_type1: + _uvw = np.where(flipped, -uvw[:, bls_idxs], uvw[:, bls_idxs]) + else: + bls_here = np.where(flipped, -bls[:, bls_idxs], bls[:, bls_idxs]) + if polarized and polarized_sky_model: logger.info( "Using polarized sky model. " @@ -535,8 +552,8 @@ def _evaluate_vis_chunk( apparent_coherency = np.zeros_like(beam_evaluations[bi]) _cpu_beam_evaluator.get_apparent_flux_polarized_beam( - beam1=np.flip(beam_evaluations[bi], axis=0), - beam2=np.flip(np.conj(beam_evaluations[bj]), axis=0), + beam1=beam_evaluations[bi], + beam2=beam_evaluations[bj], flux=flux[:nsim_sources, freqidx], out=apparent_coherency ) @@ -550,7 +567,7 @@ def _evaluate_vis_chunk( "Computing apparent flux for unpolarized sources." ) if is_cross_pair: - apparent_coherency = beam_evaluations[bi] * np.conj(beam_evaluations[bj]) + apparent_coherency = np.sqrt(beam_evaluations[bi] * beam_evaluations[bj]) apparent_coherency *= flux[:nsim_sources, freqidx] else: apparent_coherency = beam_evaluations[bi] * flux[:nsim_sources, freqidx] @@ -576,7 +593,7 @@ def _evaluate_vis_chunk( topo[1] * freq, apparent_coherency, n_modes=type1_n_modes, - index=bls, + index=bls_here, eps=eps, n_threads=n_threads, upsample_factor=upsample_factor, @@ -587,8 +604,8 @@ def _evaluate_vis_chunk( topo[0], topo[1], apparent_coherency, - np.where(flipped, -uvw[0], uvw[0]), - np.where(flipped, -uvw[1], uvw[1]), + _uvw[0], + _uvw[1], eps=eps, n_threads=n_threads, upsample_factor=upsample_factor, @@ -599,9 +616,9 @@ def _evaluate_vis_chunk( topo[1], topo[2], apparent_coherency, - np.where(flipped, -uvw[0], uvw[0]), - np.where(flipped, -uvw[1], uvw[1]), - np.where(flipped, -uvw[2], uvw[2]), + _uvw[0], + _uvw[1], + _uvw[2], eps=eps, n_threads=n_threads, upsample_factor=upsample_factor, @@ -611,9 +628,7 @@ def _evaluate_vis_chunk( # which is what we do for the beam evaluation, but since the beam evaluation is done separately for each antenna, # we have to do the flipping at the end when we combine them. _vis_here = np.where(flipped, np.conj(_vis_here), _vis_here) - - # TODO: Need to do something like nbls_here - vis[time_index, ..., freqidx] = np.swapaxes( + vis[time_index, bls_idxs, ..., freqidx] = np.swapaxes( _vis_here.reshape(nfeeds, nfeeds, nbls), 2, 0 ) diff --git a/tests/test_cpu_simulate.py b/tests/test_cpu_simulate.py index 9e7b1b1..7064dae 100644 --- a/tests/test_cpu_simulate.py +++ b/tests/test_cpu_simulate.py @@ -519,7 +519,7 @@ def test_simulate_with_basic_beam(): ants=ants, freqs=freqs, fluxes=fluxes, - beam=beam, + beam_list=[beam], ra=ra, dec=dec, times=times, @@ -537,7 +537,7 @@ def test_simulate_with_basic_beam(): ants=ants, freqs=freqs, fluxes=fluxes, - beam=beam, + beam_list=[beam], ra=ra, dec=dec, times=times, @@ -600,7 +600,7 @@ def test_simulate_with_specified_baselines(): ants=ants, freqs=freqs, fluxes=fluxes, - beam=beam, + beam_list=[beam], ra=ra, dec=dec, times=times, @@ -660,7 +660,7 @@ def test_beam_interpolation(): ants=ants, freqs=freqs, fluxes=fluxes, - beam=beam, + beam_list=[beam], ra=ra, dec=dec, times=times, @@ -726,7 +726,7 @@ def simulate_with_empty_baselines(): ants=ants, freqs=freqs, fluxes=fluxes, - beam=beam, + beam_list=[beam], ra=ra, dec=dec, times=times, @@ -864,7 +864,7 @@ def test_time_array_handling(): ants=ants, freqs=freqs, fluxes=fluxes, - beam=beam, + beam_list=[beam], ra=ra, dec=dec, times=scalar_time, @@ -881,7 +881,7 @@ def test_time_array_handling(): ants=ants, freqs=freqs, fluxes=fluxes, - beam=beam, + beam_list=[beam], ra=ra, dec=dec, times=array_time, From c9be26d76c760b09e7bdaef1991b467150cdf19e Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Wed, 15 Apr 2026 16:06:28 -0700 Subject: [PATCH 06/32] make sure to copy beam so that results are not overwritten --- src/fftvis/cpu/cpu_simulate.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index 822797b..ce448b1 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -558,6 +558,7 @@ def _evaluate_vis_chunk( out=apparent_coherency ) else: + apparent_coherency = np.copy(beam_evaluations[bi]) _cpu_beam_evaluator.get_apparent_flux_polarized_beam( apparent_coherency, flux[:nsim_sources, freqidx] ) From 62ab5ea4f385e71e0cc2950e0621c710339e1425 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Wed, 15 Apr 2026 16:40:00 -0700 Subject: [PATCH 07/32] pass beam_idx properly --- src/fftvis/cpu/cpu_simulate.py | 15 +++++++++------ src/fftvis/wrapper.py | 1 + 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index ce448b1..c04eee1 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -529,8 +529,8 @@ def _evaluate_vis_chunk( # Compute the polarized apparent flux _cpu_beam_evaluator.get_apparent_flux_polarized_pair( - beam1=np.flip(beam_evaluations[bi], axis=0), - beam2=np.flip(beam_evaluations[bj], axis=0), + beam_i=np.flip(beam_evaluations[bi], axis=0), + beam_j=np.flip(beam_evaluations[bj], axis=0), coherency=np.transpose(flux[:nsim_sources, freqidx], (1, 2, 0)), out=apparent_coherency ) @@ -549,11 +549,12 @@ def _evaluate_vis_chunk( ) if is_cross_pair: + logger.info("Processing cross pair") apparent_coherency = np.zeros_like(beam_evaluations[bi]) - _cpu_beam_evaluator.get_apparent_flux_polarized_beam( - beam1=beam_evaluations[bi], - beam2=beam_evaluations[bj], + _cpu_beam_evaluator.get_apparent_flux_polarized_beam_pair( + beam_i=beam_evaluations[bi], + beam_j=beam_evaluations[bj], flux=flux[:nsim_sources, freqidx], out=apparent_coherency ) @@ -629,8 +630,10 @@ def _evaluate_vis_chunk( # which is what we do for the beam evaluation, but since the beam evaluation is done separately for each antenna, # we have to do the flipping at the end when we combine them. _vis_here = np.where(flipped, np.conj(_vis_here), _vis_here) + + nbls_here = len(bls_idxs) vis[time_index, bls_idxs, ..., freqidx] = np.swapaxes( - _vis_here.reshape(nfeeds, nfeeds, nbls), 2, 0 + _vis_here.reshape(nfeeds, nfeeds, nbls_here), 2, 0 ) return vis diff --git a/src/fftvis/wrapper.py b/src/fftvis/wrapper.py index ad93f81..addf3ca 100644 --- a/src/fftvis/wrapper.py +++ b/src/fftvis/wrapper.py @@ -269,6 +269,7 @@ def simulate_vis( freqs=freqs, fluxes=fluxes, beam_list=beam_list, + beam_idx=beam_idx, ra=ra, dec=dec, times=times, From 480f9012a16a79a9a6dee16d94666b66e8f0322b Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Thu, 30 Apr 2026 13:44:06 -0700 Subject: [PATCH 08/32] fix linux tests where _evaluate_vis_chunk is called directly --- tests/test_cpu_simulate.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_cpu_simulate.py b/tests/test_cpu_simulate.py index 7064dae..6510439 100644 --- a/tests/test_cpu_simulate.py +++ b/tests/test_cpu_simulate.py @@ -906,7 +906,7 @@ def test_evaluate_vis_chunk_remote_matches_direct(tmp_path): beam = UVBeam() beam.data_array = np.ones((1,1,1)) params = dict( - beam=beam, + beam_list=[beam], coord_method_params={"source_buffer": 0.5}, ants=ants, freqs=freqs, @@ -956,7 +956,7 @@ def test_evaluate_vis_chunk_remote_matches_direct(tmp_path): direct = engine._evaluate_vis_chunk( time_idx=slice(None), freq_idx=slice(None), - beam=beam, + beam_list=[beam], coord_mgr=coord_mgr, rotation_matrix=rotation_matrix, bls=bls, @@ -977,7 +977,7 @@ def test_evaluate_vis_chunk_remote_matches_direct(tmp_path): fut = _evaluate_vis_chunk_remote.remote( time_idx=slice(None), freq_idx=slice(None), - beam=beam, + beam_list=[beam], coord_mgr=coord_mgr, rotation_matrix=rotation_matrix, bls=bls, @@ -1093,7 +1093,7 @@ def test_chunk_eval_trace_mem(tmp_path): vis = engine._evaluate_vis_chunk( time_idx=slice(None), freq_idx=slice(None), - beam=beam, + beam_list=[beam], coord_mgr=coord_mgr, rotation_matrix=rotation_matrix, bls=bls, From 8a8a7dd7d2fee263b212ccaf21b05a9fec979024 Mon Sep 17 00:00:00 2001 From: Steven Murray Date: Thu, 30 Apr 2026 23:05:21 +0200 Subject: [PATCH 09/32] fix: send baselines to remote fnc --- src/fftvis/cpu/cpu_simulate.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index c04eee1..e376220 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -46,6 +46,7 @@ def _evaluate_vis_chunk_remote( rotation_matrix: np.ndarray, antnums: list, bls: np.ndarray, + baselines: list[tuple], freqs: np.ndarray, complex_dtype: np.dtype, nfeeds: int, @@ -76,6 +77,7 @@ def _evaluate_vis_chunk_remote( rotation_matrix=rotation_matrix, antnums=antnums, bls=bls, + baselines=baselines, freqs=freqs, complex_dtype=complex_dtype, nfeeds=nfeeds, From a4ecc88c360b5ac1f003be8964b179f22c61fa8c Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Thu, 30 Apr 2026 16:25:22 -0700 Subject: [PATCH 10/32] add missing kwargs to evaluate_vis_chunk --- tests/test_cpu_simulate.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/test_cpu_simulate.py b/tests/test_cpu_simulate.py index 6510439..1c9f300 100644 --- a/tests/test_cpu_simulate.py +++ b/tests/test_cpu_simulate.py @@ -270,6 +270,10 @@ def test_simulate_gridded_type1_vs_type3(polarized, precision, shear_array, rota fvis_type1, fvis_type3, atol=1e-5 if precision == 2 else 1e-4 ) +@pytest.mark.parametrize("use_analytic_beam", [True, False]) +def test_sim_multiple_beams(): + pass + @pytest.mark.parametrize("use_analytic_beam", [True, False]) def test_sim_polarized_sky(use_analytic_beam): """ @@ -938,6 +942,7 @@ def test_evaluate_vis_chunk_remote_matches_direct(tmp_path): # Flatten antenna positions antkey_to_idx = dict(zip(ants.keys(), range(len(ants)))) antvecs = np.array([ants[ant] for ant in ants], dtype=np.float64) + antnums = list(ants.keys()) # Rotate the array to the xy-plane rotation_matrix = utils.get_plane_to_xy_rotation_matrix(antvecs) @@ -959,6 +964,8 @@ def test_evaluate_vis_chunk_remote_matches_direct(tmp_path): beam_list=[beam], coord_mgr=coord_mgr, rotation_matrix=rotation_matrix, + antnums=antnums, + baselines=[(0, 1),], bls=bls, freqs=freqs, complex_dtype=np.complex128, @@ -981,6 +988,8 @@ def test_evaluate_vis_chunk_remote_matches_direct(tmp_path): coord_mgr=coord_mgr, rotation_matrix=rotation_matrix, bls=bls, + antnums=antnums, + baselines=[(0, 1),], freqs=freqs, complex_dtype=np.complex128, nfeeds=1, From b1a54ced035909497ee438d1a8fe813fae7caba9 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Thu, 30 Apr 2026 16:41:49 -0700 Subject: [PATCH 11/32] add new test that checks fftivs against matvis for per-antenna beams --- tests/test_cpu_simulate.py | 128 ++++++++++++++++++++++++++++++++++++- 1 file changed, 126 insertions(+), 2 deletions(-) diff --git a/tests/test_cpu_simulate.py b/tests/test_cpu_simulate.py index 1c9f300..90f2541 100644 --- a/tests/test_cpu_simulate.py +++ b/tests/test_cpu_simulate.py @@ -270,10 +270,134 @@ def test_simulate_gridded_type1_vs_type3(polarized, precision, shear_array, rota fvis_type1, fvis_type3, atol=1e-5 if precision == 2 else 1e-4 ) +@pytest.mark.parametrize("polarized", [False, True]) +@pytest.mark.parametrize("precision", [2, 1]) @pytest.mark.parametrize("use_analytic_beam", [True, False]) -def test_sim_multiple_beams(): - pass +def test_sim_multiple_beams(use_analytic_beam, polarized, precision): + """Test fftvis vs matvis with per-antenna (multiple) beams. + + Runs two sub-cases back to back: + + 1. **Identical beams** – two beam slots, but both point at the same object. + Checks that the beam_idx dispatch path doesn't corrupt results compared + to the matvis reference. + + 2. **Different beams** – beam1 is a genuinely modified copy of beam0. + Checks that (a) fftvis still matches matvis, and (b) the result is + actually different from the identical-beam case, proving that beam + diversity propagates through the NUFFT correctly. + """ + params, *_ = get_standard_sim_params( + use_analytic_beam=use_analytic_beam, polarized=polarized + ) + ants = params.pop("ants") + nant = len(ants) + sim_baselines = np.array([[0, 1]]) + # Alternate antennas between the two beam slots so both are exercised. + beam_idx = np.array([i % 2 for i in range(nant)]) + + beam0 = params["beams"][0] + + # Build a second beam that is genuinely different from beam0. + # Strategy depends on beam type so we stay robust across both parametrise branches. + if isinstance(beam0, UVBeam): + beam1 = beam0.copy() + beam1.data_array *= 0.5 + elif isinstance(beam0, BeamInterface) and beam0._isuvbeam: + # Unwrap, copy, scale, and rewrap + raw = beam0.beam.copy() + raw.data_array *= 0.5 + beam1 = raw # wrapper adds it back inside simulate_vis + elif hasattr(beam0, "diameter"): # e.g. AiryBeam + beam1 = type(beam0)(diameter=beam0.diameter * 0.75) + elif hasattr(beam0, "sigma"): # e.g. GaussianBeam + beam1 = type(beam0)(sigma=beam0.sigma * 0.75) + else: + # Unknown analytic type: fall back to a scaled UVBeam from the test data + beam1 = UVBeam() + beam1.read_cst_beam( + str(TEST_DIR / "data" / "HERA_NicCST_150MHz.txt"), + frequency=[150e6], + telescope_name="HERA", + feed_name="Dipole", + feed_version="1.0", + feed_pol=["x"], + model_name="Test", + model_version="1.0", + ) + beam1.data_array *= 0.5 + identical_beam_list = [beam0, beam0] + different_beam_list = [beam0, beam1] + + # --------------------------------------------------------------- + # Run both matvis reference calls while params["times"] is still + # an astropy.Time (matvis requires it; fftvis accepts JD floats). + # --------------------------------------------------------------- + shared_matvis_kwargs = dict( + ants=ants, + precision=precision, + antpairs=sim_baselines, + coord_method="CoordinateRotationERFA", + source_buffer=0.75, + beam_idx=beam_idx, + ) + + params["beams"] = identical_beam_list + mvis_identical = matvis.simulate_vis(**shared_matvis_kwargs, **params) + + params["beams"] = different_beam_list + mvis_different = matvis.simulate_vis(**shared_matvis_kwargs, **params) + + # Now safe to pop times and beams for the fftvis calls. + times = params.pop("times").jd + _ = params.pop("beams") + freqs = params["freqs"] + + atol = 1e-5 if precision == 2 else 1e-4 + expected_shape = ( + (len(freqs), len(times), 2, 2, len(sim_baselines)) + if polarized + else (len(freqs), len(times), len(sim_baselines)) + ) + + shared_fftvis_kwargs = dict( + ants=ants, + eps=1e-10 if precision == 2 else 6e-8, + baselines=sim_baselines, + precision=precision, + coord_method_params={"source_buffer": 0.75}, + beam_idx=beam_idx, + times=times, + backend="cpu", + **params, + ) + + # ---- Sub-case 1: identical beams ---- + fvis_identical = simulate_vis(beam=identical_beam_list, **shared_fftvis_kwargs) + + assert fvis_identical.shape == expected_shape + for bi in range(len(sim_baselines)): + np.testing.assert_allclose( + fvis_identical[..., bi], mvis_identical[:, :, bi], atol=atol + ) + + # ---- Sub-case 2: different beams ---- + fvis_different = simulate_vis(beam=different_beam_list, **shared_fftvis_kwargs) + + assert fvis_different.shape == expected_shape + for bi in range(len(sim_baselines)): + np.testing.assert_allclose( + fvis_different[..., bi], mvis_different[:, :, bi], atol=atol + ) + + # ---- Sanity check: beam diversity must actually change the answer ---- + # For an unpolarized sky, V_ij ∝ sqrt(B_i · B_j). Scaling B_j by 0.5 / 0.75 + # changes that product, so the two arrays should not be close. + assert not np.allclose(fvis_different, fvis_identical, atol=atol), ( + "Visibilities with different beams should not match those with identical beams" + ) + @pytest.mark.parametrize("use_analytic_beam", [True, False]) def test_sim_polarized_sky(use_analytic_beam): """ From 0fbb68f8342c4ddee10877b7d4a26d6b44a80fba Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Thu, 30 Apr 2026 16:54:40 -0700 Subject: [PATCH 12/32] remove logic branch in BeamInterface --- tests/test_cpu_simulate.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/test_cpu_simulate.py b/tests/test_cpu_simulate.py index 90f2541..00109a9 100644 --- a/tests/test_cpu_simulate.py +++ b/tests/test_cpu_simulate.py @@ -303,11 +303,6 @@ def test_sim_multiple_beams(use_analytic_beam, polarized, precision): if isinstance(beam0, UVBeam): beam1 = beam0.copy() beam1.data_array *= 0.5 - elif isinstance(beam0, BeamInterface) and beam0._isuvbeam: - # Unwrap, copy, scale, and rewrap - raw = beam0.beam.copy() - raw.data_array *= 0.5 - beam1 = raw # wrapper adds it back inside simulate_vis elif hasattr(beam0, "diameter"): # e.g. AiryBeam beam1 = type(beam0)(diameter=beam0.diameter * 0.75) elif hasattr(beam0, "sigma"): # e.g. GaussianBeam From 7868e33d9832fe23de9b9a6ad63ccd0955620f69 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Thu, 30 Apr 2026 16:58:21 -0700 Subject: [PATCH 13/32] restore BeamInterface branch --- tests/test_cpu_simulate.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_cpu_simulate.py b/tests/test_cpu_simulate.py index 00109a9..6e33590 100644 --- a/tests/test_cpu_simulate.py +++ b/tests/test_cpu_simulate.py @@ -8,7 +8,7 @@ from astropy.coordinates import EarthLocation, SkyCoord, Latitude, Longitude from astropy import units as un from astropy.units import Quantity -from pyuvdata import UVBeam +from pyuvdata import UVBeam, BeamInterface from matvis.core.coords import CoordinateRotation from fftvis.core.simulate import SimulationEngine @@ -303,6 +303,11 @@ def test_sim_multiple_beams(use_analytic_beam, polarized, precision): if isinstance(beam0, UVBeam): beam1 = beam0.copy() beam1.data_array *= 0.5 + elif isinstance(beam0, BeamInterface) and beam0._isuvbeam: + # Unwrap, copy, scale, and rewrap + raw = beam0.beam.copy() + raw.data_array *= 0.5 + beam1 = raw # wrapper adds it back inside simulate_vis elif hasattr(beam0, "diameter"): # e.g. AiryBeam beam1 = type(beam0)(diameter=beam0.diameter * 0.75) elif hasattr(beam0, "sigma"): # e.g. GaussianBeam From 4b2955d029cd07af841da8f24911aa6f8c013fa4 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Fri, 1 May 2026 16:54:46 -0700 Subject: [PATCH 14/32] add source chunking to reduce total memory usage --- src/fftvis/cpu/cpu_simulate.py | 271 +++++++++++++++++---------------- src/fftvis/wrapper.py | 5 + 2 files changed, 145 insertions(+), 131 deletions(-) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index e376220..fe5d849 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -63,6 +63,7 @@ def _evaluate_vis_chunk_remote( basis_matrix: np.ndarray = None, type1_n_modes: int = None, trace_mem: bool = False, + nchunks: int = 1, ): """Ray-compatible remote version of _evaluate_vis_chunk.""" # Create a simulation engine instance @@ -93,6 +94,7 @@ def _evaluate_vis_chunk_remote( basis_matrix=basis_matrix, type1_n_modes=type1_n_modes, trace_mem=trace_mem, + nchunks=nchunks, ) @@ -128,6 +130,7 @@ def simulate( force_use_type3: bool = False, trace_mem: bool = False, enable_memory_monitor: bool = False, + nchunks: int = 1, ) -> np.ndarray: """ Simulate visibilities using CPU implementation. @@ -367,6 +370,7 @@ def simulate( basis_matrix=basis_matrix if is_gridded else None, type1_n_modes=n_modes if is_gridded else None, trace_mem=(nprocesses > 1 or force_use_ray) and trace_mem, + nchunks=nchunks, ) ) if trace_mem: @@ -421,6 +425,7 @@ def _evaluate_vis_chunk( basis_matrix: float = None, type1_n_modes: int = None, trace_mem: bool = False, + nchunks: int = 1, ) -> np.ndarray: """ Evaluate a chunk of visibility data using CPU. @@ -487,6 +492,7 @@ def _evaluate_vis_chunk( cpu_utils.inplace_rot(basis_matrix.T, topo) topo *= 2 * np.pi + chunks = np.array_split(range(nsim_sources), nchunks) for freqidx in range(nfreqs)[freq_idx]: freq = freqs[freqidx] @@ -494,148 +500,151 @@ def _evaluate_vis_chunk( if not use_type1: uvw = bls * freq - # Interpolate each beam at the source positions - beam_evaluations = [] - for beam in beam_list: - apparent_coherency = _cpu_beam_evaluator.evaluate_beam( - beam, - az, - za, - polarized, - freq, - spline_opts=beam_spline_opts, - interpolation_function=interpolation_function, - ).astype(complex_dtype) - beam_evaluations.append(apparent_coherency) - - for (bi, bj) in unique_beam_pairs: - is_cross_pair = bi != bj - bls_idxs = beam_pair_to_bls_idxs[(bi, bj)] - flipped = beam_pair_to_flipped[(bi, bj)] - - # Select the appropriate UVW coordinates for this beam pair. - # If the pair is flipped, we need to flip the UVW coordinates as well to get the correct apparent flux. - if not use_type1: - _uvw = np.where(flipped, -uvw[:, bls_idxs], uvw[:, bls_idxs]) - else: - bls_here = np.where(flipped, -bls[:, bls_idxs], bls[:, bls_idxs]) - - if polarized and polarized_sky_model: - logger.info( - "Using polarized sky model. " - "Computing apparent flux for polarized sources." - ) - - if is_cross_pair: - apparent_coherency = np.zeros_like(beam_evaluations[bi]) - - # Compute the polarized apparent flux - _cpu_beam_evaluator.get_apparent_flux_polarized_pair( - beam_i=np.flip(beam_evaluations[bi], axis=0), - beam_j=np.flip(beam_evaluations[bj], axis=0), - coherency=np.transpose(flux[:nsim_sources, freqidx], (1, 2, 0)), - out=apparent_coherency - ) - else: - # Flip to match the expected order - apparent_coherency = np.flip(np.copy(beam_evaluations[bi]), axis=0) - # Compute the polarized apparent flux - _cpu_beam_evaluator.get_apparent_flux_polarized( - apparent_coherency, np.transpose(flux[:nsim_sources, freqidx], (1, 2, 0)) + + for chunk in chunks: + # Interpolate each beam at the source positions + beam_evaluations = [] + for beam in beam_list: + apparent_coherency = _cpu_beam_evaluator.evaluate_beam( + beam, + az[chunk], + za[chunk], + polarized, + freq, + spline_opts=beam_spline_opts, + interpolation_function=interpolation_function, + ).astype(complex_dtype) + beam_evaluations.append(apparent_coherency) + + for (bi, bj) in unique_beam_pairs: + is_cross_pair = bi != bj + bls_idxs = beam_pair_to_bls_idxs[(bi, bj)] + flipped = beam_pair_to_flipped[(bi, bj)] + + # Select the appropriate UVW coordinates for this beam pair. + # If the pair is flipped, we need to flip the UVW coordinates as well to get the correct apparent flux. + if not use_type1: + _uvw = np.where(flipped, -uvw[:, bls_idxs], uvw[:, bls_idxs]) + else: + bls_here = np.where(flipped, -bls[:, bls_idxs], bls[:, bls_idxs]) + + if polarized and polarized_sky_model: + logger.info( + "Using polarized sky model. " + "Computing apparent flux for polarized sources." ) - elif polarized: - logger.info( - "Using polarized beam. " - "Computing apparent flux for unpolarized sources." - ) + + if is_cross_pair: + apparent_coherency = np.zeros_like(beam_evaluations[bi]) + + # Compute the polarized apparent flux + _cpu_beam_evaluator.get_apparent_flux_polarized_pair( + beam_i=np.flip(beam_evaluations[bi], axis=0), + beam_j=np.flip(beam_evaluations[bj], axis=0), + coherency=np.transpose(flux[chunk, freqidx], (1, 2, 0)), + out=apparent_coherency + ) + else: + # Flip to match the expected order + apparent_coherency = np.flip(np.copy(beam_evaluations[bi]), axis=0) - if is_cross_pair: - logger.info("Processing cross pair") - apparent_coherency = np.zeros_like(beam_evaluations[bi]) - - _cpu_beam_evaluator.get_apparent_flux_polarized_beam_pair( - beam_i=beam_evaluations[bi], - beam_j=beam_evaluations[bj], - flux=flux[:nsim_sources, freqidx], - out=apparent_coherency + # Compute the polarized apparent flux + _cpu_beam_evaluator.get_apparent_flux_polarized( + apparent_coherency, np.transpose(flux[chunk, freqidx], (1, 2, 0)) + ) + elif polarized: + logger.info( + "Using polarized beam. " + "Computing apparent flux for unpolarized sources." ) + + if is_cross_pair: + logger.info("Processing cross pair") + apparent_coherency = np.zeros_like(beam_evaluations[bi]) + + _cpu_beam_evaluator.get_apparent_flux_polarized_beam_pair( + beam_i=beam_evaluations[bi], + beam_j=beam_evaluations[bj], + flux=flux[chunk, freqidx], + out=apparent_coherency + ) + else: + apparent_coherency = np.copy(beam_evaluations[bi]) + _cpu_beam_evaluator.get_apparent_flux_polarized_beam( + apparent_coherency, flux[chunk, freqidx] + ) else: - apparent_coherency = np.copy(beam_evaluations[bi]) - _cpu_beam_evaluator.get_apparent_flux_polarized_beam( - apparent_coherency, flux[:nsim_sources, freqidx] + logger.info( + "Using unpolarized beam. " + "Computing apparent flux for unpolarized sources." ) - else: - logger.info( - "Using unpolarized beam. " - "Computing apparent flux for unpolarized sources." - ) - if is_cross_pair: - apparent_coherency = np.sqrt(beam_evaluations[bi] * beam_evaluations[bj]) - apparent_coherency *= flux[:nsim_sources, freqidx] - else: - apparent_coherency = beam_evaluations[bi] * flux[:nsim_sources, freqidx] - - # Try to reshape safely - try: - apparent_coherency = np.reshape( - apparent_coherency, (nfeeds**2, nsim_sources) - ) - pass - except ValueError: # pragma: no cover - logger.error(f"Cannot reshape A_s with shape {apparent_coherency.shape} to {(nfeeds**2, nsim_sources)}") # pragma: no cover - continue # pragma: no cover - - # Check if the dtype is complex - if apparent_coherency.dtype != complex_dtype: - apparent_coherency = apparent_coherency.astype(complex_dtype) - - # Compute visibilities w/ non-uniform FFT - if use_type1: - _vis_here = cpu_nufft2d_type1( - topo[0] * freq, - topo[1] * freq, - apparent_coherency, - n_modes=type1_n_modes, - index=bls_here, - eps=eps, - n_threads=n_threads, - upsample_factor=upsample_factor, - ) - else: - if is_coplanar: - _vis_here = cpu_nufft2d( - topo[0], - topo[1], - apparent_coherency, - _uvw[0], - _uvw[1], - eps=eps, - n_threads=n_threads, - upsample_factor=upsample_factor, + if is_cross_pair: + apparent_coherency = np.sqrt(beam_evaluations[bi] * beam_evaluations[bj]) + apparent_coherency *= flux[chunk, freqidx] + else: + apparent_coherency = beam_evaluations[bi] * flux[chunk, freqidx] + + # Try to reshape safely + try: + apparent_coherency = np.reshape( + apparent_coherency, (nfeeds**2, chunk.size) ) - else: - _vis_here = cpu_nufft3d( - topo[0], - topo[1], - topo[2], + pass + except ValueError: # pragma: no cover + logger.error(f"Cannot reshape A_s with shape {apparent_coherency.shape} to {(nfeeds**2, chunk.size)}") # pragma: no cover + continue # pragma: no cover + + # Check if the dtype is complex + if apparent_coherency.dtype != complex_dtype: + apparent_coherency = apparent_coherency.astype(complex_dtype) + + # Compute visibilities w/ non-uniform FFT + if use_type1: + _vis_here = cpu_nufft2d_type1( + topo[0][chunk] * freq, + topo[1][chunk] * freq, apparent_coherency, - _uvw[0], - _uvw[1], - _uvw[2], + n_modes=type1_n_modes, + index=bls_here, eps=eps, n_threads=n_threads, upsample_factor=upsample_factor, ) - - # Flip visibilities for flipped baselines. This is equivalent to conjugating the beam for the flipped antenna, - # which is what we do for the beam evaluation, but since the beam evaluation is done separately for each antenna, - # we have to do the flipping at the end when we combine them. - _vis_here = np.where(flipped, np.conj(_vis_here), _vis_here) - - nbls_here = len(bls_idxs) - vis[time_index, bls_idxs, ..., freqidx] = np.swapaxes( - _vis_here.reshape(nfeeds, nfeeds, nbls_here), 2, 0 - ) + else: + if is_coplanar: + _vis_here = cpu_nufft2d( + topo[0][chunk], + topo[1][chunk], + apparent_coherency, + _uvw[0], + _uvw[1], + eps=eps, + n_threads=n_threads, + upsample_factor=upsample_factor, + ) + else: + _vis_here = cpu_nufft3d( + topo[0][chunk], + topo[1][chunk], + topo[2][chunk], + apparent_coherency, + _uvw[0], + _uvw[1], + _uvw[2], + eps=eps, + n_threads=n_threads, + upsample_factor=upsample_factor, + ) + + # Flip visibilities for flipped baselines. This is equivalent to conjugating the beam for the flipped antenna, + # which is what we do for the beam evaluation, but since the beam evaluation is done separately for each antenna, + # we have to do the flipping at the end when we combine them. + _vis_here = np.where(flipped, np.conj(_vis_here), _vis_here) + + nbls_here = len(bls_idxs) + vis[time_index, bls_idxs, ..., freqidx] += np.swapaxes( + _vis_here.reshape(nfeeds, nfeeds, nbls_here), 2, 0 + ) return vis diff --git a/src/fftvis/wrapper.py b/src/fftvis/wrapper.py index addf3ca..ff5243d 100644 --- a/src/fftvis/wrapper.py +++ b/src/fftvis/wrapper.py @@ -110,6 +110,7 @@ def simulate_vis( force_use_ray: bool = False, trace_mem: bool = False, backend: Literal["cpu", "gpu"] = "cpu", + nchunks: int = 1, ) -> np.ndarray: """ Parameters: @@ -205,6 +206,9 @@ def simulate_vis( Whether to show progress bar during simulation. backend : str Backend to use for simulation ("cpu" or "gpu"). + nchunks : int, default = 1 + Number of chunks to split the sources into for simulation. This can be used to reduce memory usage at the cost of increased computation time. + The optimal number of chunks will depend on the number of sources, the available memory, and the desired accuracy. If the number of sources is very large, using more chunks can help to reduce memory usage. However, using too many chunks can increase computation time due to overhead from combining the results from each chunk. The default value is 1, which means that all sources will be simulated in a single chunk. Returns: ------- @@ -289,4 +293,5 @@ def simulate_vis( force_use_type3=force_use_type3, force_use_ray=force_use_ray, trace_mem=trace_mem, + nchunks=nchunks, ) From 2deb7857524036a28b3889a4f4a7a4ba966e5111 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 12:17:38 -0700 Subject: [PATCH 15/32] add chunking to the simulation --- src/fftvis/cpu/cpu_simulate.py | 91 ++++++++++++++++++---------------- src/fftvis/wrapper.py | 6 ++- 2 files changed, 52 insertions(+), 45 deletions(-) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index fe5d849..f3aca00 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -131,6 +131,7 @@ def simulate( trace_mem: bool = False, enable_memory_monitor: bool = False, nchunks: int = 1, + source_buffer=1.0 ) -> np.ndarray: """ Simulate visibilities using CPU implementation. @@ -236,6 +237,8 @@ def simulate( if isinstance(times, np.ndarray): times = Time(times, format="jd") + chunk_size = int(np.ceil(dec.size / nchunks)) + coord_method = CoordinateRotation._methods[coord_method] coord_method_params = coord_method_params or {} coord_mgr = coord_method( @@ -244,6 +247,8 @@ def simulate( telescope_loc=telescope_loc, skycoords=SkyCoord(ra=ra * un.rad, dec=dec * un.rad, frame="icrs"), precision=precision, + source_buffer=source_buffer, + chunk_size=chunk_size, **coord_method_params, ) @@ -468,48 +473,46 @@ def _evaluate_vis_chunk( with threadpool_limits(limits=n_threads, user_api="blas"): for time_index, ti in enumerate(range(ntimes)[time_idx]): coord_mgr.rotate(ti) - topo, flux, nsim_sources = coord_mgr.select_chunk(0, ti) - - # truncate to nsim_sources - topo = topo[:, :nsim_sources] - flux = flux[:nsim_sources] - if nsim_sources == 0: - continue + for chunk in range(nchunks): + topo, flux, nsim_sources = coord_mgr.select_chunk(chunk, ti) - # Compute azimuth and zenith angles - az, za = coordinates.enu_to_az_za( - enu_e=topo[0], enu_n=topo[1], orientation="uvbeam" - ) + # truncate to nsim_sources + topo = topo[:, :nsim_sources] + flux = flux[:nsim_sources] - # Rotate source coordinates with rotation matrix. - if not is_rotation_identity: - cpu_utils.inplace_rot(rotation_matrix, topo) - - # Rotate the basis matrix - if basis_matrix is not None: - # Rotate antenna positions with basis matrix - cpu_utils.inplace_rot(basis_matrix.T, topo) + if nsim_sources == 0: + continue - topo *= 2 * np.pi - chunks = np.array_split(range(nsim_sources), nchunks) + # Compute azimuth and zenith angles + az, za = coordinates.enu_to_az_za( + enu_e=topo[0], enu_n=topo[1], orientation="uvbeam" + ) - for freqidx in range(nfreqs)[freq_idx]: - freq = freqs[freqidx] + # Rotate source coordinates with rotation matrix. + if not is_rotation_identity: + cpu_utils.inplace_rot(rotation_matrix, topo) + + # Rotate the basis matrix + if basis_matrix is not None: + # Rotate antenna positions with basis matrix + cpu_utils.inplace_rot(basis_matrix.T, topo) - if not use_type1: - uvw = bls * freq + topo *= 2 * np.pi + + for freqidx in range(nfreqs)[freq_idx]: + freq = freqs[freqidx] - + if not use_type1: + uvw = bls * freq - for chunk in chunks: # Interpolate each beam at the source positions beam_evaluations = [] for beam in beam_list: apparent_coherency = _cpu_beam_evaluator.evaluate_beam( beam, - az[chunk], - za[chunk], + az, + za, polarized, freq, spline_opts=beam_spline_opts, @@ -542,7 +545,7 @@ def _evaluate_vis_chunk( _cpu_beam_evaluator.get_apparent_flux_polarized_pair( beam_i=np.flip(beam_evaluations[bi], axis=0), beam_j=np.flip(beam_evaluations[bj], axis=0), - coherency=np.transpose(flux[chunk, freqidx], (1, 2, 0)), + coherency=np.transpose(flux[:, freqidx], (1, 2, 0)), out=apparent_coherency ) else: @@ -551,7 +554,7 @@ def _evaluate_vis_chunk( # Compute the polarized apparent flux _cpu_beam_evaluator.get_apparent_flux_polarized( - apparent_coherency, np.transpose(flux[chunk, freqidx], (1, 2, 0)) + apparent_coherency, np.transpose(flux[:, freqidx], (1, 2, 0)) ) elif polarized: logger.info( @@ -566,13 +569,13 @@ def _evaluate_vis_chunk( _cpu_beam_evaluator.get_apparent_flux_polarized_beam_pair( beam_i=beam_evaluations[bi], beam_j=beam_evaluations[bj], - flux=flux[chunk, freqidx], + flux=flux[:, freqidx], out=apparent_coherency ) else: apparent_coherency = np.copy(beam_evaluations[bi]) _cpu_beam_evaluator.get_apparent_flux_polarized_beam( - apparent_coherency, flux[chunk, freqidx] + apparent_coherency, flux[:, freqidx] ) else: logger.info( @@ -581,18 +584,18 @@ def _evaluate_vis_chunk( ) if is_cross_pair: apparent_coherency = np.sqrt(beam_evaluations[bi] * beam_evaluations[bj]) - apparent_coherency *= flux[chunk, freqidx] + apparent_coherency *= flux[:, freqidx] else: - apparent_coherency = beam_evaluations[bi] * flux[chunk, freqidx] + apparent_coherency = beam_evaluations[bi] * flux[:, freqidx] # Try to reshape safely try: apparent_coherency = np.reshape( - apparent_coherency, (nfeeds**2, chunk.size) + apparent_coherency, (nfeeds**2, nsim_sources) ) pass except ValueError: # pragma: no cover - logger.error(f"Cannot reshape A_s with shape {apparent_coherency.shape} to {(nfeeds**2, chunk.size)}") # pragma: no cover + logger.error(f"Cannot reshape A_s with shape {apparent_coherency.shape} to {(nfeeds**2, nsim_sources)}") # pragma: no cover continue # pragma: no cover # Check if the dtype is complex @@ -602,8 +605,8 @@ def _evaluate_vis_chunk( # Compute visibilities w/ non-uniform FFT if use_type1: _vis_here = cpu_nufft2d_type1( - topo[0][chunk] * freq, - topo[1][chunk] * freq, + topo[0] * freq, + topo[1] * freq, apparent_coherency, n_modes=type1_n_modes, index=bls_here, @@ -614,8 +617,8 @@ def _evaluate_vis_chunk( else: if is_coplanar: _vis_here = cpu_nufft2d( - topo[0][chunk], - topo[1][chunk], + topo[0], + topo[1], apparent_coherency, _uvw[0], _uvw[1], @@ -625,9 +628,9 @@ def _evaluate_vis_chunk( ) else: _vis_here = cpu_nufft3d( - topo[0][chunk], - topo[1][chunk], - topo[2][chunk], + topo[0], + topo[1], + topo[2], apparent_coherency, _uvw[0], _uvw[1], diff --git a/src/fftvis/wrapper.py b/src/fftvis/wrapper.py index ff5243d..1e40b48 100644 --- a/src/fftvis/wrapper.py +++ b/src/fftvis/wrapper.py @@ -111,6 +111,7 @@ def simulate_vis( trace_mem: bool = False, backend: Literal["cpu", "gpu"] = "cpu", nchunks: int = 1, + source_buffer=1.0, ) -> np.ndarray: """ Parameters: @@ -208,7 +209,9 @@ def simulate_vis( Backend to use for simulation ("cpu" or "gpu"). nchunks : int, default = 1 Number of chunks to split the sources into for simulation. This can be used to reduce memory usage at the cost of increased computation time. - The optimal number of chunks will depend on the number of sources, the available memory, and the desired accuracy. If the number of sources is very large, using more chunks can help to reduce memory usage. However, using too many chunks can increase computation time due to overhead from combining the results from each chunk. The default value is 1, which means that all sources will be simulated in a single chunk. + The optimal number of chunks will depend on the number of sources, the available memory, and the desired accuracy. If the number of sources is very large, + using more chunks can help to reduce memory usage. However, using too many chunks can increase computation time due to overhead from combining the results from each chunk. + The default value is 1, which means that all sources will be simulated in a single chunk. Returns: ------- @@ -294,4 +297,5 @@ def simulate_vis( force_use_ray=force_use_ray, trace_mem=trace_mem, nchunks=nchunks, + source_buffer=source_buffer, ) From 77f65fda81e625f6ac9bac5aafe8d773c17cf4cb Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 22:13:56 -0700 Subject: [PATCH 16/32] make some changes to reduce the memory --- src/fftvis/cpu/cpu_simulate.py | 75 +++++++++++++++++++++++++++------- 1 file changed, 61 insertions(+), 14 deletions(-) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index f3aca00..4332437 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -484,6 +484,16 @@ def _evaluate_vis_chunk( if nsim_sources == 0: continue + # Pre-allocate a single work buffer for apparent_coherency that is + # reused across all (freq, beam-pair) iterations within this chunk. + # nsim_sources is fixed for the duration of the chunk, making this safe. + if polarized: + _apparent_buf = np.empty( + (nfeeds, nfeeds, nsim_sources), dtype=complex_dtype + ) + else: + _apparent_buf = np.empty(nsim_sources, dtype=complex_dtype) + # Compute azimuth and zenith angles az, za = coordinates.enu_to_az_za( enu_e=topo[0], enu_n=topo[1], orientation="uvbeam" @@ -506,10 +516,13 @@ def _evaluate_vis_chunk( if not use_type1: uvw = bls * freq - # Interpolate each beam at the source positions + # Interpolate each beam at the source positions. Only copy to + # complex_dtype when necessary; the unconditional .astype() would + # allocate a full (nfeeds, nfeeds, nsrc) copy even when the dtype + # already matches. beam_evaluations = [] for beam in beam_list: - apparent_coherency = _cpu_beam_evaluator.evaluate_beam( + be = _cpu_beam_evaluator.evaluate_beam( beam, az, za, @@ -517,8 +530,18 @@ def _evaluate_vis_chunk( freq, spline_opts=beam_spline_opts, interpolation_function=interpolation_function, - ).astype(complex_dtype) - beam_evaluations.append(apparent_coherency) + ) + beam_evaluations.append( + be if be.dtype == complex_dtype else be.astype(complex_dtype) + ) + + # Pre-compute frequency-scaled source coordinates once per + # frequency. topo and freq are identical for every beam pair, + # so computing these inside the pair loop would allocate two + # redundant (nsrc,) arrays per pair. + if use_type1: + tx = topo[0] * freq + ty = topo[1] * freq for (bi, bj) in unique_beam_pairs: is_cross_pair = bi != bj @@ -539,7 +562,10 @@ def _evaluate_vis_chunk( ) if is_cross_pair: - apparent_coherency = np.zeros_like(beam_evaluations[bi]) + # Zero the pre-allocated buffer in-place rather than + # allocating a fresh (nfeeds, nfeeds, nsrc) array. + _apparent_buf[:] = 0 + apparent_coherency = _apparent_buf # Compute the polarized apparent flux _cpu_beam_evaluator.get_apparent_flux_polarized_pair( @@ -549,8 +575,10 @@ def _evaluate_vis_chunk( out=apparent_coherency ) else: - # Flip to match the expected order - apparent_coherency = np.flip(np.copy(beam_evaluations[bi]), axis=0) + # Copy into the pre-allocated buffer then take a free + # flip view, avoiding a separate np.copy allocation. + np.copyto(_apparent_buf, beam_evaluations[bi]) + apparent_coherency = np.flip(_apparent_buf, axis=0) # Compute the polarized apparent flux _cpu_beam_evaluator.get_apparent_flux_polarized( @@ -564,7 +592,9 @@ def _evaluate_vis_chunk( if is_cross_pair: logger.info("Processing cross pair") - apparent_coherency = np.zeros_like(beam_evaluations[bi]) + # Zero the pre-allocated buffer in-place. + _apparent_buf[:] = 0 + apparent_coherency = _apparent_buf _cpu_beam_evaluator.get_apparent_flux_polarized_beam_pair( beam_i=beam_evaluations[bi], @@ -573,7 +603,11 @@ def _evaluate_vis_chunk( out=apparent_coherency ) else: - apparent_coherency = np.copy(beam_evaluations[bi]) + # Copy into the pre-allocated buffer so the in-place + # numba kernel can mutate it without touching the + # cached beam_evaluations entry. + np.copyto(_apparent_buf, beam_evaluations[bi]) + apparent_coherency = _apparent_buf _cpu_beam_evaluator.get_apparent_flux_polarized_beam( apparent_coherency, flux[:, freqidx] ) @@ -583,10 +617,23 @@ def _evaluate_vis_chunk( "Computing apparent flux for unpolarized sources." ) if is_cross_pair: - apparent_coherency = np.sqrt(beam_evaluations[bi] * beam_evaluations[bj]) - apparent_coherency *= flux[:, freqidx] + # Compute product and sqrt into the pre-allocated buffer, + # avoiding two intermediate (nsrc,) temporaries. + np.multiply( + beam_evaluations[bi], beam_evaluations[bj], + out=_apparent_buf, + ) + np.sqrt(_apparent_buf, out=_apparent_buf) + _apparent_buf *= flux[:, freqidx] + apparent_coherency = _apparent_buf else: - apparent_coherency = beam_evaluations[bi] * flux[:, freqidx] + # Multiply into the pre-allocated buffer instead of + # creating a new (nsrc,) array. + np.multiply( + beam_evaluations[bi], flux[:, freqidx], + out=_apparent_buf, + ) + apparent_coherency = _apparent_buf # Try to reshape safely try: @@ -605,8 +652,8 @@ def _evaluate_vis_chunk( # Compute visibilities w/ non-uniform FFT if use_type1: _vis_here = cpu_nufft2d_type1( - topo[0] * freq, - topo[1] * freq, + tx, + ty, apparent_coherency, n_modes=type1_n_modes, index=bls_here, From fbbad458f96f1dabdd4a11de5421bf8f8d2427bc Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 22:25:10 -0700 Subject: [PATCH 17/32] change source buffer arg --- tests/test_cpu_simulate.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_cpu_simulate.py b/tests/test_cpu_simulate.py index 6e33590..e5a865f 100644 --- a/tests/test_cpu_simulate.py +++ b/tests/test_cpu_simulate.py @@ -153,7 +153,7 @@ def test_simulate( baselines=sim_baselines, precision=precision, nprocesses=nprocesses, - coord_method_params={"source_buffer": 0.75}, + source_buffer=0.75, trace_mem=False, beam=beam, times=times, @@ -167,7 +167,7 @@ def test_simulate( eps=1e-10 if precision == 2 else 6e-8, precision=precision, nprocesses=nprocesses, - coord_method_params={"source_buffer": 0.75}, + source_buffer=0.75, trace_mem=False, beam=beam, times=times, @@ -244,7 +244,7 @@ def test_simulate_gridded_type1_vs_type3(polarized, precision, shear_array, rota ants=ants, eps=1e-10 if precision == 2 else 6e-8, precision=precision, - coord_method_params={"source_buffer": 0.75}, + source_buffer=0.75, beam=beam, times=times, force_use_type3=False, @@ -257,7 +257,7 @@ def test_simulate_gridded_type1_vs_type3(polarized, precision, shear_array, rota ants=ants, eps=1e-10 if precision == 2 else 6e-8, precision=precision, - coord_method_params={"source_buffer": 0.75}, + source_buffer=0.75, beam=beam, times=times, force_use_type3=True, @@ -366,7 +366,7 @@ def test_sim_multiple_beams(use_analytic_beam, polarized, precision): eps=1e-10 if precision == 2 else 6e-8, baselines=sim_baselines, precision=precision, - coord_method_params={"source_buffer": 0.75}, + source_buffer=0.75, beam_idx=beam_idx, times=times, backend="cpu", From 13f14b42b6d2a6f571b3546d3ea37200912a4e0b Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 22:36:39 -0700 Subject: [PATCH 18/32] fix adjustments to beams in tests --- tests/test_cpu_simulate.py | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/tests/test_cpu_simulate.py b/tests/test_cpu_simulate.py index e5a865f..bf96b9c 100644 --- a/tests/test_cpu_simulate.py +++ b/tests/test_cpu_simulate.py @@ -300,32 +300,15 @@ def test_sim_multiple_beams(use_analytic_beam, polarized, precision): # Build a second beam that is genuinely different from beam0. # Strategy depends on beam type so we stay robust across both parametrise branches. - if isinstance(beam0, UVBeam): - beam1 = beam0.copy() - beam1.data_array *= 0.5 - elif isinstance(beam0, BeamInterface) and beam0._isuvbeam: + if isinstance(beam0, BeamInterface) and beam0._isuvbeam: # Unwrap, copy, scale, and rewrap raw = beam0.beam.copy() raw.data_array *= 0.5 beam1 = raw # wrapper adds it back inside simulate_vis - elif hasattr(beam0, "diameter"): # e.g. AiryBeam - beam1 = type(beam0)(diameter=beam0.diameter * 0.75) - elif hasattr(beam0, "sigma"): # e.g. GaussianBeam - beam1 = type(beam0)(sigma=beam0.sigma * 0.75) + elif isinstance(beam0, BeamInterface) and not beam0._isuvbeam: # e.g. AiryBeam + beam1 = BeamInterface(type(beam0.beam)(diameter=beam0.beam.diameter * 0.75)) else: - # Unknown analytic type: fall back to a scaled UVBeam from the test data - beam1 = UVBeam() - beam1.read_cst_beam( - str(TEST_DIR / "data" / "HERA_NicCST_150MHz.txt"), - frequency=[150e6], - telescope_name="HERA", - feed_name="Dipole", - feed_version="1.0", - feed_pol=["x"], - model_name="Test", - model_version="1.0", - ) - beam1.data_array *= 0.5 + raise ValueError("Unrecognized beam") identical_beam_list = [beam0, beam0] different_beam_list = [beam0, beam1] From effe7f666d8bd0553345a53376411ff836b18401 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 22:41:06 -0700 Subject: [PATCH 19/32] fix tests for ubuntu --- tests/test_cpu_simulate.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_cpu_simulate.py b/tests/test_cpu_simulate.py index bf96b9c..ce190cc 100644 --- a/tests/test_cpu_simulate.py +++ b/tests/test_cpu_simulate.py @@ -1212,6 +1212,8 @@ def test_chunk_eval_trace_mem(tmp_path): beam_list=[beam], coord_mgr=coord_mgr, rotation_matrix=rotation_matrix, + antnums=list(ants.keys()), + baselines=[(0, 1),], bls=bls, freqs=freqs, complex_dtype=np.complex128, From 4a1eaa3c66fd3c389a69ecf408646d33705ebc33 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 23:08:45 -0700 Subject: [PATCH 20/32] test wrapper errors with multiple beams --- tests/test_wrapper.py | 49 ++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index 6215e74..dd6a605 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -106,24 +106,39 @@ def test_simulate_vis_basic(): # Ensure we get non-zero values assert np.any(vis != 0) - - # Test the polarized case too - disabled for now as it's causing issues - # vis_pol = simulate_vis( - # ants=ants, - # fluxes=fluxes, - # ra=ra, - # dec=dec, - # freqs=freqs, - # times=time_array, - # beam=beam, - # telescope_loc=telescope_loc, - # baselines=baselines, - # polarized=True, - # ) - - # Check output shape for polarized - # assert vis_pol.shape == (nfreqs, ntimes, 2, 2, len(baselines)) + params = dict( + ants=ants, + fluxes=fluxes, + ra=ra, + dec=dec, + freqs=freqs, + times=time_array, # Using array of Julian dates + telescope_loc=telescope_loc, + baselines=baselines, + polarized=False, + ) + + + with pytest.raises(ValueError, match="If number of beams provided is not 1 or nant, beam_idx must be provided."): + vis = simulate_vis( + beam=[beam, beam], # Provide 2 beams for 3 antennas to trigger error + **params, + ) + + with pytest.raises(ValueError, match="beam_idx must be length nant"): + vis = simulate_vis( + beam=[beam, beam], + beam_idx=[0, 1], # Provide 2 beam indices for 3 antennas to trigger error + **params, + ) + + with pytest.raises(ValueError, match="beam_idx contains indices greater than the number of beams"): + vis = simulate_vis( + beam=[beam, beam], + beam_idx=[0, 1, 8], + **params, + ) def test_simulate_vis_all_baselines(): """Test simulate_vis with all baselines. From fe30b29956bb49163f8d1d5e9c4a6d5bcd012ab3 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 23:22:28 -0700 Subject: [PATCH 21/32] add better coverage to new beam methods --- tests/test_cpu_beams.py | 297 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 297 insertions(+) diff --git a/tests/test_cpu_beams.py b/tests/test_cpu_beams.py index 9f9693f..9fc58a6 100644 --- a/tests/test_cpu_beams.py +++ b/tests/test_cpu_beams.py @@ -699,3 +699,300 @@ def test_evaluate_beam_additional_paths(): ) assert not np.isnan(result_simple).any() + + +# =========================================================================== +# prepare_beam_evaluation +# =========================================================================== + +class TestPrepareBeamEvaluation: + """Tests for CPUBeamEvaluator.prepare_beam_evaluation.""" + + # ----------------------------------------------------------------------- + # beam_idx is None → trivial single-beam result + # ----------------------------------------------------------------------- + + def test_none_beam_idx_returns_single_pair(self): + antnums = [0, 1, 2] + baselines = [(0, 1), (1, 2), (0, 2)] + unique_pairs, _, _ = CPUBeamEvaluator.prepare_beam_evaluation( + antnums, baselines, beam_idx=None + ) + assert unique_pairs == [(0, 0)] + + def test_none_beam_idx_maps_all_baselines(self): + antnums = [0, 1, 2] + baselines = [(0, 1), (1, 2), (0, 2)] + _, pair_to_idxs, _ = CPUBeamEvaluator.prepare_beam_evaluation( + antnums, baselines, beam_idx=None + ) + np.testing.assert_array_equal(pair_to_idxs[(0, 0)], np.arange(len(baselines))) + + def test_none_beam_idx_no_flipped(self): + antnums = [0, 1, 2] + baselines = [(0, 1), (1, 2), (0, 2)] + _, _, pair_to_flipped = CPUBeamEvaluator.prepare_beam_evaluation( + antnums, baselines, beam_idx=None + ) + assert pair_to_flipped[(0, 0)] == [False, False, False] + + # ----------------------------------------------------------------------- + # Single beam type → only pair (0, 0) via the real logic path + # ----------------------------------------------------------------------- + + def test_single_beam_type(self): + antnums = [0, 1, 2] + beam_idx = [0, 0, 0] + baselines = [(0, 1), (1, 2), (0, 2)] + unique_pairs, pair_to_idxs, pair_to_flipped = ( + CPUBeamEvaluator.prepare_beam_evaluation(antnums, baselines, beam_idx) + ) + assert unique_pairs == [(0, 0)] + assert list(pair_to_idxs[(0, 0)]) == [0, 1, 2] + assert pair_to_flipped[(0, 0)] == [False, False, False] + + # ----------------------------------------------------------------------- + # Two beam types → pairs (0,0), (0,1), (1,1) + # ----------------------------------------------------------------------- + + def test_two_beam_types_unique_pairs(self): + antnums = [0, 1] + beam_idx = [0, 1] + baselines = [(0, 1)] + unique_pairs, _, _ = CPUBeamEvaluator.prepare_beam_evaluation( + antnums, baselines, beam_idx + ) + assert set(unique_pairs) == {(0, 0), (0, 1), (1, 1)} + + def test_two_beam_types_baseline_routing(self): + antnums = [0, 1] + beam_idx = [0, 1] + baselines = [(0, 1)] # beam pair (0, 1) — not flipped + _, pair_to_idxs, pair_to_flipped = CPUBeamEvaluator.prepare_beam_evaluation( + antnums, baselines, beam_idx + ) + assert pair_to_idxs[(0, 1)] == [0] + assert pair_to_flipped[(0, 1)] == [False] + + def test_flipped_baseline_detected(self): + """A baseline (ant_j, ant_i) where beam_j < beam_i should be recorded as flipped.""" + antnums = [0, 1] + beam_idx = [0, 1] + # Baseline given as (ant1, ant0) → beam pair (1, 0) → stored as (0, 1) flipped + baselines = [(1, 0)] + _, pair_to_idxs, pair_to_flipped = CPUBeamEvaluator.prepare_beam_evaluation( + antnums, baselines, beam_idx + ) + assert pair_to_idxs[(0, 1)] == [0] + assert pair_to_flipped[(0, 1)] == [True] + + def test_mixed_flipped_and_not_flipped(self): + antnums = [0, 1] + beam_idx = [0, 1] + baselines = [(0, 1), (1, 0)] + _, pair_to_idxs, pair_to_flipped = CPUBeamEvaluator.prepare_beam_evaluation( + antnums, baselines, beam_idx + ) + assert pair_to_idxs[(0, 1)] == [0, 1] + assert pair_to_flipped[(0, 1)] == [False, True] + + def test_multiple_baselines_same_pair(self): + """Several baselines all sharing the same beam pair.""" + antnums = [0, 1, 2, 3] + beam_idx = [0, 0, 1, 1] + baselines = [(0, 2), (0, 3), (1, 2), (1, 3)] + _, pair_to_idxs, pair_to_flipped = CPUBeamEvaluator.prepare_beam_evaluation( + antnums, baselines, beam_idx + ) + assert sorted(pair_to_idxs[(0, 1)]) == [0, 1, 2, 3] + assert pair_to_flipped[(0, 1)] == [False, False, False, False] + + def test_empty_baselines(self): + antnums = [0, 1] + beam_idx = [0, 1] + baselines = [] + unique_pairs, pair_to_idxs, pair_to_flipped = ( + CPUBeamEvaluator.prepare_beam_evaluation(antnums, baselines, beam_idx) + ) + for bp in unique_pairs: + assert pair_to_idxs[bp] == [] + assert pair_to_flipped[bp] == [] + + def test_three_beam_types_pair_count(self): + """3 beam types → 6 unique upper-triangle pairs.""" + antnums = [0, 1, 2] + beam_idx = [0, 1, 2] + baselines = [(0, 1), (0, 2), (1, 2)] + unique_pairs, _, _ = CPUBeamEvaluator.prepare_beam_evaluation( + antnums, baselines, beam_idx + ) + assert len(unique_pairs) == 6 # (0,0),(0,1),(0,2),(1,1),(1,2),(2,2) + + +# =========================================================================== +# get_apparent_flux_polarized_beam_pair +# =========================================================================== + +class TestGetApparentFluxPolarizedBeamPair: + """Tests for CPUBeamEvaluator.get_apparent_flux_polarized_beam_pair. + + Computes out[a, p, s] = sum_b conj(A_i[b,a,s]) * flux[s] * A_j[b,p,s], + i.e. einsum("bas,s,bps->aps", beam_i.conj(), flux, beam_j). + """ + + @staticmethod + def _reference(beam_i, beam_j, flux): + return np.einsum("bas,s,bps->aps", beam_i.conj(), flux, beam_j) + + def _run(self, beam_i, beam_j, flux): + out = np.zeros_like(beam_i) + CPUBeamEvaluator.get_apparent_flux_polarized_beam_pair(beam_i, beam_j, flux, out) + return out + + def test_identical_beams_matches_single_beam_method(self): + """When beam_i == beam_j the result should equal get_apparent_flux_polarized_beam.""" + rng = np.random.default_rng(0) + beam = rng.standard_normal((2, 2, 5)) + 1j * rng.standard_normal((2, 2, 5)) + flux = rng.standard_normal(5) + + out_pair = self._run(beam.copy(), beam.copy(), flux.copy()) + + beam_single = beam.copy() + CPUBeamEvaluator.get_apparent_flux_polarized_beam(beam_single, flux.copy()) + + np.testing.assert_allclose(out_pair, beam_single, rtol=1e-12) + + def test_different_beams_match_einsum(self): + rng = np.random.default_rng(1) + beam_i = rng.standard_normal((2, 2, 8)) + 1j * rng.standard_normal((2, 2, 8)) + beam_j = rng.standard_normal((2, 2, 8)) + 1j * rng.standard_normal((2, 2, 8)) + flux = rng.standard_normal(8) + + out = self._run(beam_i.copy(), beam_j.copy(), flux.copy()) + np.testing.assert_allclose(out, self._reference(beam_i, beam_j, flux), rtol=1e-12) + + def test_zero_flux_gives_zero_output(self): + rng = np.random.default_rng(2) + beam_i = rng.standard_normal((2, 2, 4)) + 1j * rng.standard_normal((2, 2, 4)) + beam_j = rng.standard_normal((2, 2, 4)) + 1j * rng.standard_normal((2, 2, 4)) + out = self._run(beam_i.copy(), beam_j.copy(), np.zeros(4)) + np.testing.assert_allclose(out, 0.0) + + def test_identity_beams_unit_flux_gives_identity(self): + nsrc = 3 + beam_i = np.zeros((2, 2, nsrc), dtype=complex) + beam_j = np.zeros((2, 2, nsrc), dtype=complex) + for s in range(nsrc): + beam_i[0, 0, s] = beam_i[1, 1, s] = 1.0 + beam_j[0, 0, s] = beam_j[1, 1, s] = 1.0 + + out = self._run(beam_i, beam_j, np.ones(nsrc)) + + expected = np.zeros((2, 2, nsrc), dtype=complex) + expected[0, 0, :] = expected[1, 1, :] = 1.0 + np.testing.assert_allclose(out, expected) + + def test_single_source(self): + rng = np.random.default_rng(3) + beam_i = rng.standard_normal((2, 2, 1)) + 1j * rng.standard_normal((2, 2, 1)) + beam_j = rng.standard_normal((2, 2, 1)) + 1j * rng.standard_normal((2, 2, 1)) + flux = rng.standard_normal(1) + + out = self._run(beam_i.copy(), beam_j.copy(), flux.copy()) + np.testing.assert_allclose(out, self._reference(beam_i, beam_j, flux), rtol=1e-12) + + def test_result_is_not_hermitian_for_distinct_beams(self): + """For different beams, out[0,1] != conj(out[1,0]) in general.""" + rng = np.random.default_rng(4) + beam_i = rng.standard_normal((2, 2, 6)) + 1j * rng.standard_normal((2, 2, 6)) + beam_j = rng.standard_normal((2, 2, 6)) + 1j * rng.standard_normal((2, 2, 6)) + flux = np.abs(rng.standard_normal(6)) + + out = self._run(beam_i.copy(), beam_j.copy(), flux.copy()) + assert not np.allclose(out[0, 1], np.conj(out[1, 0])) + + +# =========================================================================== +# get_apparent_flux_polarized_pair +# =========================================================================== + +class TestGetApparentFluxPolarizedPair: + """Tests for CPUBeamEvaluator.get_apparent_flux_polarized_pair. + + Computes out = A_i^H @ C @ A_j per source, i.e. + out[a,p,s] = einsum("bas,bks,kps->aps", beam_i.conj(), coherency, beam_j). + """ + + @staticmethod + def _reference(beam_i, beam_j, coherency): + return np.einsum("bas,bks,kps->aps", beam_i.conj(), coherency, beam_j) + + def _run(self, beam_i, beam_j, coherency): + out = np.zeros_like(beam_i) + CPUBeamEvaluator.get_apparent_flux_polarized_pair(beam_i, beam_j, coherency, out) + return out + + def test_identity_beams_recover_coherency(self): + """With identity Jones matrices, out should equal the coherency.""" + nsrc = 4 + beam = np.zeros((2, 2, nsrc), dtype=complex) + for s in range(nsrc): + beam[0, 0, s] = beam[1, 1, s] = 1.0 + + rng = np.random.default_rng(10) + coh = rng.standard_normal((2, 2, nsrc)) + 1j * rng.standard_normal((2, 2, nsrc)) + + out = self._run(beam.copy(), beam.copy(), coh.copy()) + np.testing.assert_allclose(out, coh, rtol=1e-12) + + def test_random_beams_match_einsum(self): + rng = np.random.default_rng(11) + beam_i = rng.standard_normal((2, 2, 7)) + 1j * rng.standard_normal((2, 2, 7)) + beam_j = rng.standard_normal((2, 2, 7)) + 1j * rng.standard_normal((2, 2, 7)) + coh = rng.standard_normal((2, 2, 7)) + 1j * rng.standard_normal((2, 2, 7)) + + out = self._run(beam_i.copy(), beam_j.copy(), coh.copy()) + np.testing.assert_allclose(out, self._reference(beam_i, beam_j, coh), rtol=1e-12) + + def test_same_beam_matches_get_apparent_flux_polarized(self): + """When beam_i == beam_j, must agree with the single-beam polarized method.""" + rng = np.random.default_rng(12) + beam = rng.standard_normal((2, 2, 5)) + 1j * rng.standard_normal((2, 2, 5)) + coh = rng.standard_normal((2, 2, 5)) + 1j * rng.standard_normal((2, 2, 5)) + + out_pair = self._run(beam.copy(), beam.copy(), coh.copy()) + + beam_single = beam.copy() + CPUBeamEvaluator.get_apparent_flux_polarized(beam_single, coh.copy()) + + np.testing.assert_allclose(out_pair, beam_single, rtol=1e-12) + + def test_zero_coherency_gives_zero_output(self): + rng = np.random.default_rng(13) + beam_i = rng.standard_normal((2, 2, 6)) + 1j * rng.standard_normal((2, 2, 6)) + beam_j = rng.standard_normal((2, 2, 6)) + 1j * rng.standard_normal((2, 2, 6)) + out = self._run(beam_i.copy(), beam_j.copy(), np.zeros((2, 2, 6), dtype=complex)) + np.testing.assert_allclose(out, 0.0) + + def test_single_source(self): + rng = np.random.default_rng(14) + beam_i = rng.standard_normal((2, 2, 1)) + 1j * rng.standard_normal((2, 2, 1)) + beam_j = rng.standard_normal((2, 2, 1)) + 1j * rng.standard_normal((2, 2, 1)) + coh = rng.standard_normal((2, 2, 1)) + 1j * rng.standard_normal((2, 2, 1)) + + out = self._run(beam_i.copy(), beam_j.copy(), coh.copy()) + np.testing.assert_allclose(out, self._reference(beam_i, beam_j, coh), rtol=1e-12) + + def test_stokes_i_diagonal_coherency(self): + """Diagonal coherency (Stokes I only) should still match the reference einsum.""" + rng = np.random.default_rng(15) + nsrc = 5 + beam_i = rng.standard_normal((2, 2, nsrc)) + 1j * rng.standard_normal((2, 2, nsrc)) + beam_j = rng.standard_normal((2, 2, nsrc)) + 1j * rng.standard_normal((2, 2, nsrc)) + flux = rng.standard_normal(nsrc) + + coh = np.zeros((2, 2, nsrc), dtype=complex) + coh[0, 0, :] = coh[1, 1, :] = flux / 2 + + out = self._run(beam_i.copy(), beam_j.copy(), coh.copy()) + np.testing.assert_allclose(out, self._reference(beam_i, beam_j, coh), rtol=1e-12) \ No newline at end of file From dbd772ee2e5a1acba11538459352dd4e5b09fdfb Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 23:26:49 -0700 Subject: [PATCH 22/32] test failure modes in wrapper --- tests/test_wrapper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index dd6a605..dfbbe75 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -129,14 +129,14 @@ def test_simulate_vis_basic(): with pytest.raises(ValueError, match="beam_idx must be length nant"): vis = simulate_vis( beam=[beam, beam], - beam_idx=[0, 1], # Provide 2 beam indices for 3 antennas to trigger error + beam_idx=np.array([0, 1]), # Provide 2 beam indices for 3 antennas to trigger error **params, ) with pytest.raises(ValueError, match="beam_idx contains indices greater than the number of beams"): vis = simulate_vis( beam=[beam, beam], - beam_idx=[0, 1, 8], + beam_idx=np.array([0, 1, 8]), **params, ) From 8ac952920a791c5ad1cf6713a4566889d7236a3d Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 23:32:13 -0700 Subject: [PATCH 23/32] address coverage of cpu/beams.py --- src/fftvis/cpu/beams.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fftvis/cpu/beams.py b/src/fftvis/cpu/beams.py index fa3feb9..cf157d6 100644 --- a/src/fftvis/cpu/beams.py +++ b/src/fftvis/cpu/beams.py @@ -141,7 +141,7 @@ def get_apparent_flux_polarized_beam(beam: np.ndarray, flux: np.ndarray): # pra @staticmethod @nb.jit(nopython=True, parallel=False, nogil=False) - def get_apparent_flux_polarized(beam, coherency): + def get_apparent_flux_polarized(beam, coherency): # pragma: no cover """ Calculate the apparent flux of the sources using the beam and coherency matrices. @@ -181,7 +181,7 @@ def get_apparent_flux_polarized_beam_pair( beam_j: np.ndarray, flux: np.ndarray, out: np.ndarray, - ): + ): # pragma: no cover """Compute A_i^H * diag(flux) * A_j for an unpolarized sky with two beams. Generalises get_apparent_flux_polarized_beam to the case where the two @@ -214,7 +214,7 @@ def get_apparent_flux_polarized_pair( beam_j: np.ndarray, coherency: np.ndarray, out: np.ndarray, - ): + ): # pragma: no cover """Compute A_i^H @ C @ A_j for a polarized sky with two beams. Generalises get_apparent_flux_polarized to the cross-beam case. From ec8797997350a57691eee5240bcb6bfda6ab6626 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Sun, 3 May 2026 23:41:56 -0700 Subject: [PATCH 24/32] add documentation --- src/fftvis/wrapper.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/fftvis/wrapper.py b/src/fftvis/wrapper.py index 1e40b48..f489b9d 100644 --- a/src/fftvis/wrapper.py +++ b/src/fftvis/wrapper.py @@ -212,6 +212,9 @@ def simulate_vis( The optimal number of chunks will depend on the number of sources, the available memory, and the desired accuracy. If the number of sources is very large, using more chunks can help to reduce memory usage. However, using too many chunks can increase computation time due to overhead from combining the results from each chunk. The default value is 1, which means that all sources will be simulated in a single chunk. + source_buffer : float, optional + The fraction of the total number of sources to use when allocating memory + for the sources above horizon. Returns: ------- From 769c9942585978454df5fecd9cfca1c96d69d3e7 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Mon, 4 May 2026 08:26:23 -0700 Subject: [PATCH 25/32] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/fftvis/wrapper.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/fftvis/wrapper.py b/src/fftvis/wrapper.py index f489b9d..4165be6 100644 --- a/src/fftvis/wrapper.py +++ b/src/fftvis/wrapper.py @@ -238,10 +238,13 @@ def simulate_vis( nant = len(ants) # Check the beam indices - if beam_idx is None and nbeam not in (1, nant): - raise ValueError( - "If number of beams provided is not 1 or nant, beam_idx must be provided." - ) + if beam_idx is None: + if nbeam == nant: + beam_idx = np.arange(nant) + elif nbeam != 1: + raise ValueError( + "If number of beams provided is not 1 or nant, beam_idx must be provided." + ) if beam_idx is not None: if beam_idx.shape != (nant,): raise ValueError("beam_idx must be length nant") From a4914ff148b5a71e803313ef8d8dfa5b9a794e60 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Mon, 4 May 2026 08:28:54 -0700 Subject: [PATCH 26/32] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/fftvis/cpu/cpu_simulate.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index 4332437..7d110ea 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -273,6 +273,8 @@ def simulate( for beam in beam_list: if isinstance(beam, UVBeam): required_shm += beam.data_array.nbytes + elif isinstance(beam, BeamInterface) and getattr(beam, "_isuvbeam", False): + required_shm += beam.beam.data_array.nbytes # Add visibility memory required_shm += (ntimes * nfreqs * nbls * nax * nfeeds) * np.dtype( From 2449bbd925e55f453252b0d86731df897ef61fa2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 4 May 2026 15:30:14 +0000 Subject: [PATCH 27/32] Fix nbeams calculation for non-contiguous beam_idx in prepare_beam_evaluation Agent-Logs-Url: https://github.com/tyler-a-cox/fftvis/sessions/53224b26-52a0-4803-be07-4c1c70648550 Co-authored-by: tyler-a-cox <17678594+tyler-a-cox@users.noreply.github.com> --- src/fftvis/cpu/beams.py | 5 +++-- tests/test_cpu_beams.py | 25 +++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/fftvis/cpu/beams.py b/src/fftvis/cpu/beams.py index cf157d6..98773e2 100644 --- a/src/fftvis/cpu/beams.py +++ b/src/fftvis/cpu/beams.py @@ -96,8 +96,9 @@ def prepare_beam_evaluation(antnums, baselines, beam_idx): if beam_idx is None: return [(0, 0)], {(0, 0): np.arange(len(baselines))}, {(0, 0): [False] * len(baselines)} - # Get number of unique beams - nbeams = len(np.unique(beam_idx)) + # Get number of beams: use max index + 1 so non-contiguous indices + # (e.g. [0, 2, 2]) still produce the correct set of beam pairs. + nbeams = int(np.max(beam_idx)) + 1 # Get the unique beam pairs that we need to evaluate the apparent flux for unique_beam_pairs = [(bi, bj) for bi in range(nbeams) for bj in range(bi, nbeams)] diff --git a/tests/test_cpu_beams.py b/tests/test_cpu_beams.py index 9fc58a6..4085746 100644 --- a/tests/test_cpu_beams.py +++ b/tests/test_cpu_beams.py @@ -828,6 +828,31 @@ def test_three_beam_types_pair_count(self): ) assert len(unique_pairs) == 6 # (0,0),(0,1),(0,2),(1,1),(1,2),(2,2) + def test_non_contiguous_beam_idx(self): + """Non-contiguous beam_idx (e.g. [0, 2, 2]) must not raise. + + Previously ``nbeams = len(np.unique(beam_idx))`` gave 2, so + ``unique_beam_pairs`` only covered indices 0 and 1, causing a + ``ValueError`` when a baseline with beam index 2 was encountered. + The fix uses ``int(np.max(beam_idx)) + 1`` so that all indices up + to the maximum are included. + """ + # Antenna 0 has beam 0; antennas 1 and 2 share beam 2 (index 1 unused). + antnums = [0, 1, 2] + beam_idx = [0, 2, 2] + baselines = [(0, 1), (0, 2), (1, 2)] + # Should not raise a ValueError + unique_pairs, pair_to_idxs, pair_to_flipped = ( + CPUBeamEvaluator.prepare_beam_evaluation(antnums, baselines, beam_idx) + ) + # Beam indices go up to 2, so nbeams = 3 → 6 upper-triangle pairs + assert len(unique_pairs) == 6 + # Baseline (0,1): beams (0,2) → pair (0,2), not flipped + assert 0 in pair_to_idxs[(0, 2)] + assert pair_to_flipped[(0, 2)][pair_to_idxs[(0, 2)].index(0)] is False + # Baseline (1,2): both beam 2 → pair (2,2), not flipped + assert 2 in pair_to_idxs[(2, 2)] + # =========================================================================== # get_apparent_flux_polarized_beam_pair From 6c3cf6d075053908c8d88d1f5e23957b1e5166e0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 4 May 2026 15:34:34 +0000 Subject: [PATCH 28/32] Downgrade hot-loop logger.info calls to logger.debug in cpu_simulate.py Agent-Logs-Url: https://github.com/tyler-a-cox/fftvis/sessions/becb649d-88b1-4811-be20-a764b9d4db52 Co-authored-by: tyler-a-cox <17678594+tyler-a-cox@users.noreply.github.com> --- src/fftvis/cpu/cpu_simulate.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index 7d110ea..906601c 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -558,7 +558,7 @@ def _evaluate_vis_chunk( bls_here = np.where(flipped, -bls[:, bls_idxs], bls[:, bls_idxs]) if polarized and polarized_sky_model: - logger.info( + logger.debug( "Using polarized sky model. " "Computing apparent flux for polarized sources." ) @@ -587,13 +587,13 @@ def _evaluate_vis_chunk( apparent_coherency, np.transpose(flux[:, freqidx], (1, 2, 0)) ) elif polarized: - logger.info( + logger.debug( "Using polarized beam. " "Computing apparent flux for unpolarized sources." ) if is_cross_pair: - logger.info("Processing cross pair") + logger.debug("Processing cross pair") # Zero the pre-allocated buffer in-place. _apparent_buf[:] = 0 apparent_coherency = _apparent_buf @@ -614,7 +614,7 @@ def _evaluate_vis_chunk( apparent_coherency, flux[:, freqidx] ) else: - logger.info( + logger.debug( "Using unpolarized beam. " "Computing apparent flux for unpolarized sources." ) From 552cc7fbabb2d8f0bd4630a7992b1ae28e883a74 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Mon, 4 May 2026 16:36:12 -0700 Subject: [PATCH 29/32] use min_chunks instead of nchunks --- src/fftvis/core/utils.py | 148 +++++++++++++++++++++++++++++++++++++++ src/fftvis/utils.py | 4 ++ src/fftvis/wrapper.py | 35 +++++++-- 3 files changed, 180 insertions(+), 7 deletions(-) diff --git a/src/fftvis/core/utils.py b/src/fftvis/core/utils.py index 26519a7..73ea2d5 100644 --- a/src/fftvis/core/utils.py +++ b/src/fftvis/core/utils.py @@ -1,6 +1,10 @@ +import logging + import numpy as np from scipy import linalg +logger = logging.getLogger(__name__) + IDEALIZED_BL_TOL = 1e-8 # bl_error_tol for redcal.get_reds when using antenna positions calculated from reds speed_of_light = 299792458.0 # m/s @@ -205,3 +209,147 @@ def inplace_rot_base(rot, b): out[1] = rot[1, 0] * b[0, n] + rot[1, 1] * b[1, n] + rot[1, 2] * b[2, n] out[2] = rot[2, 0] * b[0, n] + rot[2, 1] * b[1, n] + rot[2, 2] * b[2, n] b[:, n] = out + +def get_required_chunks( + freemem: int, + nax: int, + nfeed: int, + nant: int, + nsrc: int, + nbeam: int, + nbeampix: int, + precision: int, + source_buffer: float = 1.0, + nprocesses: int = 1, +) -> int: + """ + Compute number of chunks (over sources) required to fit data into available memory. + + Parameters + ---------- + freemem : int + The amount of free memory in bytes. + nax : int + The number of axes. + nfeed : int + The number of feeds. + nant : int + The number of antennas. + nsrc : int + The number of sources. + nbeam : int + The number of beams. + nbeampix : int + The number of beam pixels. + precision : int + The precision of the data. + + Returns + ------- + int + The number of chunks required. + + Examples + -------- + >>> get_required_chunks(1024, 2, 4, 8, 16, 32, 64, 32) + 1 + """ + rsize = 4 * precision + csize = 2 * rsize + + gpusize = {"a": freemem} + ch = 0 + while sum(gpusize.values()) >= freemem and ch < 100: + ch += 1 + nchunk = int(nsrc // ch * source_buffer) + + gpusize = { + "antpos": nant * 3 * rsize, + "flux": nsrc * rsize, + "beam": nbeampix * nfeed * nax * csize, + "crd_eq": 3 * nsrc * rsize, + "crd_top": 3 * nsrc * rsize * nprocesses, + "crd_chunk": 3 * nchunk * rsize * nprocesses, + "flux_chunk": nchunk * rsize * nprocesses, + "beam_interp": nbeam * nfeed * nax * nchunk * csize * nprocesses, + "vis": ch * nfeed * nant * nfeed * nant * csize, + } + logger.debug( + f"nchunks={ch}. Array Sizes (bytes)={gpusize}. Total={sum(gpusize.values())}" + ) + + logger.info( + f"Total free mem: {freemem / (1024**3):.2f} GB. Requires {ch} chunks " + f"(estimate {sum(gpusize.values()) / 1024**3:.2f} GB)" + ) + return ch + +def get_desired_chunks( + freemem: int, + min_chunks: int, + beam_list: int, + nax: int, + nfeed: int, + nant: int, + nsrc: int, + precision: int, + source_buffer: float = 1.0, +) -> tuple[int, int]: + """Get the desired number of chunks. + + Parameters + ---------- + freemem : int + The amount of free memory in bytes. + min_chunks : int + The minimum number of chunks desired. + beam_list : list + A list of beams. + nax : int + The number of axes. + nfeed : int + The number of feeds. + nant : int + The number of antennas. + nsrc : int + The number of sources. + precision : int + The precision of the data. + + Returns + ------- + nchunk + Number of chunks + nsrcs_per_chunk + Number of sources per chunk + + Examples + -------- + >>> get_desired_chunks(1024, 2, [beam1, beam2], 3, 4, 8, 16, 32) + (2, 8) + """ + nbeampix = sum( + beam.data_array.shape[-2] * beam.data_array.shape[-1] + for beam in beam_list + if hasattr(beam, "data_array") + ) + + nchunks = min( + max( + min_chunks, + get_required_chunks( + freemem, + nax, + nfeed, + nant, + nsrc, + len(beam_list), + nbeampix, + precision, + source_buffer, + ), + ), + nsrc, + ) + + return nchunks, int(np.ceil(nsrc / nchunks)) \ No newline at end of file diff --git a/src/fftvis/utils.py b/src/fftvis/utils.py index 01efee4..4942364 100644 --- a/src/fftvis/utils.py +++ b/src/fftvis/utils.py @@ -12,6 +12,8 @@ get_pos_reds, get_plane_to_xy_rotation_matrix, get_task_chunks, + get_desired_chunks, + get_required_chunks, ) @@ -36,4 +38,6 @@ def _use_gpu(): "get_plane_to_xy_rotation_matrix", "get_task_chunks", "inplace_rot", + "get_desired_chunks", + "get_required_chunks", ] diff --git a/src/fftvis/wrapper.py b/src/fftvis/wrapper.py index 4165be6..a8eb09c 100644 --- a/src/fftvis/wrapper.py +++ b/src/fftvis/wrapper.py @@ -1,4 +1,5 @@ from typing import Literal, Union +import psutil import numpy as np from astropy.coordinates import EarthLocation from astropy.time import Time @@ -10,7 +11,7 @@ from .cpu.beams import CPUBeamEvaluator from .core.simulate import SimulationEngine, default_accuracy_dict from .cpu.cpu_simulate import CPUSimulationEngine - +from .core.utils import get_desired_chunks def create_beam_evaluator( backend: Literal["cpu", "gpu"] = "cpu", **kwargs @@ -110,7 +111,8 @@ def simulate_vis( force_use_ray: bool = False, trace_mem: bool = False, backend: Literal["cpu", "gpu"] = "cpu", - nchunks: int = 1, + max_memory: int | float = np.inf, + min_chunks: int = 1, source_buffer=1.0, ) -> np.ndarray: """ @@ -207,11 +209,13 @@ def simulate_vis( Whether to show progress bar during simulation. backend : str Backend to use for simulation ("cpu" or "gpu"). - nchunks : int, default = 1 - Number of chunks to split the sources into for simulation. This can be used to reduce memory usage at the cost of increased computation time. - The optimal number of chunks will depend on the number of sources, the available memory, and the desired accuracy. If the number of sources is very large, - using more chunks can help to reduce memory usage. However, using too many chunks can increase computation time due to overhead from combining the results from each chunk. - The default value is 1, which means that all sources will be simulated in a single chunk. + max_memory : int, optional + The maximum memory (in bytes) to use for the visibility calculation. This is + not a hard-set limit, but rather a guideline for how much memory to use. If the + expected memory usage is more than this, the calculation will be broken up into + chunks. + min_chunks : int, optional + The minimum number of chunks to break the source axis into. source_buffer : float, optional The fraction of the total number of sources to use when allocating memory for the sources above horizon. @@ -273,6 +277,23 @@ def simulate_vis( beam_list.append(beam) + if polarized: + nax = nfeed = 2 + else: + nax = nfeed = 1 + + nchunks, _ = get_desired_chunks( + min(max_memory, psutil.virtual_memory().available), + min_chunks, + beam_list, + nax, + nfeed, + nant, + len(fluxes), + precision, + source_buffer=source_buffer, + ) + # Create the simulation engine for the desired backend engine = create_simulation_engine(backend=backend) From c136b98e36613fdb6ff047da3b3a58d845c226ff Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Tue, 5 May 2026 10:35:05 -0700 Subject: [PATCH 30/32] get correct beam indicies --- src/fftvis/cpu/beams.py | 9 +++++++-- src/fftvis/cpu/cpu_simulate.py | 4 +--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/fftvis/cpu/beams.py b/src/fftvis/cpu/beams.py index 98773e2..d9d86f0 100644 --- a/src/fftvis/cpu/beams.py +++ b/src/fftvis/cpu/beams.py @@ -98,10 +98,15 @@ def prepare_beam_evaluation(antnums, baselines, beam_idx): # Get number of beams: use max index + 1 so non-contiguous indices # (e.g. [0, 2, 2]) still produce the correct set of beam pairs. - nbeams = int(np.max(beam_idx)) + 1 + unique_beam_idx = np.unique(beam_idx) + nbeams = len(unique_beam_idx) # Get the unique beam pairs that we need to evaluate the apparent flux for - unique_beam_pairs = [(bi, bj) for bi in range(nbeams) for bj in range(bi, nbeams)] + unique_beam_pairs = [ + (unique_beam_idx[bi], unique_beam_idx[bj]) + for bi in range(nbeams) + for bj in range(bi, nbeams) + ] # Determine which baselines correspond to which beam pairs, and whether they need to be flipped antnum_to_beam_idx = {ai: bidx for ai, bidx in zip(antnums, beam_idx)} diff --git a/src/fftvis/cpu/cpu_simulate.py b/src/fftvis/cpu/cpu_simulate.py index 906601c..2be79b3 100644 --- a/src/fftvis/cpu/cpu_simulate.py +++ b/src/fftvis/cpu/cpu_simulate.py @@ -271,9 +271,7 @@ def simulate( required_shm += val.nbytes for beam in beam_list: - if isinstance(beam, UVBeam): - required_shm += beam.data_array.nbytes - elif isinstance(beam, BeamInterface) and getattr(beam, "_isuvbeam", False): + if isinstance(beam, BeamInterface) and getattr(beam, "_isuvbeam", False): required_shm += beam.beam.data_array.nbytes # Add visibility memory From f5844db68dc91c03ce3b680eec5acf5528308861 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Tue, 5 May 2026 11:10:07 -0700 Subject: [PATCH 31/32] update tests to reflect changes to prepare_beam_eval function --- src/fftvis/cpu/beams.py | 3 +-- tests/test_cpu_beams.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/fftvis/cpu/beams.py b/src/fftvis/cpu/beams.py index d9d86f0..57fcf1e 100644 --- a/src/fftvis/cpu/beams.py +++ b/src/fftvis/cpu/beams.py @@ -96,8 +96,7 @@ def prepare_beam_evaluation(antnums, baselines, beam_idx): if beam_idx is None: return [(0, 0)], {(0, 0): np.arange(len(baselines))}, {(0, 0): [False] * len(baselines)} - # Get number of beams: use max index + 1 so non-contiguous indices - # (e.g. [0, 2, 2]) still produce the correct set of beam pairs. + # Get unique beam indices and the number of beams we need to evaluate unique_beam_idx = np.unique(beam_idx) nbeams = len(unique_beam_idx) diff --git a/tests/test_cpu_beams.py b/tests/test_cpu_beams.py index 4085746..c4c3b1b 100644 --- a/tests/test_cpu_beams.py +++ b/tests/test_cpu_beams.py @@ -846,7 +846,7 @@ def test_non_contiguous_beam_idx(self): CPUBeamEvaluator.prepare_beam_evaluation(antnums, baselines, beam_idx) ) # Beam indices go up to 2, so nbeams = 3 → 6 upper-triangle pairs - assert len(unique_pairs) == 6 + assert len(unique_pairs) == 3 # Baseline (0,1): beams (0,2) → pair (0,2), not flipped assert 0 in pair_to_idxs[(0, 2)] assert pair_to_flipped[(0, 2)][pair_to_idxs[(0, 2)].index(0)] is False From f7734608a014db062f0a7f8d94020a98b7772921 Mon Sep 17 00:00:00 2001 From: Tyler Cox Date: Tue, 5 May 2026 11:10:15 -0700 Subject: [PATCH 32/32] update tests --- tests/test_cpu_beams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cpu_beams.py b/tests/test_cpu_beams.py index c4c3b1b..d6e1a73 100644 --- a/tests/test_cpu_beams.py +++ b/tests/test_cpu_beams.py @@ -845,7 +845,7 @@ def test_non_contiguous_beam_idx(self): unique_pairs, pair_to_idxs, pair_to_flipped = ( CPUBeamEvaluator.prepare_beam_evaluation(antnums, baselines, beam_idx) ) - # Beam indices go up to 2, so nbeams = 3 → 6 upper-triangle pairs + # Beam indices go up to 2 assert len(unique_pairs) == 3 # Baseline (0,1): beams (0,2) → pair (0,2), not flipped assert 0 in pair_to_idxs[(0, 2)]