diff --git a/heracles/__init__.py b/heracles/__init__.py index f1502ad..899c7cb 100644 --- a/heracles/__init__.py +++ b/heracles/__init__.py @@ -76,7 +76,7 @@ "cl2corr", "corr2cl", # unmixing - "natural_unmixing", + "naturalspice", ] try: @@ -157,5 +157,5 @@ ) from .unmixing import ( - natural_unmixing, + naturalspice, ) diff --git a/heracles/dices/jackknife.py b/heracles/dices/jackknife.py index 57c22f3..7134e78 100644 --- a/heracles/dices/jackknife.py +++ b/heracles/dices/jackknife.py @@ -25,7 +25,7 @@ from ..result import Result, get_result_array from ..mapping import transform from ..twopoint import angular_power_spectra -from ..unmixing import _natural_unmixing, logistic +from ..unmixing import real_naturalspice, logistic from ..transforms import cl2corr try: @@ -62,7 +62,7 @@ def jackknife_cls(data_maps, vis_maps, jk_maps, fields, mask_correction="Fast", # Mask correction if mask_correction == "Full": alphas = get_mask_correlation_ratio(_cls_mm, mls0) - _cls = _natural_unmixing(_cls, alphas, fields) + _cls = real_naturalspice(_cls, alphas, fields) elif mask_correction == "Fast": _cls = correct_footprint_reduction(_cls, jk_maps, fields, *regions) else: @@ -260,8 +260,7 @@ def get_mask_correlation_ratio(Mljk, Mls0): wmljk = cl2corr(mljk) wmljk = wmljk.T[0] # Compute alpha - alpha = wmljk / wmls0 - alpha *= logistic(np.log10(abs(wmljk))) + alpha = wmls0 / (wmljk * logistic(np.log10(abs(wmljk)))) alphas[key] = replace(Mls0[key], array=alpha) return alphas diff --git a/heracles/twopoint.py b/heracles/twopoint.py index da3d1d5..29b1dc5 100644 --- a/heracles/twopoint.py +++ b/heracles/twopoint.py @@ -402,6 +402,37 @@ def mixing_matrices( return out +def svd_pinv(A, rtol=1e-2): + """` + Compute the Moore–Penrose pseudoinverse of a matrix A + using its Singular Value Decomposition (SVD). + + Parameters + ---------- + A : (m, n) array_like + Input matrix to be pseudoinverted. + rtol : float, optional + Cutoff for small singular values. + Singular values smaller than rtol * max(s) are set to zero. + + Returns + ------- + A_pinv : (n, m) ndarray + The pseudoinverse of A. + """ + # Step 1: Compute SVD + U, s, Vt = np.linalg.svd(A, full_matrices=False) + + # Step 2: Invert singular values with tolerance + tol = rtol * np.max(s) + s_pinv = np.array([1 / si if si > tol else 0 for si in s]) + + # Step 3: Reconstruct pseudoinverse + # A^+ = V * Σ^+ * U^T + A_pinv = (Vt.T * s_pinv) @ U.T + return A_pinv, s_pinv + + def invert_mixing_matrix( M, rtol: float = 1e-5, @@ -417,11 +448,13 @@ def invert_mixing_matrix( Returns: inv_M: inverted Cls in the same mapping form + inv_s: singular values of the mixing matrices """ if progress is None: progress = NoProgress() inv_M = {} + inv_s = {} current, total = 0, len(M) for key, value in M.items(): @@ -438,8 +471,8 @@ def invert_mixing_matrix( # makes the mixing matrix block-diagonal M_p = _M[0] + _M[1] M_m = _M[0] - _M[1] - inv_M_p = np.linalg.pinv(M_p, rcond=rtol) - inv_M_m = np.linalg.pinv(M_m, rcond=rtol) + inv_M_p, inv_s_p = svd_pinv(M_p, rtol=rtol) + inv_M_m, inv_s_m = svd_pinv(M_m, rtol=rtol) _inv_m = np.vstack( ( np.hstack(((inv_M_p + inv_M_m) / 2, (inv_M_p - inv_M_m) / 2)), @@ -448,13 +481,15 @@ def invert_mixing_matrix( ) _inv_M_EEEE = _inv_m[:_m, :_n] _inv_M_EEBB = _inv_m[_m:, :_n] - _inv_M_EBEB = np.linalg.pinv(_M[2], rcond=rtol) + _inv_M_EBEB, inv_s_eb = svd_pinv(_M[2], rtol=rtol) _inv_M = np.array([_inv_M_EEEE, _inv_M_EEBB, _inv_M_EBEB]) + _inv_s = np.array([inv_s_p, inv_s_m, inv_s_eb]) else: - _inv_M = np.linalg.pinv(_M, rcond=rtol) + _inv_M, _inv_s = svd_pinv(_M, rtol=rtol) inv_M[key] = replace(M[key], array=_inv_M) - return inv_M + inv_s[key] = replace(M[key], array=_inv_s) + return inv_M, inv_s def apply_mixing_matrix(d, M, lmax=None): diff --git a/heracles/unmixing.py b/heracles/unmixing.py index 04cb88f..ccde477 100644 --- a/heracles/unmixing.py +++ b/heracles/unmixing.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . import numpy as np +from heracles.twopoint import mixing_matrices, invert_mixing_matrix, apply_mixing_matrix from .result import truncated from .transforms import cl2corr, corr2cl from .utils import get_cl @@ -28,31 +29,58 @@ from dataclasses import replace -def natural_unmixing(d, m, fields, x0=-2, k=50, patch_hole=True, lmax=None): +def naturalspice(d, m, fields, mode="Harmonic", rtol=0.0, lmax=None): """ Natural unmixing of the data Cl. Args: d: Data Cl m: mask Cl fields: list of fields - patch_hole: If True, apply the patch hole correction + rtol: Relative tolerance for logistic correction Returns: corr_d: Corrected Cl """ - wm = {} - m_keys = list(m.keys()) - for m_key in m_keys: - _m = m[m_key].array - _wm = cl2corr(_m).T[0] - if patch_hole: - _wm *= logistic(np.log10(abs(_wm)), x0=x0, k=k) - wm[m_key] = replace(m[m_key], array=_wm) - return _natural_unmixing(d, wm, fields, lmax=lmax) + # Step 1: Transform. + # In harmonic space this means computing the singular values + # of the mixing matrix. + # In real space this means computing the correlation function. + if mode == "Harmonic": + M = mixing_matrices(fields, m) + + elif mode == "Real": + wm = {} + m_keys = list(m.keys()) + for m_key in m_keys: + _m = m[m_key].array + _wm = cl2corr(_m).T[0] + wm[m_key] = replace(m[m_key], array=_wm) + + # Step 2: Regularize + if mode == "Harmonic": + iM, inv_s = invert_mixing_matrix(M, rtol=rtol) + elif mode == "Real": + inv_s = {} + m_keys = list(m.keys()) + for m_key in m_keys: + _m = m[m_key].array + # Apply logistic correction + tol = rtol * np.max(np.abs(wm)) + _wm *= logistic(np.log10(abs(_wm)), tol=np.log10(tol)) + inv_s[m_key] = replace(m[m_key], array=1 / _wm) + # Step 3: Apply correction + if mode == "Harmonic": + corr_d = apply_mixing_matrix(d, iM) + elif mode == "Real": + corr_d = real_naturalspice(d, inv_s, fields, lmax=lmax) + return corr_d, inv_s -def _natural_unmixing(d, wm, fields, lmax=None): + +def real_naturalspice(d, inv_wm, fields, lmax=None): """ - Natural unmixing of the data Cl. + Computes the correlation function of the data Cl, + divides multiplies it by the provided inverse mask correlation function, + and transforms back to Cl. Args: d: Data Cl wm: mask correlation function @@ -70,12 +98,12 @@ def _natural_unmixing(d, wm, fields, lmax=None): for key in d.keys(): a, b, i, j = key m_key = (masks[a], masks[b], i, j) - _wm = get_cl(m_key, wm) + _inv_wm = get_cl(m_key, inv_wm) _d = d[key] s1, s2 = _d.spin if lmax is None: *_, lmax = _d.shape - lmax_mask = len(_wm.array) + lmax_mask = len(_inv_wm.array) # Grab metadata dtype = _d.array.dtype # pad cls @@ -102,8 +130,8 @@ def _natural_unmixing(d, wm, fields, lmax=None): ) # Correct by alpha wd = cl2corr(__d.T).T + 1j * cl2corr(__id.T).T - corr_wd = (wd / _wm).real - icorr_wd = (wd / _wm).imag + corr_wd = (wd * _inv_wm).real + icorr_wd = (wd * _inv_wm).imag # Transform back to Cl __corr_d = corr2cl(corr_wd.T).T __icorr_d = corr2cl(icorr_wd.T).T @@ -118,7 +146,7 @@ def _natural_unmixing(d, wm, fields, lmax=None): _corr_d = [] for cl in _d: wd = cl2corr(cl).T - corr_wd = wd / _wm + corr_wd = wd * _inv_wm # Transform back to Cl __corr_d = corr2cl(corr_wd.T).T _corr_d.append(__corr_d[0]) @@ -132,5 +160,5 @@ def _natural_unmixing(d, wm, fields, lmax=None): return corr_d -def logistic(x, x0=-5, k=50): - return 1.0 + np.exp(-k * (x - x0)) +def logistic(x, tol=-5, k=50): + return 1.0 + np.exp(-k * (x - tol)) diff --git a/tests/test_dices.py b/tests/test_dices.py index caabe23..1269bdf 100644 --- a/tests/test_dices.py +++ b/tests/test_dices.py @@ -93,7 +93,7 @@ def test_get_delete2_fsky(jk_maps, njk): def test_full_mask_correction(cls0, mls0, fields): alphas = dices.get_mask_correlation_ratio(mls0, mls0) - _cls = heracles.unmixing._natural_unmixing(cls0, alphas, fields) + _cls = heracles.unmixing.real_naturalspice(cls0, alphas, fields) for key in list(cls0.keys()): cl = cls0[key].array _cl = _cls[key].array diff --git a/tests/test_twopoint.py b/tests/test_twopoint.py index 4863a7d..cf3cdab 100644 --- a/tests/test_twopoint.py +++ b/tests/test_twopoint.py @@ -368,18 +368,20 @@ def test_inverting_mixing_matrices(): ("WHT", "WHT", 0, 0): Result(cl, spin=(0, 0), axis=(0,)), } mms = mixing_matrices(fields, cls2, l1max=10, l2max=20) - inv_mms = invert_mixing_matrix(mms) + inv_mms, inv_s = invert_mixing_matrix(mms) # test for correct shape for key in mms.keys(): *_, n, m = mms[key].shape *_, _n, _m = inv_mms[key].shape + *_, __n = inv_s[key].shape s1, s2 = mms[key].spin _s1, _s2 = inv_mms[key].spin assert s1 == _s1 assert s2 == _s2 assert n == _m assert m == _n + assert n == __n # test that the inverse is correct mms = mixing_matrices(fields, cls2) @@ -387,15 +389,19 @@ def test_inverting_mixing_matrices(): _m = np.ones_like(mms[key].array) mms[key] = Result(_m, spin=mms[key].spin, axis=mms[key].axis, ell=mms[key].ell) - inv_mms = invert_mixing_matrix(mms) + inv_mms, inv_s = invert_mixing_matrix(mms) assert inv_mms.keys() == mms.keys() for key in mms: inv_mm = inv_mms[key].array if inv_mm.ndim == 3: _inv_m = np.sum(inv_mm) + np.testing.assert_allclose( + inv_s[key].array.T[0], [1 / (2 * (lmax + 1)), 0.0, 1 / (lmax + 1)] + ) np.testing.assert_allclose(_inv_m, 1.5) else: _inv_m = np.sum(inv_mm) + np.isclose(inv_s[key][0], 1 / (lmax + 1)) np.testing.assert_allclose(_inv_m, 1.0) # test application of mixing matrices