Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ and this project uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html)

### Added

- Add `get_virtual_reference()`: returns a reference to a consolidated virtual zarr dataset
if there is one available in CMR. ([#964](https://github.com/nsidc/earthaccess/pull/964))
([@DeanHenze](https://github.com/DeanHenze))
- Add notebook demonstrating workflow with TEMPO Level 3 data as a virtual dataset
([#924](https://github.com/nsidc/earthaccess/pull/924))
([@danielfromearth](https://github.com/danielfromearth))
Expand Down
9 changes: 7 additions & 2 deletions earthaccess/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
search_services,
)
from .auth import Auth
from .dmrpp_zarr import open_virtual_dataset, open_virtual_mfdataset
from .kerchunk import consolidate_metadata
from .search import DataCollection, DataCollections, DataGranule, DataGranules
from .services import DataServices
from .store import Store
from .system import PROD, UAT
from .zarr import (
consolidate_metadata,
get_virtual_reference,
open_virtual_dataset,
open_virtual_mfdataset,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,6 +63,7 @@
"Store",
# kerchunk
"consolidate_metadata",
"get_virtual_reference",
# virtualizarr
"open_virtual_dataset",
"open_virtual_mfdataset",
Expand Down
74 changes: 0 additions & 74 deletions earthaccess/kerchunk.py

This file was deleted.

9 changes: 9 additions & 0 deletions earthaccess/zarr/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .dmrpp_zarr import open_virtual_dataset, open_virtual_mfdataset
from .kerchunk import consolidate_metadata, get_virtual_reference

# Public API of the earthaccess.zarr subpackage, grouped to mirror the
# top-level earthaccess package: kerchunk helpers first, then virtualizarr.
__all__ = [
    "consolidate_metadata",
    "get_virtual_reference",
    "open_virtual_dataset",
    "open_virtual_mfdataset",
]
File renamed without changes.
157 changes: 157 additions & 0 deletions earthaccess/zarr/kerchunk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from __future__ import annotations

import json
import logging
import zipfile
from pathlib import Path
from typing import Optional, Union
from uuid import uuid4

import fsspec
import fsspec.utils
import s3fs

# import ipdb
import earthaccess
import zarr

logger = logging.getLogger(__name__)


def _get_chunk_metadata(
    granule: earthaccess.DataGranule,
    fs: fsspec.AbstractFileSystem,
) -> list[dict]:
    """Build kerchunk reference metadata for every data file in a granule.

    Parameters:
        granule: granule whose data links are scanned. May arrive as a
            plain ``dict`` after dask serialization (see note in body).
        fs: filesystem used to open the files. An ``s3fs.S3FileSystem``
            selects the granule's "direct" (in-region S3) links; any other
            filesystem selects the "indirect" (HTTPS) links.

    Returns:
        One kerchunk reference dict (from ``SingleHdf5ToZarr.translate``)
        per data link in the granule.
    """
    from kerchunk.hdf import SingleHdf5ToZarr

    if not isinstance(granule, earthaccess.DataGranule) and isinstance(granule, dict):
        # WHY: dask serialization round-trips the granule as a plain dict;
        # cast it back to DataGranule to regain the data-link helper methods.
        # TODO: ask James what is going on
        granule = earthaccess.DataGranule(granule)

    metadata = []
    access = "direct" if isinstance(fs, s3fs.S3FileSystem) else "indirect"

    for url in granule.data_links(access=access):
        with fs.open(url) as inf:
            h5chunks = SingleHdf5ToZarr(inf, url)  # type: ignore
            m = h5chunks.translate()
            metadata.append(m)

    return metadata


def consolidate_metadata(
    granules: list[earthaccess.DataGranule],
    kerchunk_options: Optional[dict] = None,
    access: str = "direct",
    outfile: Optional[str] = None,
    storage_options: Optional[dict] = None,
) -> Union[str, dict]:
    """Consolidate kerchunk metadata for a list of granules into one reference.

    Parameters:
        granules: granules to scan. The provider of the first granule is
            used to open the S3 filesystem when ``access == "direct"``.
        kerchunk_options: extra keyword arguments forwarded to
            ``kerchunk.combine.MultiZarrToZarr``.
        access: ``"direct"`` for in-region S3 access, anything else for HTTPS.
        outfile: optional path to write the combined reference to. When
            omitted, the combined reference dict is returned instead.
        storage_options: fsspec storage options used when writing ``outfile``.

    Returns:
        The combined reference dict, or the stringified ``outfile`` path
        when an output file was requested.

    Raises:
        ImportError: if the optional ``dask`` or ``kerchunk`` dependencies
            are not installed.
    """
    try:
        import dask
        from kerchunk.combine import MultiZarrToZarr
    except ImportError as e:
        # BUG FIX: error message previously read "to be be installed".
        raise ImportError(
            "`earthaccess.consolidate_metadata` requires `dask` and `kerchunk` to be installed"
        ) from e

    if access == "direct":
        fs = earthaccess.get_s3_filesystem(provider=granules[0]["meta"]["provider-id"])
    else:
        fs = earthaccess.get_fsspec_https_session()

    # Gather per-granule chunk metadata in parallel via dask.
    get_chunk_metadata = dask.delayed(_get_chunk_metadata)  # type: ignore
    chunks = dask.compute(*[get_chunk_metadata(g, fs) for g in granules])  # type: ignore
    chunks = sum(chunks, start=[])

    # Combine the per-file references into a single reference set.
    mzz = MultiZarrToZarr(chunks, **(kerchunk_options or {}))

    if outfile is None:
        return mzz.translate()

    output = fsspec.utils.stringify_path(outfile)
    mzz.translate(outfile, storage_options=storage_options or {})
    return output


def get_virtual_reference(
    short_name: str = "", cloud_hosted: bool = True, format: str = "parquet"
) -> Union[fsspec.FSMap, None]:
    """Return a virtual reference store for a collection, if one exists.

    The reference file has to be created by the DAAC distributing the data;
    it is discovered via the collection's UMM-C ``RelatedUrls`` on CMR. The
    returned mapper can be used directly in xarray as a Zarr store.

    Parameters:
        short_name: collection short name to search for on CMR.
        cloud_hosted: restrict the CMR search to cloud-hosted collections.
        format: reference file format to look for, ``"parquet"`` or ``"json"``.

    Returns:
        An fsspec mapper over the reference filesystem; the raw reference
        URL (a ``str``) when running under Zarr v3 with a JSON reference
        (see issue #1046); or ``None`` when no suitable reference exists.
    """
    file_types = {
        "parquet": "parq.zip",
        "json": "json",
    }

    # Find collection-level metadata (UMM-C) on CMR:
    collections = earthaccess.search_datasets(
        short_name=short_name, cloud_hosted=cloud_hosted
    )

    links = collections[0]["umm"].get("RelatedUrls", [])

    # Look within UMM-C for links to virtual data set reference files.
    # Both JSON and parquet references are expected under VIRTUAL COLLECTION.
    refs = [
        e["URL"]
        for e in links
        if "Subtype" in e
        and (("VIRTUAL COLLECTION" in e["Subtype"]) or ("DATA RECIPE" in e["Subtype"]))
    ]

    # BUG FIX: the original indexed [0] unconditionally (IndexError when
    # references exist but none match the requested format) and passed
    # multiple positional args to logger.info, which logging interprets as
    # %-format arguments and fails to render.
    matching = [link for link in refs if link.endswith(file_types[format])]
    if not matching:
        logger.info(
            "No virtual data set reference file in the requested format exists "
            "for this collection. There may be a reference file in a different "
            "format, or else you will have to open this data set using "
            "traditional netCDF/HDF methods."
        )
        return None

    logger.info("Virtual data set reference file exists for this collection")
    link = matching[0]

    # BUG FIX: the original compared version strings lexicographically
    # (zarr.__version__ > "3.0.0"), which misorders e.g. "10.0.0" and
    # excludes exactly 3.0.0; compare the major version numerically instead.
    zarr_major = int(zarr.__version__.split(".")[0])
    if zarr_major >= 3 and link.endswith(".json"):
        # see: https://github.com/nsidc/earthaccess/issues/1046
        logger.warning("""Using Zarr V3, open as kerchunk store with:

        data = xr.open_dataset(consolidated_virtual_store,
                  engine="kerchunk",
                  chunks={},
                  backend_kwargs={"storage_options":
                        {"remote_protocol": "s3",
                        "remote_options": fs.storage_options}})
        """)
        return link

    # This assumes the references point to s3 links; we'll have to refactor later.
    http_fs = earthaccess.get_fsspec_https_session()
    fs = earthaccess.get_s3_filesystem(provider=collections[0]["meta"]["provider-id"])
    if link.endswith(".json"):
        with http_fs.open(link) as f:
            ref_loc = json.load(f)
    else:
        with http_fs.open(link, "rb") as remote_zip:
            # Unzip the parquet references into a local scratch directory.
            with zipfile.ZipFile(remote_zip, "r") as zip_ref:
                ref_id = uuid4()  # renamed from `id` to avoid shadowing the builtin
                local_path = Path(f".references/{ref_id}")
                zip_ref.extractall(local_path)
                ref_loc = str([d for d in local_path.iterdir() if d.is_dir()][0])

    storage_opts = {
        "fo": ref_loc,
        "remote_protocol": "s3",
        "remote_options": fs.storage_options,
    }
    file_ref = fsspec.filesystem("reference", **storage_opts)
    return file_ref.get_mapper("")
Loading