Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
Unreleased
----------

- Added support in `ra_dec_crossmatch` for a cutout size of zero, enabling single-point matching to FFIs that contain
the specified coordinates. [#166]


1.1.0 (2025-09-15)
------------------
Expand Down
8 changes: 5 additions & 3 deletions astrocut/cutout.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(self, input_files: List[Union[str, Path, S3Path]], coordinates: Uni
log.debug('Coordinates: %s', self._coordinates)

# Turning the cutout size into an array of two values
self._cutout_size = self._parse_size_input(cutout_size)
self._cutout_size = self.parse_size_input(cutout_size)
log.debug('Cutout size: %s', self._cutout_size)

# Assigning other attributes
Expand Down Expand Up @@ -150,7 +150,7 @@ def cutout(self):
raise NotImplementedError('Subclasses must implement this method.')

@staticmethod
def _parse_size_input(cutout_size):
def parse_size_input(cutout_size, *, allow_zero: bool = False) -> np.ndarray:
"""
Makes the given cutout size into a length 2 array.

Expand All @@ -162,6 +162,8 @@ def _parse_size_input(cutout_size):
If ``cutout_size`` has two elements, they should be in ``(ny, nx)`` order. Scalar numbers
in ``cutout_size`` are assumed to be in units of pixels. `~astropy.units.Quantity` objects
must be in pixel or angular units.
allow_zero : bool, optional
If True, allows cutout dimensions to be zero. Default is False.

Returns
-------
Expand All @@ -186,7 +188,7 @@ def _parse_size_input(cutout_size):

for dim in cutout_size:
# Raise error if either dimension is not a positive number
if dim <= 0:
if dim < 0 or (not allow_zero and dim == 0):
raise InvalidInputError('Cutout size dimensions must be greater than zero. '
f'Provided size: ({cutout_size[0]}, {cutout_size[1]})')

Expand Down
132 changes: 108 additions & 24 deletions astrocut/footprint_cutout.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from astropy.table import Table, Column
from cachetools import TTLCache, cached
from spherical_geometry.polygon import SphericalPolygon
from spherical_geometry.vector import radec_to_vector

from .cutout import Cutout

Expand Down Expand Up @@ -193,7 +194,91 @@ def get_ffis(s3_footprint_cache: str) -> Table:
return ffis


def ra_dec_crossmatch(all_ffis: Table, coordinates: SkyCoord, cutout_size, arcsec_per_px: int = 21) -> Table:
def _crossmatch_point(ra: SkyCoord, dec: SkyCoord, all_ffis: Table) -> List[int]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return type annotation -- should it be np.ndarray?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes it should. Fixed!

"""
Returns the indices of the Full Frame Images (FFIs) that contain the given RA and
Dec coordinates by checking which FFI polygons contain the point.

Parameters
----------
ra : SkyCoord
Right Ascension in degrees.
dec : SkyCoord
Declination in degrees.
all_ffis : `~astropy.table.Table`
Table of FFIs to crossmatch with the point.

Returns
-------
ffi_inds : `~numpy.ndarray`
Indices of FFIs that contain the given RA and Dec coordinates.
"""
ffi_inds = []
vector_coord = radec_to_vector(ra, dec)
for sector in np.unique(all_ffis['sequence_number']):
# Returns a 2-long array where the first element is indexes and the 2nd element is empty
sector_ffi_inds = np.where(all_ffis['sequence_number'] == sector)[0]

for ind in sector_ffi_inds:
if all_ffis[ind]["polygon"].contains_point(vector_coord):
ffi_inds.append(ind)
break # the ra/dec will only be on one ccd per sector
return np.array(ffi_inds, dtype=int)


def _crossmatch_polygon(ra: SkyCoord, dec: SkyCoord, all_ffis: Table, px_size: np.ndarray,
arcsec_per_px: int = 21) -> np.ndarray:
"""
Returns the indices of the Full Frame Images (FFIs) that intersect with the given cutout footprint
by checking which FFI polygons intersect with the cutout polygon.

Parameters
----------
ra : SkyCoord
Right Ascension in degrees.
dec : SkyCoord
Declination in degrees.
all_ffis : `~astropy.table.Table`
Table of FFIs to crossmatch with the point.
px_size : array-like
Size of the cutout in pixels, in the form [ny, nx].
arcsec_per_px : int, optional
Default 21. The number of arcseconds per pixel in an image. Used to determine
the footprint of the cutout. Default is the number of arcseconds per pixel in
a TESS image.

Returns
-------
ffi_inds : `~numpy.ndarray`
Boolean array indicating whether each FFI intersects with the cutout.
"""
# Create polygon for intersection
# Convert dimensions from pixels to arcseconds and divide by 2 to get offset from center
# If one of the dimensions is 0, use a very small value to avoid issues with SphericalPolygon
min_offset = 0.1 # pixels
ra_offset = ((max(px_size[0], min_offset) * arcsec_per_px) / 2) * u.arcsec
dec_offset = ((max(px_size[1], min_offset) * arcsec_per_px) / 2) * u.arcsec

# Calculate RA and Dec boundaries
ra_bounds = [ra - ra_offset, ra + ra_offset]
dec_bounds = [dec - dec_offset, dec + dec_offset]

# Get RA and Dec for four corners of rectangle
ras = [ra_bounds[0].value, ra_bounds[1].value, ra_bounds[1].value, ra_bounds[0].value]
decs = [dec_bounds[0].value, dec_bounds[0].value, dec_bounds[1].value, dec_bounds[1].value]

# Create SphericalPolygon for comparison
cutout_fp = SphericalPolygon.from_radec(ras, decs, center=(ra, dec))

# Find indices of FFIs that intersect with the cutout
ffi_inds = np.vectorize(lambda ffi: ffi.intersects_poly(cutout_fp))(all_ffis['polygon'])
ffi_inds = FootprintCutout._ffi_intersect(all_ffis, cutout_fp)

return ffi_inds


def ra_dec_crossmatch(all_ffis: Table, coordinates: Union[SkyCoord, str], cutout_size,
arcsec_per_px: int = 21) -> Table:
"""
Returns the Full Frame Images (FFIs) whose footprints overlap with a cutout of a given position and size.

Expand All @@ -208,11 +293,15 @@ def ra_dec_crossmatch(all_ffis: Table, coordinates: SkyCoord, cutout_size, arcse
cutout_size : int, array-like, `~astropy.units.Quantity`
The size of the cutout array. If ``cutout_size``
is a scalar number or a scalar `~astropy.units.Quantity`,
then a square cutout of ``cutout_size`` will be created. If
then a square cutout of ``cutout_size`` will be used. If
``cutout_size`` has two elements, they should be in ``(ny, nx)``
order. Scalar numbers in ``cutout_size`` are assumed to be in
units of pixels. `~astropy.units.Quantity` objects must be in pixel or
angular units.

If a cutout size of zero is provided, the function will return FFIs that contain
the exact RA and Dec position. If a non-zero cutout size is provided, the function
will return FFIs whose footprints overlap with the cutout area.
arcsec_per_px : int, optional
Default 21. The number of arcseconds per pixel in an image. Used to determine
the footprint of the cutout. Default is the number of arcseconds per pixel in
Expand All @@ -228,27 +317,22 @@ def ra_dec_crossmatch(all_ffis: Table, coordinates: SkyCoord, cutout_size, arcse
coordinates = SkyCoord(coordinates, unit='deg')
ra, dec = coordinates.ra, coordinates.dec

# Parse cutout size
cutout_size = Cutout._parse_size_input(cutout_size)

# Create polygon for intersection
# Convert dimensions from pixels to arcseconds and divide by 2 to get offset from center
ra_offset = ((cutout_size[0] * arcsec_per_px) / 2) * u.arcsec
dec_offset = ((cutout_size[1] * arcsec_per_px) / 2) * u.arcsec

# Calculate RA and Dec boundaries
ra_bounds = [ra - ra_offset, ra + ra_offset]
dec_bounds = [dec - dec_offset, dec + dec_offset]

# Get RA and Dec for four corners of rectangle
ras = [ra_bounds[0].value, ra_bounds[1].value, ra_bounds[1].value, ra_bounds[0].value]
decs = [dec_bounds[0].value, dec_bounds[0].value, dec_bounds[1].value, dec_bounds[1].value]

# Create SphericalPolygon for comparison
cutout_fp = SphericalPolygon.from_radec(ras, decs, center=(ra, dec))

# Find indices of FFIs that intersect with the cutout
ffi_inds = np.vectorize(lambda ffi: ffi.intersects_poly(cutout_fp))(all_ffis['polygon'])
ffi_inds = FootprintCutout._ffi_intersect(all_ffis, cutout_fp)
px_size = np.zeros(2, dtype=object)
for axis, size in enumerate(Cutout.parse_size_input(cutout_size, allow_zero=True)):
if isinstance(size, u.Quantity): # If Quantity, convert to pixels
if size.unit == u.pixel:
px_size[axis] = size.value
else: # Angular size
# Convert angular size to pixels
px_size[axis] = (size.to_value(u.arcsec)) / arcsec_per_px
else: # Assume pixels
px_size[axis] = size

if np.all(px_size == 0):
# Cross match with point
ffi_inds = _crossmatch_point(ra, dec, all_ffis)
else:
# Cross match with polygon
ffi_inds = _crossmatch_polygon(ra, dec, all_ffis, px_size, arcsec_per_px)

return all_ffis[ffi_inds]
48 changes: 48 additions & 0 deletions astrocut/tests/test_tess_footprint_cutout.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from pathlib import Path
import pytest
import re
from unittest.mock import MagicMock

import astropy.units as u
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.table import Table
from spherical_geometry.polygon import SphericalPolygon

from .. import footprint_cutout
from ..cube_cutout import CubeCutout
from ..exceptions import InvalidInputError, InvalidQueryError
from ..footprint_cutout import get_ffis, ra_dec_crossmatch
Expand All @@ -26,6 +30,27 @@ def coordinates():
return SkyCoord('350 -80', unit='deg')


@pytest.fixture
def all_ffis(scope='module'):
"""Fixture to return the table of all FFIs"""
return get_ffis('s3://stpubdata/tess/public/footprints/tess_ffi_footprint_cache.json')


@pytest.fixture
def crossmatch_spies(monkeypatch):
# wrap real functions with MagicMocks that call the real implementation
real_point = footprint_cutout._crossmatch_point
real_poly = footprint_cutout._crossmatch_polygon

spy_point = MagicMock(side_effect=real_point)
spy_poly = MagicMock(side_effect=real_poly)

monkeypatch.setattr(footprint_cutout, "_crossmatch_point", spy_point)
monkeypatch.setattr(footprint_cutout, "_crossmatch_polygon", spy_poly)

yield spy_point, spy_poly


def check_output_tpf(tpf, sequences=[], cutout_size=5):
"""Helper function to check the validity of output cutout files"""
tpf_table = tpf[1].data
Expand Down Expand Up @@ -77,6 +102,29 @@ def test_ffi_intersect(lon, lat, center, expected):
assert intersection.value[0] == expected


@pytest.mark.parametrize("cutout_size", [0, 0 * u.arcmin, [0, 0], (0, 0), (0*u.pix, 0*u.pix),
[0*u.arcsec, 0*u.arcsec], np.array([0, 0])])
def test_ra_dec_crossmatch_point(coordinates, all_ffis, cutout_size, crossmatch_spies):
spy_point, spy_poly = crossmatch_spies

# Cutout size of 0 should do a point match
results = ra_dec_crossmatch(all_ffis, coordinates, cutout_size)
assert isinstance(results, Table)
spy_point.assert_called_once()
spy_poly.assert_not_called()


@pytest.mark.parametrize("cutout_size", [5, 5 * u.arcmin, [5, 5], [5*u.arcsec, 5*u.arcsec], (5, 0), (5, 0)])
def test_ra_dec_crossmatch_poly(all_ffis, cutout_size, crossmatch_spies):
spy_point, spy_poly = crossmatch_spies

# Cutout size of 0 should do a point match
results = ra_dec_crossmatch(all_ffis, '350 -80', cutout_size)
assert isinstance(results, Table)
spy_poly.assert_called_once()
spy_point.assert_not_called()


def test_tess_footprint_cutout(cutout_size, caplog):
"""Test that a single data cube is created for a given sequence"""
cutout = TessFootprintCutout('130 30', cutout_size, sequence=44, verbose=True)
Expand Down
Loading