pystac_monty/sources/common.py (4 changes: 3 additions & 1 deletion)

```diff
@@ -75,18 +75,20 @@ class GenericDataSource(BaseModel):
 class GdacsEpisodes(BaseModel):
     type: str
     data: GenericDataSource
+    hazard_type: str | None


 class GdacsDataSourceType(BaseModel):
     source_url: str
     event_data: Union[File, Memory]
-    episodes: List[Tuple[GdacsEpisodes, GdacsEpisodes]]
+    episodes: List[Tuple[GdacsEpisodes, GdacsEpisodes, GdacsEpisodes | None]]  # EventData, Geometry, Impact


 class USGSDataSourceType(BaseModel):
     source_url: str
     event_data: Union[File, Memory]
     loss_data: Union[File, Memory, None] = None
+    alerts_data: Union[File, Memory, None] = None


 class DesinventarDataSourceType(BaseModel):
```
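For reference, a minimal sketch of how a caller might assemble the new three-element episode tuple. This is not code from the PR: the `Memory`/`GenericDataSource` field shapes are inferred from their usage elsewhere in this diff (`source_url`, `input_data.content`), and all URLs and payloads are placeholders.

```python
# Hypothetical sketch of building the new (event, geometry, impact) episode triple.
from pystac_monty.sources.common import (
    GdacsDataSourceType,
    GdacsEpisodes,
    GenericDataSource,
    Memory,
)


def make_episode(kind: str, payload: dict, hazard: str | None = None) -> GdacsEpisodes:
    # Helper local to this sketch, not part of pystac_monty.
    return GdacsEpisodes(
        type=kind,
        data=GenericDataSource(
            source_url="https://example.invalid/" + kind,  # placeholder URL
            input_data=Memory(content=payload),
        ),
        hazard_type=hazard,
    )


source = GdacsDataSourceType(
    source_url="https://example.invalid/gdacs",  # placeholder URL
    event_data=Memory(content={}),  # placeholder event payload
    episodes=[
        (
            make_episode("geteventdata", {}),
            make_episode("getgeometry", {}),
            make_episode("getimpact", {}, hazard="TC"),  # or None when no impact feed exists
        )
    ],
)
```

Note that the impact slot must still be present in each tuple; sources without impact data pass None in the third position.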
pystac_monty/sources/gdacs.py (75 changes: 68 additions & 7 deletions)

```diff
@@ -33,6 +33,7 @@
 from pystac_monty.sources.utils import phrase_to_dashed
 from pystac_monty.validators.gdacs_events import GdacsEventDataValidator, Sendai
 from pystac_monty.validators.gdacs_geometry import GdacsGeometryDataValidator
+from pystac_monty.validators.gdacs_impacts import GdacsImpactDataValidatorTC, TCImpactItem

 # Constants

@@ -50,6 +51,7 @@
 class GDACSDataSourceType(Enum):
     EVENT = "geteventdata"
     GEOMETRY = "getgeometry"
+    IMPACT = "getimpact"


 @dataclass
@@ -58,7 +60,7 @@ class GDACSDataSource(MontyDataSourceV3):
     source_url: str
     event_data: [str, dict]
     event_data_file_path: str
-    episodes: list[Tuple[GdacsEpisodes, GdacsEpisodes]]
+    episodes: list[Tuple[GdacsEpisodes, GdacsEpisodes, GdacsEpisodes | None]]

     def __init__(self, data: GdacsDataSourceType):
         super().__init__(data)
@@ -132,18 +134,25 @@ def get_stac_items_from_memory(self) -> typing.Generator[Item, None, None]:
             for episode_data in self.data_source.episodes:
                 validated_episode_data = GdacsEventDataValidator(**episode_data[0].data.input_data.content)
                 episode_data_url = episode_data[0].data.source_url
-                if GDACSDataSourceType.GEOMETRY in episode_data:
+                if GDACSDataSourceType.GEOMETRY.value == episode_data[1].type:
                     validated_geometry_data = GdacsGeometryDataValidator(**episode_data[1].data.input_data.content)
                     geometry_data_url = episode_data[1].data.source_url
                 else:
                     validated_geometry_data = None
                     geometry_data_url = None
+                if episode_data[2] and GDACSDataSourceType.IMPACT.value == episode_data[2].type:
+                    match episode_data[2].hazard_type:
+                        case "TC":
+                            validated_impact_data = GdacsImpactDataValidatorTC(**episode_data[2].data.input_data.content)
+                else:
+                    validated_impact_data = None

                 episode_hazard_item = self.make_hazard_event_item(
                     episode_event_data=(validated_episode_data, episode_data_url),
                     episode_geometry_data=(validated_geometry_data, geometry_data_url),
                 )
                 yield episode_hazard_item
-                yield from self.make_impact_items(source_event_item, validated_episode_data)
+                yield from self.make_impact_items(source_event_item, validated_episode_data, validated_impact_data)
         except Exception:
             self.transform_summary.increment_failed_rows(1)
             logger.warning("Failed to process the GDACS data", exc_info=True)
@@ -168,7 +177,8 @@ def get_stac_items_from_file(self) -> typing.Generator[Item, None, None]:

                 validated_episode_data = GdacsEventDataValidator(**episode_file_data)
                 episode_data_url = episode_data[0].data.source_url
-                if GDACSDataSourceType.GEOMETRY in episode_data:
+
+                if GDACSDataSourceType.GEOMETRY.value == episode_data[1].type:
                     geometry_data_file = episode_data[1].data.input_data.path
                     with open(geometry_data_file, "r", encoding="utf-8") as f:
                         geometry_data = json.loads(f.read())
@@ -177,12 +187,24 @@
                 else:
                     validated_geometry_data = None
                     geometry_data_url = None
+
+                if episode_data[2] and GDACSDataSourceType.IMPACT.value == episode_data[2].type:
+                    impact_data_file = episode_data[2].data.input_data.path
+                    with open(impact_data_file, "r", encoding="utf-8") as f:
+                        impact_data = json.loads(f.read())
+
+                    match episode_data[2].hazard_type:
+                        case "TC":
+                            validated_impact_data = GdacsImpactDataValidatorTC(**impact_data)
+                else:
+                    validated_impact_data = None
+
                 episode_hazard_item = self.make_hazard_event_item(
                     episode_event_data=(validated_episode_data, episode_data_url),
                     episode_geometry_data=(validated_geometry_data, geometry_data_url),
                 )
                 yield episode_hazard_item
-                yield from self.make_impact_items(source_event_item, validated_episode_data)
+                yield from self.make_impact_items(source_event_item, validated_episode_data, validated_impact_data)
         except Exception:
             self.transform_summary.increment_failed_rows(1)
             logger.warning("Failed to process the GDACS data", exc_info=True)
```
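Both generators above now share the same dispatch idea: the third tuple element is optional, and when present its `hazard_type` selects a validator, with tropical cyclones ("TC") as the only mapped type so far. A condensed sketch of that selection follows; the table lookup is this sketch's own device, not the PR's `match` statement, and a wildcard default like the `.get(...)` here guards the case where an impact payload arrives with an unmapped hazard type.

```python
# Condensed sketch of the new impact-validation dispatch; not the PR's literal code.
from typing import Any, Optional

from pystac_monty.sources.common import GdacsEpisodes
from pystac_monty.sources.gdacs import GDACSDataSourceType
from pystac_monty.validators.gdacs_impacts import GdacsImpactDataValidatorTC

IMPACT_VALIDATORS: dict[str, Any] = {
    "TC": GdacsImpactDataValidatorTC,  # the only hazard type mapped in this PR
}


def validate_impact(impact_episode: Optional[GdacsEpisodes]) -> Optional[Any]:
    """Return validated impact data, or None when the slot is empty or unmapped."""
    if impact_episode is None or impact_episode.type != GDACSDataSourceType.IMPACT.value:
        return None
    validator = IMPACT_VALIDATORS.get(impact_episode.hazard_type or "")
    if validator is None:
        return None  # unmapped hazard types produce no impact items
    return validator(**impact_episode.data.input_data.content)
```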
```diff
@@ -362,7 +384,9 @@ def get_hazard_detail(self, data: GdacsEventDataValidator) -> HazardDetail:
             estimate_type=MontyEstimateType.PRIMARY,
         )

-    def make_impact_items(self, event_item: Item, data: GdacsEventDataValidator) -> list[Item]:
+    def make_impact_items(
+        self, event_item: Item, data: GdacsEventDataValidator, impact_data: GdacsImpactDataValidatorTC
+    ) -> list[Item]:
         impact_items = []

         # Search for Sendai fields
@@ -373,11 +397,47 @@ def make_impact_item_from_sendai_entry(
                 impact_item = self.make_impact_item_from_sendai_entry(entry, event_item, data=data)
                 impact_items.append(impact_item)

+        if impact_data:
+            for impact_item in impact_data.channel.item:
+                impact_item = self.make_impact_item_from_tc(impact_item=impact_item, event_item=event_item)
+                impact_items.append(impact_item)
+
         return impact_items

+    def make_impact_item_from_tc(self, impact_item: TCImpactItem, event_item: Item) -> Item:
+        """Create impact item for Tropical Cyclone (TC)"""
+        item = event_item.clone()
+        item.id = phrase_to_dashed(
+            item.id.replace(STAC_EVENT_ID_PREFIX, STAC_IMPACT_ID_PREFIX)
+            + "-"
+            + impact_item.storm_id
+            + "-"
+            + impact_item.advisory_number
+        )
+        item.common_metadata.description = impact_item.name
+        item.properties["forecasted"] = impact_item.actual == "false"
+        try:
+            item.common_metadata.created = item.common_metadata.start_datetime = item.common_metadata.end_datetime = (
+                pytz.utc.localize(datetime.datetime.strptime(impact_item.advisory_datetime, "%d %b %Y %H:%M"))
+            )
+        except Exception:
+            item.common_metadata.created = item.common_metadata.start_datetime = item.common_metadata.end_datetime = None
+        item.set_collection(self.get_impact_collection())
+        item.properties["roles"] = ["source", "impact"]
+        monty = MontyExtension.ext(item)
+        monty.impact_detail = ImpactDetail(
+            category=MontyImpactExposureCategory.ALL_PEOPLE,
+            type=MontyImpactType.POTENTIALLY_AFFECTED if item.properties["forecasted"] else MontyImpactType.TOTAL_AFFECTED,
+            value=int(impact_item.pop),
+            unit="GDACS TC",
+            estimate_type=MontyEstimateType.PRIMARY,
+        )
+        return item
+
     def make_impact_item_from_sendai_entry(
         self, entry: Sendai, event_item: Item, data: Optional[GdacsEventDataValidator] = None
     ) -> Item:
+        """Create impact item for Flood using Sendai framework"""
         item = event_item.clone()
         item.id = phrase_to_dashed(
             item.id.replace(STAC_EVENT_ID_PREFIX, STAC_IMPACT_ID_PREFIX)
@@ -390,7 +450,7 @@ def make_impact_item_from_sendai_entry(
             + "-"
             + entry.region
         )
-        item.common_metadata.description = entry.description  # entry["description"]
+        item.common_metadata.description = entry.description
         # TODO geolocate the with country and region metadata
         # item.geometry = self.geolocate(entry["country"], entry["region"])
         item.set_collection(self.get_impact_collection())
@@ -421,6 +481,7 @@ def make_impact_item_from_sendai_entry(
         return item

     def get_impact_detail(self, entry: Sendai) -> ImpactDetail:
+        """Create impact detail object"""
         return ImpactDetail(
             category=self.get_impact_category_from_sendai_indicators(entry.sendaitype, entry.sendainame),
             type=self.get_impact_type_from_sendai_indicators(entry.sendaitype, entry.sendainame),
```
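`make_impact_item_from_tc` above stamps one impact item per TC advisory, parsing the advisory timestamp with a fixed "%d %b %Y %H:%M" format (falling back to None on failure) and treating `actual == "false"` as a forecast. A standalone check of that handling, with hypothetical sample values:

```python
# Standalone check of the advisory-timestamp handling in make_impact_item_from_tc.
# The sample strings are hypothetical advisory values, not real GDACS data.
import datetime

import pytz


def parse_advisory(ts: str) -> datetime.datetime | None:
    """GDACS TC advisories look like '05 Sep 2024 09:00': naive, assumed UTC."""
    try:
        return pytz.utc.localize(datetime.datetime.strptime(ts, "%d %b %Y %H:%M"))
    except (TypeError, ValueError):
        return None


assert parse_advisory("05 Sep 2024 09:00") == datetime.datetime(2024, 9, 5, 9, 0, tzinfo=pytz.utc)
assert parse_advisory("not a date") is None  # the diff falls back to None the same way

actual_flag = "false"  # hypothetical feed value; arrives as a string, not a bool
forecasted = actual_flag == "false"  # True -> POTENTIALLY_AFFECTED; "true" -> TOTAL_AFFECTED
```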
pystac_monty/sources/usgs.py (110 changes: 102 additions & 8 deletions)

```diff
@@ -24,7 +24,7 @@
 from pystac_monty.hazard_profiles import MontyHazardProfiles
 from pystac_monty.sources.common import GenericDataSource, MontyDataTransformer, USGSDataSourceType
 from pystac_monty.sources.gdacs import DataType, MontyDataSourceV3
-from pystac_monty.validators.usgs import EmpiricalValidator, USGSValidator
+from pystac_monty.validators.usgs import AlertBin, AlertValidator, EmpiricalValidator, USGSValidator

 logger = logging.getLogger(__name__)

@@ -48,6 +48,11 @@ class USGSLossData(BaseModel):
     data: GenericDataSource


+class USGSAlertData(BaseModel):
+    type: str
+    data: GenericDataSource
+
+
 @dataclass
 class USGSDataSource(MontyDataSourceV3):
     type: USGSSourceType
@@ -56,7 +61,9 @@ class USGSDataSource(MontyDataSourceV3):
     event_data_file_path: str
     event: USGSEvent
     loss_data_file_path: str
-    loss_data = USGSLossData
+    loss_data: USGSLossData
+    alerts_data_file_path: str
+    alerts_data: USGSAlertData

     def __init__(self, data: USGSDataSourceType):
         super().__init__(data)
@@ -71,6 +78,9 @@ def handle_file_data():
             if data.loss_data and os.path.isfile(data.loss_data.path):
                 self.loss_data_file_path = data.loss_data.path

+            if data.alerts_data and os.path.isfile(data.alerts_data.path):
+                self.alerts_data_file_path = data.alerts_data.path
+
         def handle_memory_data(): ...

         input_data_type = data.event_data.data_type
@@ -91,12 +101,21 @@ def get_event_data(self) -> typing.Union[dict, str]:
     def get_loss_data(self) -> typing.Union[dict, str, None]:
         if self.root.loss_data is None:
             return []
-        elif self.root.loss_data and self.root.loss_data.data_type == DataType.FILE:
+        if self.root.loss_data and self.root.loss_data.data_type == DataType.FILE:
             with open(self.loss_data_file_path, "r", encoding="utf-8") as f:
                 self.loss_data = json.loads(f.read())
             return self.loss_data
         return self.loss_data

+    def get_alerts_data(self) -> typing.Union[dict, str, None]:
+        if not self.root.alerts_data:
+            return []
+        if self.root.alerts_data and self.root.alerts_data.data_type == DataType.FILE:
+            with open(self.alerts_data_file_path, "r", encoding="utf-8") as f:
+                self.alerts_data = json.loads(f.read())
+            return self.alerts_data
+        return self.alerts_data
+
     def get_input_data_type(self) -> DataType:
         return self.root.event_data.data_type

@@ -360,14 +379,16 @@ def iso2_to_iso3(iso2: str) -> str:
             "ZM": "ZMB",
             "ZW": "ZWE",
         }
-        return iso_mappings[iso2.upper()]
+        # Fallback: iso2 is returned in case mapping not found.
+        return iso_mappings.get(iso2.upper(), iso2)

     def get_stac_items(self) -> typing.Generator[Item, None, None]:
         """Creates the STAC Items"""
        self.transform_summary.mark_as_started()

         item_data = self.data_source.get_event_data()
         losspager_data = self.data_source.get_loss_data()
+        alert_data = self.data_source.get_alerts_data()

         # Note that only one datapoint is sent
         self.transform_summary.increment_rows(1)
```
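The `iso2_to_iso3` change swaps a raising dict index for a `.get` with a pass-through default, so an unmapped ISO2 code now survives into the impact item ids built later in this file instead of aborting the transform. In brief, with the mapping table truncated to two entries:

```python
# Behavior sketch of the new fallback; the full mapping table lives in the diff above.
iso_mappings = {"ZM": "ZMB", "ZW": "ZWE"}


def iso2_to_iso3(iso2: str) -> str:
    # Unknown codes pass through unchanged rather than raising KeyError.
    return iso_mappings.get(iso2.upper(), iso2)


assert iso2_to_iso3("zm") == "ZMB"
assert iso2_to_iso3("XX") == "XX"  # unknown code passes through
```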
```diff
@@ -380,15 +401,26 @@ def get_validated_data(items: list[dict[str, typing.Any]]) -> typing.List[EmpiricalValidator]:
                 validated_losspager_data.append(obj)
             return validated_losspager_data

+        def get_validated_alert_data(items: list) -> typing.List[AlertValidator]:
+            validated_alert_data: list[AlertValidator] = []
+            for item in items:
+                obj = AlertValidator(**item)
+                validated_alert_data.append(obj)
+            return validated_alert_data
+
         validated_item = USGSValidator(**item_data)

         if event_item := self.make_source_event_item(item_data=validated_item):
             yield event_item
             losspager_validated_items = get_validated_data(losspager_data)
+            alert_validated_items = get_validated_alert_data(alert_data)
             hazard_item = self.make_hazard_event_item(event_item=event_item, data_item=validated_item)
             yield hazard_item
             yield from self.make_impact_items(
-                event_item=event_item, hazard_item=hazard_item, losspager_items=losspager_validated_items
+                event_item=event_item,
+                hazard_item=hazard_item,
+                losspager_items=losspager_validated_items,
+                alert_items=alert_validated_items,
             )
         else:
             self.transform_summary.increment_failed_rows(1)
@@ -549,10 +581,14 @@ def make_hazard_event_item(self, event_item: Item, data_item: USGSValidator) -> Item:
         return hazard_item

     def make_impact_items(
-        self, event_item: Item, hazard_item: Item, losspager_items: typing.List[EmpiricalValidator]
+        self,
+        event_item: Item,
+        hazard_item: Item,
+        losspager_items: typing.List[EmpiricalValidator],
+        alert_items: typing.List[AlertValidator],
     ) -> typing.List[Item]:
         """Create impact items (PAGER) from USGS data."""
-        if not losspager_items:
+        if not losspager_items and not alert_items:
             return []

         impact_items = []
@@ -592,8 +628,62 @@ def make_impact_items(
             )
             impact_items.append(economic_item)

+        for alert_data in alert_items:
+            if alert_data.fatality:
+                alert_ppl_item = self._create_impact_item_from_alerts(
+                    impact_type=alert_data.fatality.type or "fatality",
+                    category=MontyImpactExposureCategory.ALL_PEOPLE,
+                    imp_type=MontyImpactType.POTENTIALLY_AFFECTED,
+                    value=self._calculate_value_from_bins(alert_data.fatality.bins),
+                    unit=alert_data.fatality.units,
+                    event_item=event_item,
+                )
+                impact_items.append(alert_ppl_item)
+            if alert_data.economic:
+                alert_economic_item = self._create_impact_item_from_alerts(
+                    impact_type=alert_data.economic.type or "economic",
+                    category=MontyImpactExposureCategory.GLOBAL_CURRENCY,
+                    imp_type=MontyImpactType.POTENTIALLY_AFFECTED,
+                    value=self._calculate_value_from_bins(alert_data.economic.bins),
+                    unit=alert_data.economic.units,
+                    event_item=event_item,
+                )
+                impact_items.append(alert_economic_item)
         return impact_items

+    def _calculate_value_from_bins(self, bin_data: typing.List[AlertBin]) -> int:
+        if not bin_data:
+            return 0
+        try:
+            total = sum([(float(data.min) + float(data.max)) / 2 * data.probability for data in bin_data])
+            return int(total)
+        except Exception:
+            return 0
+
+    def _create_impact_item_from_alerts(
+        self,
+        impact_type: str,
+        category: MontyImpactExposureCategory,
+        imp_type: MontyImpactType,
+        value: float,
+        unit: str,
+        event_item: Item,
+    ) -> Item:
+        impact_item = event_item.clone()
+        monty = MontyExtension.ext(impact_item)
+
+        impact_item.id = (
+            f"{impact_item.id.replace(STAC_EVENT_ID_PREFIX, STAC_IMPACT_ID_PREFIX)}-{impact_type}-{monty.country_codes[0]}"
+        )
+        impact_item.set_collection(self.get_impact_collection())
+
+        impact_item.properties["forecasted"] = True
+
+        monty.impact_detail = ImpactDetail(
+            category=category, type=imp_type, value=value, unit=unit, estimate_type=MontyEstimateType.MODELLED
+        )
+        return impact_item
+
     def _create_impact_item_from_losses(
         self,
         impact_type: str,
@@ -608,7 +698,9 @@ def _create_impact_item_from_losses(
         """Helper method to create impact items from PAGER losses data."""

         impact_item = event_item.clone()
-        impact_item.id = f"{STAC_IMPACT_ID_PREFIX}{impact_item.id.replace(STAC_EVENT_ID_PREFIX, '')}-{impact_type}-{iso2}"
+        impact_item.id = (
+            f"{impact_item.id.replace(STAC_EVENT_ID_PREFIX, STAC_IMPACT_ID_PREFIX)}-{impact_type}-{self.iso2_to_iso3(iso2)}"
+        )

         # Set title and description
         title_prefix = "Estimated Fatalities" if impact_type == "fatalities" else "Estimated Economic Losses"
@@ -619,6 +711,8 @@ def _create_impact_item_from_losses(
         impact_item.set_collection(self.get_impact_collection())
         impact_item.properties["roles"] = ["source", "impact"]

+        impact_item.properties["forecasted"] = False
+
         # Add impact detail
         monty = MontyExtension.ext(impact_item)
         monty.country_codes = [self.iso2_to_iso3(iso2)]
```
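`_calculate_value_from_bins` collapses a PAGER alert histogram into a single point estimate: the probability-weighted sum of each bin's midpoint, truncated to an int. A worked example with hypothetical bins (the `min`/`max`/`probability` field names follow the usage of `AlertBin` above):

```python
# Worked example of the bin collapse performed by _calculate_value_from_bins.
# Bin values here are hypothetical; real bins arrive via AlertValidator.
from dataclasses import dataclass


@dataclass
class Bin:  # stand-in for pystac_monty.validators.usgs.AlertBin
    min: float
    max: float
    probability: float


bins = [
    Bin(min=0, max=1, probability=0.6),     # e.g. 60% chance of 0-1 fatalities
    Bin(min=1, max=10, probability=0.3),    # 30% chance of 1-10
    Bin(min=10, max=100, probability=0.1),  # 10% chance of 10-100
]

# Probability-weighted sum of bin midpoints:
# 0.5 * 0.6 + 5.5 * 0.3 + 55 * 0.1 = 0.3 + 1.65 + 5.5 = 7.45
total = sum((b.min + b.max) / 2 * b.probability for b in bins)
assert int(total) == 7  # matches the int() truncation in the diff
```

Impact items built this way are flagged `forecasted: True` with `MontyEstimateType.MODELLED`, while the PAGER loss items above now carry an explicit `forecasted: False`.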