update changelog
sliu008 committed Dec 5, 2024
1 parent d797fe1 commit 6c43404
Showing 2 changed files with 83 additions and 53 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added Harmony deployment into GitHub Actions.
### Changed
- [issue #117](https://github.com/podaac/concise/issues/117): Add part of URL to output file name (see the filename sketch below)
- Update Python libraries
- Update harmony-service-lib, which changed its project structure
- Add Concise exception to propagate errors up to Harmony API calls
### Deprecated
### Removed
### Fixed
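As a quick illustration of the filename behavior referenced in the changelog above (issue #117), here is a minimal sketch of how the merged output name is built from the stem of the first granule URL, the end datetime, and the collection, mirroring the logic in `podaac/merger/harmony/service.py` below. The URL and collection values are hypothetical.

```python
from datetime import datetime, timezone
from pathlib import Path

# Hypothetical inputs for illustration only
first_granule_url = "https://example.com/data/GRANULE_A_2024.nc"
collection = "C0000000000-POCLOUD"
end_datetime = datetime(2024, 12, 5, tzinfo=timezone.utc)

# The stem of the first granule URL is prepended to the merged filename
first_url_name = Path(first_granule_url).stem
filename = f'{first_url_name}_{end_datetime.strftime("%Y%m%dT%H%M%SZ")}_{collection}_merged.nc4'
print(filename)  # GRANULE_A_2024_20241205T000000Z_C0000000000-POCLOUD_merged.nc4
```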
133 changes: 80 additions & 53 deletions podaac/merger/harmony/service.py
@@ -6,9 +6,11 @@
from shutil import copyfile
from urllib.parse import urlsplit
from uuid import uuid4
import traceback

from harmony_service_lib.adapter import BaseHarmonyAdapter
from harmony_service_lib.util import bbox_to_geometry, stage
from harmony_service_lib.exceptions import HarmonyException
from pystac import Catalog, Item
from pystac.item import Asset

@@ -20,6 +22,26 @@
NETCDF4_MIME = 'application/x-netcdf4' # pylint: disable=invalid-name


class ConciseException(HarmonyException):
"""Base class for exceptions in the Harmony GDAL Adapter."""

def __init__(self, original_exception):
# Extract the last traceback entry (most recent call) for the error location
tb = traceback.extract_tb(original_exception.__traceback__)[-1]

# Get the error details: file, line, function, and message
filename = tb.filename
lineno = tb.lineno
funcname = tb.name
error_msg = str(original_exception)

# Format the error message to be more readable
readable_message = (f"Error in file '{filename}', line {lineno}, in function '{funcname}': "
f"{error_msg}")

super().__init__(readable_message, 'nasa/harmony-gdal-adapter')


class ConciseService(BaseHarmonyAdapter):
"""
A harmony-service-lib wrapper around the Concise module. This wrapper does
Expand Down Expand Up @@ -55,63 +77,68 @@ def process_catalog(self, catalog: Catalog):
        pystac.Catalog
            A new catalog containing the results from the merge
        """
        result = catalog.clone()
        result.id = str(uuid4())
        result.clear_children()

        # Get all the items from the catalog, including from child or linked catalogs
        items = list(self.get_all_catalog_items(catalog))
        try:
            result = catalog.clone()
            result.id = str(uuid4())
            result.clear_children()

            # Get all the items from the catalog, including from child or linked catalogs
            items = list(self.get_all_catalog_items(catalog))

        # Quick return if catalog contains no items
        if len(items) == 0:
            return result

        # -- Process metadata --
        bbox = []
        granule_urls = []
        datetimes = [
            datetime.max.replace(tzinfo=timezone.utc), # start
            datetime.min.replace(tzinfo=timezone.utc) # end
        ]

        for item in items:
            get_bbox(item, bbox)
            get_granule_url(item, granule_urls)
            get_datetime(item, datetimes)

        # Items did not have a bbox; valid under spec
        if len(bbox) == 0:
            bbox = None

        # -- Perform merging --
        collection = self._get_item_source(items[0]).collection
        first_granule_url = []
        get_granule_url(items[0], first_granule_url)
        first_url_name = Path(first_granule_url[0]).stem
        filename = f'{first_url_name}_{datetimes[1].strftime("%Y%m%dT%H%M%SZ")}_{collection}_merged.nc4'

        with TemporaryDirectory() as temp_dir:
            self.logger.info('Starting granule downloads')
            input_files = multi_core_download(granule_urls, temp_dir, self.message.accessToken, self.config)
            self.logger.info('Finished granule downloads')

            output_path = Path(temp_dir).joinpath(filename).resolve()
            merge_netcdf_files(input_files, output_path, granule_urls, logger=self.logger)
            staged_url = self._stage(str(output_path), filename, NETCDF4_MIME)

        # -- Output to STAC catalog --
        result.clear_items()
        properties = {
            "start_datetime": datetimes[0].isoformat(),
            "end_datetime": datetimes[1].isoformat()
        }

        item = Item(str(uuid4()), bbox_to_geometry(bbox), bbox, None, properties)
        asset = Asset(staged_url, title=filename, media_type=NETCDF4_MIME, roles=['data'])
        item.add_asset('data', asset)
        result.add_item(item)

            # Quick return if catalog contains no items
            if len(items) == 0:
                return result

            # -- Process metadata --
            bbox = []
            granule_urls = []
            datetimes = [
                datetime.max.replace(tzinfo=timezone.utc), # start
                datetime.min.replace(tzinfo=timezone.utc) # end
            ]

            for item in items:
                get_bbox(item, bbox)
                get_granule_url(item, granule_urls)
                get_datetime(item, datetimes)

            # Items did not have a bbox; valid under spec
            if len(bbox) == 0:
                bbox = None

            # -- Perform merging --
            collection = self._get_item_source(items[0]).collection
            first_granule_url = []
            get_granule_url(items[0], first_granule_url)
            first_url_name = Path(first_granule_url[0]).stem
            filename = f'{first_url_name}_{datetimes[1].strftime("%Y%m%dT%H%M%SZ")}_{collection}_merged.nc4'

            with TemporaryDirectory() as temp_dir:
                self.logger.info('Starting granule downloads')
                input_files = multi_core_download(granule_urls, temp_dir, self.message.accessToken, self.config)
                self.logger.info('Finished granule downloads')

                output_path = Path(temp_dir).joinpath(filename).resolve()
                merge_netcdf_files(input_files, output_path, granule_urls, logger=self.logger)
                staged_url = self._stage(str(output_path), filename, NETCDF4_MIME)

            # -- Output to STAC catalog --
            result.clear_items()
            properties = {
                "start_datetime": datetimes[0].isoformat(),
                "end_datetime": datetimes[1].isoformat()
            }

            item = Item(str(uuid4()), bbox_to_geometry(bbox), bbox, None, properties)
            asset = Asset(staged_url, title=filename, media_type=NETCDF4_MIME, roles=['data'])
            item.add_asset('data', asset)
            result.add_item(item)

            return result
        except Exception as ex:
            raise ConciseException(ex) from ex

    def _stage(self, local_filename, remote_filename, mime):
        """
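For reference, here is a minimal, self-contained sketch (outside the Harmony service, using a hypothetical `DemoConciseException` stand-in and a fake `merge_step` failure) of the error-wrapping behavior the new `ConciseException` introduces: the last traceback frame of the wrapped exception becomes a readable message that can be propagated up to the Harmony API call.

```python
import traceback


class DemoConciseException(Exception):
    """Hypothetical stand-in for ConciseException, which extends HarmonyException."""

    def __init__(self, original_exception):
        # Take the most recent traceback frame of the wrapped exception
        tb = traceback.extract_tb(original_exception.__traceback__)[-1]
        readable_message = (f"Error in file '{tb.filename}', line {tb.lineno}, "
                            f"in function '{tb.name}': {original_exception}")
        super().__init__(readable_message)


def merge_step():
    # Hypothetical failure during merging
    raise ValueError("corrupt granule")


try:
    merge_step()
except Exception as ex:
    # Mirrors the new `except` block in process_catalog
    print(DemoConciseException(ex))
```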
