-
-
Notifications
You must be signed in to change notification settings - Fork 437
ASB-30568: Adding read_product Function to load files from s3 to memory #3561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
eb5a1b2
b04daad
e4a1991
722dde6
a19308a
ca6e1c2
7229c04
5f8e973
6990708
6f138b3
521d468
c3681a5
afb86e2
3508915
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |||||||
|
|
||||||||
| import numpy as np | ||||||||
| import astropy.units as u | ||||||||
| from astropy.io import fits | ||||||||
| import astropy.coordinates as coord | ||||||||
| from requests import HTTPError | ||||||||
| from astropy.table import Table, Row, vstack | ||||||||
|
|
@@ -44,6 +45,12 @@ | |||||||
| '`~astroquery.mast.ObservationsClass.enable_cloud_dataset` method.' | ||||||||
| ) | ||||||||
|
|
||||||||
| try: | ||||||||
| import asdf | ||||||||
| import s3fs | ||||||||
| except ImportError: | ||||||||
| pass | ||||||||
|
|
||||||||
|
|
||||||||
| @async_to_sync | ||||||||
| class ObservationsClass(MastQueryWithLogin): | ||||||||
|
|
@@ -1203,6 +1210,51 @@ def get_unique_product_list(self, observations, *, batch_size=500): | |||||||
| log.info("To return all products, use `Observations.get_product_list`") | ||||||||
| return unique_products | ||||||||
|
|
||||||||
| # TODO: Need to inlcude way to parse if it is a MAST on prem URL and handle the streaming of that | ||||||||
| def read_product(self, product_path, read_as="auto", ignore_unrecognized=True): | ||||||||
| """ | ||||||||
| Read a product from Open S3 bucket to memory. Currently supports FITS and ASDF product types only. | ||||||||
|
|
||||||||
| Parameters | ||||||||
| ---------- | ||||||||
| product_path: str | ||||||||
| URI to the product in open bucket. | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
| read_as: str, optional | ||||||||
| How to read the file. Currently only .fits and .asdf is supported by "auto". Defaults to "auto". | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
| ignore_unrecognized: bool | ||||||||
| Tells asdf.open() to include or ignore warnings from unrecognized asdf tags. Defaults to True | ||||||||
|
|
||||||||
| Returns | ||||||||
| ------- | ||||||||
| object | ||||||||
| FITS or ASDF object. | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
| """ | ||||||||
| path_lower = product_path.lower() | ||||||||
|
|
||||||||
| if read_as == "auto": | ||||||||
| # Read logic for fits data products | ||||||||
| if path_lower.endswith((".fits", ".fits.gz")): | ||||||||
| try: | ||||||||
| log.info(f"Loaded: {product_path}") | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should log this until after |
||||||||
| return fits.open(product_path, fsspec_kwargs={"anon": True}) | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm realizing that for this line, we will need fsspec[s3], which is not a listed dependency of astroquery. We should probably add this to |
||||||||
| except Exception as e: | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we be any more specific with this exception? If not, that's alright, but we should include a comment on why we catch exceptions so broadly. Same with line 1251. |
||||||||
| log.exception(f"Failed to open FITS File: {product_path} {e}") | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of logging an exception here, we should actually raise one. Same for line 1252 and 1255. |
||||||||
|
|
||||||||
| # Read logic for ASDF | ||||||||
| elif path_lower.endswith(".asdf"): | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These should be some logic in this branch for checking that the user has the proper packages installed, and it should error if they do not. |
||||||||
| try: | ||||||||
| fs = s3fs.S3FileSystem(anon=True) | ||||||||
| with fs.open(product_path, 'rb') as s3_file: | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually was experimenting a bit on Friday with a version of this function for the
Suggested change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also! I forgot in my initial review, but ASDF file objects are a little finicky with when and how you close the files. Data arrays that are unloaded when you close the file handler stay unloaded and will throw an error if you try to access them later. For that reason, we can't use context managers here and need to leave both file handlers open. I would also add a line in the remote tests to make sure that you can access unloaded arrays in ASDF objects. |
||||||||
| af = asdf.open(s3_file, ignore_unrecognized_tag=ignore_unrecognized) | ||||||||
| log.info(f"Loaded: {product_path}") | ||||||||
| return af | ||||||||
| except Exception as e: | ||||||||
| log.exception(f"Failed to open ASD File: {product_path} {e}") | ||||||||
|
|
||||||||
| else: | ||||||||
| log.error("Unsupported extension type") | ||||||||
| return | ||||||||
|
|
||||||||
|
|
||||||||
| @async_to_sync | ||||||||
| class MastClass(MastQueryWithLogin): | ||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1355,6 +1355,130 @@ def test_observations_disable_cloud_dataset(patch_boto3): | |
| assert Observations._cloud_enabled_explicitly is False | ||
|
|
||
|
|
||
| @pytest.fixture | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The convention we have for fixtures in this file is to put them near the top, before any of the tests. For this one and the s3_asdf_path specifically, though, I'm not sure if we need a fixture, since they're only being used once. I'd rather see the value of |
||
| def s3_fits_path(): | ||
| return "s3://stpubdata/hst/public/u9o4/u9o40504m/u9o40504m_c3m.fits" | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def mock_fits_open(mocker): | ||
| return mocker.patch("astropy.io.fits.open", return_value=MagicMock(name="HDUList")) | ||
|
|
||
|
|
||
| def test_read_product_fits(s3_fits_path, mock_fits_open, mocker): | ||
| mocker.patch("astropy.__version__", "5.0.0") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this line? |
||
|
|
||
| result = Observations.read_product(s3_fits_path) | ||
|
|
||
| mock_fits_open.assert_called_once_with( | ||
| s3_fits_path, fsspec_kwargs={"anon": True} | ||
| ) | ||
| assert result is mock_fits_open.return_value | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def s3_asdf_path(): | ||
| return "s3://stpubdata/hst/public/test/test.asdf" | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def mock_s3fs(mocker): | ||
| s3_file = MagicMock(name="S3File") | ||
| s3_file.__enter__.return_value = s3_file | ||
| s3_file.__exit__.return_value = None | ||
|
|
||
| fs = MagicMock(name="S3FileSystem") | ||
| fs.open.return_value = s3_file | ||
|
|
||
| mocker.patch( | ||
| "astroquery.mast.observations.s3fs.S3FileSystem", | ||
| return_value=fs, | ||
| ) | ||
|
|
||
| return fs | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def mock_asdf_open(mocker): | ||
| return mocker.patch( | ||
| "astroquery.mast.observations.asdf.open", | ||
| return_value=MagicMock(name="AsdfFile"), | ||
| ) | ||
|
|
||
|
|
||
| def test_read_product_asdf(s3_asdf_path, mock_s3fs, mock_asdf_open): | ||
| pytest.importorskip("asdf") | ||
| pytest.importorskip("s3fs") | ||
|
|
||
| result = Observations.read_product(s3_asdf_path) | ||
|
|
||
| mock_s3fs.open.assert_called_once_with( | ||
| s3_asdf_path, | ||
| "rb", | ||
| ) | ||
|
|
||
| mock_asdf_open.assert_called_once_with( | ||
| mock_s3fs.open.return_value.__enter__.return_value, | ||
| ignore_unrecognized_tag=True, | ||
| ) | ||
|
|
||
| assert result is mock_asdf_open.return_value | ||
|
|
||
|
|
||
| def test_read_product_fits_open_failure(mocker, s3_fits_path): | ||
| # Simulate failure when opening the FITS file | ||
| mock_fits_open = mocker.patch( | ||
| "astropy.io.fits.open", | ||
| side_effect=OSError("Cannot read FITS file") | ||
| ) | ||
|
|
||
| result = Observations.read_product(s3_fits_path) | ||
|
|
||
| # fits.open should have been called once with correct arguments | ||
| mock_fits_open.assert_called_once_with( | ||
| s3_fits_path, fsspec_kwargs={"anon": True} | ||
| ) | ||
|
|
||
| # Function should return None after failure | ||
| assert result is None | ||
|
|
||
|
|
||
| def test_read_product_asdf_open_failure(mocker): | ||
| s3_asdf_path = "s3://stpubdata/hst/public/u9o4/u9o40504m/u9o40504m.asdf" | ||
|
|
||
| # Mock the S3 filesystem and its open() method | ||
| mock_fs = mocker.patch("s3fs.S3FileSystem") | ||
| mock_fs_instance = mock_fs.return_value | ||
|
|
||
| # Make fs.open raise an error when used | ||
| mock_fs_instance.open.side_effect = OSError("Cannot read ASDF file") | ||
|
|
||
| result = Observations.read_product(s3_asdf_path) | ||
|
|
||
| # Ensure S3FileSystem was created with anon=True | ||
| mock_fs.assert_called_once_with(anon=True) | ||
|
|
||
| # Ensure attempt was made to open the file | ||
| mock_fs_instance.open.assert_called_once_with(s3_asdf_path, "rb") | ||
|
|
||
| # Function should return None after failure | ||
| assert result is None | ||
|
|
||
|
|
||
| def test_read_product_unknown_extension_auto(mocker): | ||
| product_path = "s3://stpubdata/hst/public/u9o4/u9o40504m/u9o40504m.txt" | ||
|
|
||
| # Patch fits.open and asdf.open to ensure they are NOT called | ||
| mock_fits_open = mocker.patch("astropy.io.fits.open") | ||
| mock_asdf_open = mocker.patch("asdf.open") | ||
|
|
||
| result = Observations.read_product(product_path) | ||
|
|
||
| assert result is None | ||
| mock_fits_open.assert_not_called() | ||
| mock_asdf_open.assert_not_called() | ||
|
|
||
|
|
||
| ###################### | ||
| # CatalogClass tests # | ||
| ###################### | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1035,6 +1035,25 @@ def test_observations_get_cloud_uris_no_duplicates(self, msa_product_table, rese | |
| uris = Observations.get_cloud_uris(products) | ||
| assert len(uris) == 1 | ||
|
|
||
| @pytest.mark.remote_data | ||
| def test_observations_read_product_fits(self): | ||
| product_path = "s3://stpubdata/hst/public/u24r/u24r0102t/u24r0102t_c1f.fits" | ||
|
|
||
| product = Observations.read_product(product_path) | ||
|
|
||
| assert isinstance(product, fits.HDUList) | ||
|
|
||
| @pytest.mark.remote_data | ||
| def test_observations_read_product_asdf(self): | ||
| asdf = pytest.importorskip("asdf") | ||
|
|
||
| product_path = "s3://stpubdata/roman/nexus/soc_simulations/tutorial_data" \ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any smaller files that we have available to us from the Nexus? This takes around 15 seconds to load in, and we try to keep this test suite as short in duration as we can. |
||
| "/r0003201001001001004_0001_wfi01_f106_cal.asdf" | ||
|
|
||
| product = Observations.read_product(product_path) | ||
|
|
||
| assert isinstance(product, asdf.AsdfFile) | ||
|
|
||
| ###################### | ||
| # CatalogClass tests # | ||
| ###################### | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -629,3 +629,14 @@ remain fully cloud-based. | |||||
| COMPLETE | ||||||
| COMPLETE | ||||||
| COMPLETE | ||||||
|
|
||||||
| Streaming Data Products from S3 to memory | ||||||
| ----------------------------------------- | ||||||
| If instead of downloading you would like to load an S3 URI directly to memory you can use `~astroquery.mast.ObservationsClass.read_product`. | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| This function supports FITS and ASDF data products and will automatically parse the file for the suffix and load it to memory using `~astropy.io.fits.open` or ``~asdf.open``. | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| For ASDF data products additional packages may be required (e.g lz4 and roman-datamodels for ROMAN data). | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
|
||||||
| .. doctest-remote-data:: | ||||||
|
|
||||||
| >>> from astroquery.mast import Observations | ||||||
| >>> product = Observations.read_product(product_path="s3://stpubdata/hst/public/u9o4/u9o40504m/u9o40504m_c3m.fits", auto=True, ignore_unrecognized=True) | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.