Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d7812f7
create a dag to handle the daily scheduled nightlight NRT data. The d…
sanzog03 Jun 2, 2025
06a517a
update the nrt update check and nrt data update task
sanzog03 Jun 3, 2025
0426b14
refactor to use the date extracted from metadata, into the collection…
sanzog03 Jun 3, 2025
77d8fd6
replace space with underscore on id
sanzog03 Jun 3, 2025
157f85d
update code into more granular subgroups. also moved nrt update speci…
sanzog03 Jun 4, 2025
5a76e33
removed unnecessary import
sanzog03 Jun 4, 2025
dc8602f
fix proper scoping on the branch_task return, enabling followup task
sanzog03 Jun 5, 2025
9264cc3
refactor dag to use taskflow api completely. added docstring to the t…
sanzog03 Jun 5, 2025
a12768f
add stac_extension to include web map links used by wmts capabilities.
sanzog03 Jun 5, 2025
92b911b
generic def names
sanzog03 Jun 5, 2025
d68f9ea
task group taking whatever is needed via parameters instead of global
sanzog03 Jun 5, 2025
95a7d64
modified dag creation with the thought of re-useability
sanzog03 Jun 5, 2025
ad97e93
generate dag docs based on the reused dag
sanzog03 Jun 5, 2025
fcaba93
add veda_worldview_nrt_data_collection_update_dag_creator to generate…
sanzog03 Jun 5, 2025
358e1e1
changed variables names and dag id generation to be more semantic
sanzog03 Jun 5, 2025
86b258c
change the taskgroupid to be generic
sanzog03 Jun 5, 2025
5026a63
add relevant dag tags
sanzog03 Jun 5, 2025
c1a6d19
fix collection config
sanzog03 Jun 5, 2025
2b6bcf1
added some comments
sanzog03 Nov 13, 2025
73ee5e8
renamed the dags and configs for better clarity
sanzog03 Nov 13, 2025
9646c50
Merge branch 'dev' into GHGC-632/scheduled_worldview_nrt_collection_u…
sanzog03 Nov 13, 2025
0a72f8b
remove unnecessary code
sanzog03 Nov 13, 2025
dd30c6b
take schedule from generate dag config. better namings
sanzog03 Nov 14, 2025
260324f
add ability to ingest wmts without gibs
sanzog03 Nov 14, 2025
82987e9
remove unwanted extensions from wmts config json
sanzog03 Nov 14, 2025
7e09e8b
adhere to the return type
sanzog03 Nov 14, 2025
e5b16e3
single task instantiation for whatever is necessary
sanzog03 Nov 14, 2025
c5208a7
refactor for better task arrangement and readability
sanzog03 Nov 14, 2025
fa67e1b
import error
sanzog03 Nov 14, 2025
94ae24f
1. made gibs url and schedule optional.
sanzog03 Nov 17, 2025
0e16bb1
wmts and gibs branching fix
sanzog03 Nov 17, 2025
d58dc2f
rename to make the dag generic for all wmts and not just gibs wmts
sanzog03 Nov 17, 2025
6cf67d9
update doc string
sanzog03 Nov 17, 2025
2f0ffd9
1. add validation task which validates if the json config is as per t…
sanzog03 Nov 17, 2025
087b486
remove comment
sanzog03 Nov 17, 2025
f1fb195
corrected import for tests
sanzog03 Nov 17, 2025
c33b58c
update wmts with validator
kyle-lesinger Nov 25, 2025
a1f3d5a
update pytests
kyle-lesinger Nov 25, 2025
ed6d435
add slack fail alert
kyle-lesinger Nov 25, 2025
0d61a5f
add value error messages
kyle-lesinger Nov 25, 2025
b2249b9
reduce number of logging statements
kyle-lesinger Dec 1, 2025
4a240f1
update generate dags
kyle-lesinger Dec 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dags/generate_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@

from veda_data_pipeline.veda_discover_pipeline import get_discover_dag
from veda_data_pipeline.veda_vector_pipeline import get_ingest_vector_dag
from veda_data_pipeline.veda_wmts2stac_update_pipeline import get_ingest_wmts2stac_dag
from veda_data_pipeline.veda_pyarc2stac_pipeline import get_ingest_pyarc2stac_dag
from veda_data_pipeline.helpers.veda_wmts2stac_update_pipeline import get_ingest_wmts2stac_dag_config

# Maps the "dag" value of a config entry to the factory that builds that DAG.
dag_generators = {
    "veda_discover": get_discover_dag,
    "veda_ingest_vector": get_ingest_vector_dag,
    "veda_pyarc2stac_ingest": get_ingest_pyarc2stac_dag,
    "veda_wmts2stac_ingest": get_ingest_wmts2stac_dag,
}

# Short prefix used when composing generated DAG ids (f"{dag_names[dag]}-{file_name}").
# preserve DAG history -- presumably these prefixes must stay stable so existing
# DAG ids (and their run history) are not orphaned; confirm before renaming.
dag_names = {
    "veda_discover": "discover",
    "veda_ingest_vector": "vector",
    "veda_pyarc2stac_ingest": "pyarc2stac",
    "veda_wmts2stac_ingest": "wmts2stac",
}

def generate_dags():
Expand Down Expand Up @@ -62,10 +66,13 @@ def generate_dags():
if c.get("schedule", None) and (dag := c.get("dag", "veda_discover")):
if id := c.get("id"):
file_name = id # use id for DAG name if provided, otherwise default to file name
dag_generators[dag](id=f"{dag_names[dag]}-{file_name}", event=c)

try:
dag_generators[dag](id=f"{dag_names[dag]}-{file_name}", event=c)
except KeyError:
continue # configured DAG not present in current environment

# Build the config-driven DAGs first (one per config entry that has a schedule).
generate_dags()
# create default DAGs (no config or schedule)
get_ingest_vector_dag(id="veda_ingest_vector", event={})
get_discover_dag(id="veda_discover", event={})
# NOTE(review): unlike the two defaults above, this DAG is created with a full
# example config (get_ingest_wmts2stac_dag_config carries a "schedule" and a
# "collection_config"), so the "no config or schedule" remark does not apply
# to it -- confirm this is intended.
get_ingest_wmts2stac_dag(id='veda_wmts2stac', event=get_ingest_wmts2stac_dag_config)
Empty file.
310 changes: 310 additions & 0 deletions dags/veda_data_pipeline/helpers/veda_wmts2stac_update_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
import datetime
from dataclasses import dataclass
from typing import Any, Optional

import requests
from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator

from veda_data_pipeline.groups.collection_group import ingest_collection_task
from veda_data_pipeline.utils.validate import validate_collection

# A collection config that mostly complies with a STAC collection JSON:
# string keys mapped to arbitrary JSON-compatible values.
# (typing.Any is used here; the builtin function `any` is not a type.)
WMTS2STACConfig = dict[str, Any]


@dataclass
class VedaWMTS2STACConfig:
    """Schema of one WMTS-to-STAC DAG configuration entry."""

    # unique identifier
    id: str
    # name of the dag used in generate dag
    dag: str
    # TODO: if this is later generated by some library or is stored in S3, get it from there.
    collection_config: WMTS2STACConfig
    # optional schedule string for the generated DAG
    schedule: Optional[str] = None
    # gibs url that has <ows:Identifier> for the indexed STAC collection as well
    # as the associated time dimension.
    gibs_url: Optional[str] = None

## Tasks

@task
def is_data_current(date:str = "") -> bool:
    """
    Tell whether the given data date is today or later.

    :param date: Latest NRT data date formatted as "%Y-%m-%d". An empty value
        is treated as "not current".
    :type date: str, optional
    :return: True when ``date`` is today's date or a future date, False when it
        is in the past or empty.
    :rtype: bool
    """
    if not date:
        return False

    # Split the dash-separated date into its numeric components.
    year, month, day = (int(part) for part in date.split("-"))
    return datetime.date(year, month, day) >= datetime.date.today()

@task()
def update_nrt_collection_task(previous_collection=None, latest_nrt_date=None):
    """
    Return a copy of the collection whose temporal extent ends at the latest NRT date.

    The input collection is left untouched; the end of the first temporal
    interval of a deep copy is overwritten with midnight (``T00:00:00Z``) of
    ``latest_nrt_date``.

    :param previous_collection: The current collection config.
    :type previous_collection: dict, optional
    :param latest_nrt_date: Latest available NRT data date, 'YYYY-MM-DD'.
    :type latest_nrt_date: str, optional
    :return: The updated collection config, or None when either argument is
        missing/empty.
    :rtype: dict or None
    """
    import copy

    if not (previous_collection and latest_nrt_date):
        return None

    year, month, day = (int(part) for part in latest_nrt_date.split("-"))
    interval_end = datetime.date(year, month, day).strftime("%Y-%m-%dT00:00:00Z")

    refreshed_collection = copy.deepcopy(previous_collection)
    # Index [0][1] is the end of the first temporal interval.
    refreshed_collection['extent']['temporal']['interval'][0][1] = interval_end
    return refreshed_collection

@task
def fetch_metadata_from_gibs(gibs_url: str="https://gibs.earthdata.nasa.gov/wmts/epsg4326/best/1.0.0/WMTSCapabilities.xml") -> str:
    """
    Fetches the WMTS capabilities document from the given GIBS endpoint.

    Args:
        gibs_url (str): The URL for the GIBS WMTSCapabilities.xml endpoint.
            Defaults to the public NASA GIBS endpoint for EPSG:4326.

    Returns:
        str: The body of the HTTP response as text, or an empty string if the
            request fails (connection error, timeout, or non-2xx status).

    Note:
        Request failures are not propagated: any
        ``requests.exceptions.RequestException`` is printed and converted into
        an empty-string result.
    """
    try:
        # (connect, read) timeouts in seconds: without them requests waits
        # indefinitely and an unresponsive endpoint would hang the worker.
        # Timeouts are RequestException subclasses, so they are handled below.
        response = requests.get(gibs_url, timeout=(10, 60))
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching from the gibs: {e}")
        return ""
    return response.text

@task
def extract_latest_nrt_date_for_collection(xml_string: str, collection_id:str='VIIRS_SNPP_DayNightBand_At_Sensor_Radiance') -> str:
    """
    Extracts the default (latest) Time value of a layer from a WMTS GetCapabilities XML.

    The document is searched for a ``Layer`` under ``Contents`` whose
    ``ows:Identifier`` equals ``collection_id``; the ``Default`` value of that
    layer's ``Time`` dimension is returned. Every ``Dimension`` of the layer is
    inspected, and missing elements are treated as "not found" instead of
    raising.

    :param xml_string: The XML response from the WMTS GetCapabilities request.
    :type xml_string: str
    :param collection_id: The identifier of the layer to look up.
        Defaults to 'VIIRS_SNPP_DayNightBand_At_Sensor_Radiance'.
    :type collection_id: str, optional
    :return: The Time dimension's default value. An empty string when the XML
        is empty or when the layer, its Time dimension, or the default value
        is not present.
    :rtype: str
    :raises ET.ParseError: If the `xml_string` is not a valid XML.
    """
    import xml.etree.ElementTree as ET

    namespaces = {
        'xmlns': 'http://www.opengis.net/wmts/1.0',
        'ows': 'http://www.opengis.net/ows/1.1',
    }

    if not xml_string:
        return ""

    # NOTE(review): the XML comes from a configurable URL and the stdlib
    # ElementTree parser is not hardened against maliciously constructed
    # documents -- only point this at trusted endpoints.
    root = ET.fromstring(xml_string)

    contents = root.find('xmlns:Contents', namespaces)
    if contents is None:
        return ""

    for layer in contents.iterfind('xmlns:Layer', namespaces):
        # findtext yields None for a missing identifier, which never matches.
        if layer.findtext('ows:Identifier', namespaces=namespaces) != collection_id:
            continue
        for dimension in layer.iterfind('xmlns:Dimension', namespaces):
            if dimension.findtext('ows:Identifier', namespaces=namespaces) != 'Time':
                continue
            layer_date = dimension.findtext('xmlns:Default', namespaces=namespaces)
            if layer_date:
                return layer_date
    return ""

@task
def validate_collection_task(collection_config: dict) -> dict:
    """
    Check the collection config before it is used for a WMTS ingest.

    The config must declare the web-map-links STAC extension in its
    ``stac_extensions`` list; when it does, the config is handed to
    ``validate_collection`` and that helper's result is returned.

    :param collection_config: The collection configuration dictionary
    :return: Whatever ``validate_collection`` returns for the config
    :raises ValueError: If the web-map-links extension is not declared
    """
    declared_extensions = collection_config.get('stac_extensions', [])
    if "https://stac-extensions.github.io/web-map-links/v1.2.0/schema.json" not in declared_extensions:
        raise ValueError("STAC does not have web-map-link extension needed for WMTS!")
    return validate_collection(collection_config)


## Task groups

@task_group(group_id="validation_task_group", tooltip="validate if collection update is needed via metadata available on gibs")
def validation_task_group(gibs_url: str, collection_id: str) -> dict:
    """
    Task group deciding whether a collection update is needed, based on GIBS metadata.

    Chains three tasks: fetch the GIBS XML metadata, extract the latest date
    for the layer, and check whether that date is current.

    :param gibs_url: The URL to the GIBS metadata endpoint.
    :param collection_id: The id of the collection in the GIBS.
    :return: A dictionary with the "is_update_needed" flag and the "latest_layer_date".
    """
    capabilities_xml = fetch_metadata_from_gibs(gibs_url)
    latest_layer_date = extract_latest_nrt_date_for_collection(capabilities_xml, collection_id)
    return {
        # Current data means the NRT source has moved to today's date, so the
        # STAC collection needs the same update.
        "is_update_needed": is_data_current(latest_layer_date),
        "latest_layer_date": latest_layer_date,
    }

@task_group(group_id="collection_update_task_group", tooltip="update the nightlight nrt collection with the available nrt date")
def collection_update_task_group(collection: dict, latest_layer_date: str) -> None:
    """Stamp the latest available date onto the collection config and ingest the result.

    :param collection: Dictionary representing the NRT collection config.
    :param latest_layer_date: The latest date for which NRT data is available.
    :return: None
    """
    # The update task's output feeds straight into the ingest task.
    ingest_collection_task(
        collection=update_nrt_collection_task(collection, latest_layer_date)
    )

@task_group(group_id="gibs_wmts2stac_update_task_group", tooltip="gibs wmts to stac update")
def gibs_wmts2stac_update_task_group(collection: dict, collection_id: str, gibs_url: str) -> None:
    """
    Task group that refreshes a GIBS-sourced WMTS2STAC collection when needed.

    Flow:
    1. The validation task group reports whether an update is needed and the
       latest layer date found on GIBS.
    2. A branch task routes to either a no-op task or the collection update
       task group.
    3. The collection update task group writes the latest date into the
       collection and ingests it.

    :param collection: A collection config dictionary.
    :param collection_id: The id of the collection in the GIBS.
    :param gibs_url: The URL to the GIBS metadata endpoint.
    :return: None
    """

    @task.branch
    def branch_update_needed(update_needed: bool) -> str:
        """
        Pick the downstream task id to follow.

        :param update_needed: True to follow the update path, False for the no-op path.
        :return: The full task id (including task group prefixes) to run next.
        """
        if update_needed:
            return 'gibs_wmts2stac_update_task_group.collection_update_task_group.update_nrt_collection_task'
        return 'gibs_wmts2stac_update_task_group.no_action_needed'

    no_action_needed = EmptyOperator(task_id='no_action_needed')

    validation_outcome = validation_task_group(gibs_url, collection_id)
    branch_task = branch_update_needed(validation_outcome['is_update_needed'])
    update_group = collection_update_task_group(collection, validation_outcome['latest_layer_date'])

    # Both candidate paths hang off the branch task.
    branch_task >> [no_action_needed, update_group]

@task_group(group_id="wmts2stac_task_group", tooltip="wmts2stac")
def wmts2stac_task_group(collection: dict) -> None:
    """Ingest the given collection config as-is.

    :param collection: A collection config dictionary.
    :return: None
    """
    # Hand the collection_config to the STAC ingest task.
    ingest_collection_task(collection=collection)

## Example Configs:

# Example Collection Config. Used as a default value.
VIIRS_SNPP_NRT_collection: WMTS2STACConfig = {
    "assets": {},
    "id": "VIIRS_SNPP_DayNightBand_At_Sensor_Radiance",
    "dashboard:is_periodic": True,
    "dashboard:time_density": "day",
    "dashboard:time_interval": "P1D",
    "description": "The Black Marble Nighttime At Sensor Radiance (Day/Night Band) layer is created from NASA's Black Marble daily at-sensor top-of-atmosphere nighttime radiance product (VNP46A1). It is displayed as a grayscale image. The layer is expressed in radiance units (nW/(cm2 sr)) with log10 conversion. It is stretched up to 38 nW/(cm2 sr) resulting in improvements in capturing city lights in greater spatial detail than traditional Nighttime Imagery resampled at 0-255 (e.g., Day/Night Band, Enhanced Near Constant Contrast).The ultra-sensitivity of the VIIRS Day/Night Band enables scientists to capture the Earth's surface and atmosphere in low light conditions, allowing for better monitoring of nighttime phenomena. These images are also useful for assessing anthropogenic sources of light emissions under varying illumination conditions. For instance, during partial to full moon conditions, the layer can identify the location and features of clouds and other natural terrestrial features such as sea ice and snow cover, while enabling temporal observations in urban regions, regardless of moonlit conditions. As such, the layer is particularly useful for detecting city lights, lightning, auroras, fires, gas flares, and fishing fleets.The Black Marble Nighttime At Sensor Radiance (Day/Night Band) layer is available in near real-time from the Visible Infrared Imaging Radiometer Suite (VIIRS) aboard the joint NASA/NOAA Suomi National Polar orbiting Partnership (Suomi NPP) satellite. The sensor resolution is 750 m at nadir, imagery resolution is 500 m, and the temporal resolution is daily.",
    "extent": {
        "spatial": {
            # whole-globe bounding box
            "bbox": [[-180, -90, 180, 90]],
        },
        "temporal": {
            # [start, end]; the end is the value the NRT update task overwrites
            "interval": [["2020-11-10T00:00:00Z", "2025-04-14T00:00:00Z"]],
        },
    },
    "is_periodic": True,
    "item_assets": {
        "cog_default": {
            "description": "Cloud optimized default layer to display on map",
            "roles": ["data", "layer"],
            "title": "Default COG Layer",
            "type": "image/tiff; application=geotiff; profile=cloud-optimized",
        },
    },
    "license": "MIT",
    "links": [
        {
            "href": "https://gibs{s}.earthdata.nasa.gov/wmts/epsg3857/best/wmts.cgi",
            "href:servers": ["-a", "-b"],
            "rel": "wmts",
            "title": "Visualized through a WMTS",
            "type": "image/png",
            "wmts:dimensions": {"STYLE": "default"},
            "wmts:layer": ["VIIRS_SNPP_DayNightBand_At_Sensor_Radiance"],
        },
    ],
    "product_level": "L2",
    "providers": [],
    "renders": {},
    "stac_extensions": [
        "https://stac-extensions.github.io/web-map-links/v1.2.0/schema.json",
    ],
    "stac_version": "1.1.0",
    "temporal_frequency": "twenty four hours",
    "time_density": "day",
    "time_interval": "P1D",
    "title": "Black Marble Nighttime At Sensor Radiance (Day/Night Band)",
    "type": "Collection",
    # NOTE(review): this is a velocity unit, while the description above states
    # the layer is in radiance units (nW/(cm2 sr)) -- looks like a copy/paste
    # leftover; confirm the intended value.
    "units": "m·s⁻¹",
}

# Example NRT Collection Update Config. Used as a default value.
# This is a plain dict (not a VedaWMTS2STACConfig instance), so it is annotated
# as one; its keys mirror the fields of the VedaWMTS2STACConfig dataclass.
get_ingest_wmts2stac_dag_config: dict[str, Any] = {
    "id": "wmts2stac-wmts_gibs_update",
    "dag": "veda_wmts2stac_ingest",
    # cron expression: every day at 00:00
    "schedule": "0 0 * * *",
    "collection_config": VIIRS_SNPP_NRT_collection,
    "gibs_url": "https://gibs.earthdata.nasa.gov/wmts/epsg4326/best/1.0.0/WMTSCapabilities.xml",
}
Loading