HRRR Async Refactor #301

Merged · 12 commits · May 13, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Hemispheric centred bred vector perturbation now supports single/odd batch sizes
- Refactored NCAR ERA5 source to have async structure
- Refactored GFS and GFS_FX to have async structure
- Refactored HRRR and HRRR_FX to have async structure
- Expanded the data source protocol to also include async fetch functions for async
data sources
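The expanded protocol above pairs a synchronous entry point with an async `fetch`. A minimal sketch of that shape, with hypothetical names (`AsyncDataSource`, `ToySource`) that are illustrative and not copied from earth2studio's actual interfaces:

```python
# Hypothetical sketch of a data-source protocol exposing both sync and
# async entry points. Names here are assumptions for illustration only.
from __future__ import annotations

import asyncio
from datetime import datetime
from typing import Protocol, runtime_checkable


@runtime_checkable
class AsyncDataSource(Protocol):
    """A data source that can also be driven asynchronously."""

    def __call__(self, time: list[datetime], variable: list[str]) -> dict:
        """Synchronous wrapper that drives the async fetch."""
        ...

    async def fetch(self, time: list[datetime], variable: list[str]) -> dict:
        """Async fetch; multiple requests can be awaited concurrently."""
        ...


class ToySource:
    """Trivial source satisfying the protocol sketch above."""

    def __call__(self, time: list[datetime], variable: list[str]) -> dict:
        return asyncio.run(self.fetch(time, variable))

    async def fetch(self, time: list[datetime], variable: list[str]) -> dict:
        # A real source would download and assemble arrays concurrently here.
        return {v: [t.isoformat() for t in time] for v in variable}
```

The sync `__call__` wrapping `asyncio.run(self.fetch(...))` keeps existing callers working while letting async pipelines await `fetch` directly.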

2 changes: 1 addition & 1 deletion earth2studio/data/base.py
@@ -116,7 +116,7 @@ async def fetch( # type: ignore[override]
lead_time: timedelta | list[timedelta] | LeadTimeArray,
variable: str | list[str] | VariableArray,
) -> xr.DataArray:
"""Async function to get data. Async data sources support.
"""Async function to get data. Async forecast sources support this.

Parameters
----------
9 changes: 7 additions & 2 deletions earth2studio/data/gfs.py
@@ -235,6 +235,12 @@ async def fetch(
if not self._cache:
shutil.rmtree(self.cache)

# Close aiohttp client if s3fs
# https://github.com/fsspec/s3fs/issues/943
# https://github.com/zarr-developers/zarr-python/issues/2901
if isinstance(self.fs, s3fs.S3FileSystem):
s3fs.S3FileSystem.close_session(asyncio.get_event_loop(), self.fs.s3)

return xr_array.isel(lead_time=0)
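The `close_session` call added here works around s3fs leaving its internal aiohttp session open (see the linked issues). A self-contained sketch of the underlying pattern, with a stand-in `FakeClient` replacing the real aiohttp session so no s3fs install is assumed:

```python
# Illustrative sketch of the cleanup pattern: closing an async client
# from synchronous code via the event loop. FakeClient is a hypothetical
# stand-in for the aiohttp session that s3fs holds internally.
import asyncio


class FakeClient:
    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


def close_session(loop: asyncio.AbstractEventLoop, client: FakeClient) -> None:
    # Mirrors the shape of the workaround above: if the loop is still
    # running, hand the close coroutine to it; otherwise drive it directly.
    if loop.is_running():
        loop.create_task(client.close())
    else:
        loop.run_until_complete(client.close())
```

Without this, the aiohttp connector is garbage-collected with connections still open, producing "Unclosed client session" warnings per the referenced issues.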

async def _create_tasks(
@@ -343,7 +349,7 @@ async def fetch_array(
xr.DataArray
FS data array for given time and lead time
"""
logger.debug(f"Fetching GRS grib file: {grib_uri} {byte_offset}-{byte_length}")
logger.debug(f"Fetching GFS grib file: {grib_uri} {byte_offset}-{byte_length}")
# Download the grib file to cache
grib_file = await self._fetch_remote_file(
grib_uri,
@@ -386,7 +392,6 @@ async def _fetch_index(self, index_uri: str) -> dict[str, tuple[int, int]]:
Dictionary of GFS variables (byte offset, byte length)
"""
# Grab index file
# TODO: Change remote file to be more proper fetch
index_file = await self._fetch_remote_file(index_uri)
with open(index_file) as file:
index_lines = [line.rstrip() for line in file]
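`_fetch_index` turns a GRIB `.idx` inventory into byte offsets and lengths so individual messages can be range-requested rather than downloading whole files. A hedged sketch of that parsing, assuming the standard NCEP index line format `n:offset:date:VAR:level:forecast:`; the real earth2studio implementation may differ in key format and edge-case handling:

```python
# Sketch: parse GRIB .idx lines into {"VAR::level": (byte_offset, byte_length)}.
# Each message's length is the gap to the next message's offset; the last
# message gets -1, meaning "read to end of file".


def parse_grib_index(index_lines: list[str]) -> dict[str, tuple[int, int]]:
    entries: list[tuple[str, int]] = []
    for line in index_lines:
        parts = line.split(":")
        if len(parts) < 6:
            continue  # skip malformed lines
        offset = int(parts[1])
        key = f"{parts[3]}::{parts[4]}"  # e.g. "TMP::2 m above ground"
        entries.append((key, offset))

    index: dict[str, tuple[int, int]] = {}
    for i, (key, offset) in enumerate(entries):
        if i + 1 < len(entries):
            length = entries[i + 1][1] - offset
        else:
            length = -1  # last message: length unknown from the index alone
        index[key] = (offset, length)
    return index
```

The resulting `(offset, length)` pairs feed straight into ranged fetches like the `fetch_array(grib_uri, byte_offset, byte_length)` call shown in the diff above.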