Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pystac_monty/sources/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class USGSDataSourceType(BaseModel):
source_url: str
event_data: Union[File, Memory]
loss_data: Union[File, Memory, None] = None
alerts_data: Union[File, Memory, None] = None


class DesinventarDataSourceType(BaseModel):
Expand Down
101 changes: 94 additions & 7 deletions pystac_monty/sources/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pystac_monty.hazard_profiles import MontyHazardProfiles
from pystac_monty.sources.common import GenericDataSource, MontyDataTransformer, USGSDataSourceType
from pystac_monty.sources.gdacs import DataType, MontyDataSourceV3
from pystac_monty.validators.usgs import EmpiricalValidator, USGSValidator
from pystac_monty.validators.usgs import AlertBin, AlertValidator, EmpiricalValidator, USGSValidator

logger = logging.getLogger(__name__)

Expand All @@ -48,6 +48,11 @@ class USGSLossData(BaseModel):
data: GenericDataSource


class USGSAlertData(BaseModel):
type: str
data: GenericDataSource


@dataclass
class USGSDataSource(MontyDataSourceV3):
type: USGSSourceType
Expand All @@ -56,7 +61,9 @@ class USGSDataSource(MontyDataSourceV3):
event_data_file_path: str
event: USGSEvent
loss_data_file_path: str
loss_data = USGSLossData
loss_data: USGSLossData
alerts_data_file_path: str
alerts_data: USGSAlertData

def __init__(self, data: USGSDataSourceType):
super().__init__(data)
Expand All @@ -71,6 +78,9 @@ def handle_file_data():
if data.loss_data and os.path.isfile(data.loss_data.path):
self.loss_data_file_path = data.loss_data.path

if data.alerts_data and os.path.isfile(data.alerts_data.path):
self.alerts_data_file_path = data.alerts_data.path

def handle_memory_data(): ...

input_data_type = data.event_data.data_type
Expand All @@ -91,12 +101,21 @@ def get_event_data(self) -> typing.Union[dict, str]:
def get_loss_data(self) -> typing.Union[dict, str, None]:
if self.root.loss_data is None:
return []
elif self.root.loss_data and self.root.loss_data.data_type == DataType.FILE:
if self.root.loss_data and self.root.loss_data.data_type == DataType.FILE:
with open(self.loss_data_file_path, "r", encoding="utf-8") as f:
self.loss_data = json.loads(f.read())
return self.loss_data
return self.loss_data

def get_alerts_data(self) -> typing.Union[dict, str, None]:
if not self.root.alerts_data:
return []
if self.root.alerts_data and self.root.alerts_data.data_type == DataType.FILE:
with open(self.alerts_data_file_path, "r", encoding="utf-8") as f:
self.alerts_data = json.loads(f.read())
return self.alerts_data
return self.alerts_data

def get_input_data_type(self) -> DataType:
return self.root.event_data.data_type

Expand Down Expand Up @@ -368,6 +387,7 @@ def get_stac_items(self) -> typing.Generator[Item, None, None]:

item_data = self.data_source.get_event_data()
losspager_data = self.data_source.get_loss_data()
alert_data = self.data_source.get_alerts_data()

# Note that only one datapoint is sent
self.transform_summary.increment_rows(1)
Expand All @@ -380,15 +400,26 @@ def get_validated_data(items: list[dict[str, typing.Any]]) -> typing.List[Empiri
validated_losspager_data.append(obj)
return validated_losspager_data

def get_validated_alert_data(items: list) -> typing.List[AlertValidator]:
validated_alert_data: list[AlertValidator] = []
for item in items:
obj = AlertValidator(**item)
validated_alert_data.append(obj)
return validated_alert_data

validated_item = USGSValidator(**item_data)

if event_item := self.make_source_event_item(item_data=validated_item):
yield event_item
losspager_validated_items = get_validated_data(losspager_data)
alert_validated_items = get_validated_alert_data(alert_data)
hazard_item = self.make_hazard_event_item(event_item=event_item, data_item=validated_item)
yield hazard_item
yield from self.make_impact_items(
event_item=event_item, hazard_item=hazard_item, losspager_items=losspager_validated_items
event_item=event_item,
hazard_item=hazard_item,
losspager_items=losspager_validated_items,
alert_items=alert_validated_items,
)
else:
self.transform_summary.increment_failed_rows(1)
Expand Down Expand Up @@ -549,10 +580,14 @@ def make_hazard_event_item(self, event_item: Item, data_item: USGSValidator) ->
return hazard_item

def make_impact_items(
self, event_item: Item, hazard_item: Item, losspager_items: typing.List[EmpiricalValidator]
self,
event_item: Item,
hazard_item: Item,
losspager_items: typing.List[EmpiricalValidator],
alert_items: typing.List[AlertValidator],
) -> typing.List[Item]:
"""Create impact items (PAGER) from USGS data."""
if not losspager_items:
if not losspager_items and not alert_items:
return []

impact_items = []
Expand Down Expand Up @@ -592,8 +627,58 @@ def make_impact_items(
)
impact_items.append(economic_item)

for alert_data in alert_items:
if alert_data.fatality:
alert_ppl_item = self._create_impact_item_from_alerts(
impact_type=alert_data.fatality.type or "fatality",
category=MontyImpactExposureCategory.ALL_PEOPLE,
imp_type=MontyImpactType.POTENTIALLY_AFFECTED,
value=self._calculate_value_from_bins(alert_data.fatality.bins),
unit=alert_data.fatality.units,
event_item=event_item,
)
impact_items.append(alert_ppl_item)
if alert_data.economic:
alert_economic_item = self._create_impact_item_from_alerts(
impact_type=alert_data.economic.type or "economic",
category=MontyImpactExposureCategory.GLOBAL_CURRENCY,
imp_type=MontyImpactType.POTENTIALLY_AFFECTED,
value=self._calculate_value_from_bins(alert_data.economic.bins),
unit=alert_data.economic.units,
event_item=event_item,
)
impact_items.append(alert_economic_item)
return impact_items

def _calculate_value_from_bins(self, bin_data: typing.List[AlertBin]) -> int:
if not bin_data:
return 0
max_prob_data = max(bin_data, key=lambda x: x.probability or 0)
value = (float(max_prob_data.max) + float(max_prob_data.min)) / 2.0 * max_prob_data.probability
return int(value)

def _create_impact_item_from_alerts(
self,
impact_type: str,
category: MontyImpactExposureCategory,
imp_type: MontyImpactType,
value: float,
unit: str,
event_item: Item,
) -> Item:
impact_item = event_item.clone()
monty = MontyExtension.ext(impact_item)

impact_item.id = (
f"{impact_item.id.replace(STAC_EVENT_ID_PREFIX, STAC_IMPACT_ID_PREFIX)}-{impact_type}-{monty.country_codes[0]}"
)
impact_item.set_collection(self.get_impact_collection())

monty.impact_detail = ImpactDetail(
category=category, type=imp_type, value=value, unit=unit, estimate_type=MontyEstimateType.MODELLED
)
return impact_item

def _create_impact_item_from_losses(
self,
impact_type: str,
Expand All @@ -608,7 +693,9 @@ def _create_impact_item_from_losses(
"""Helper method to create impact items from PAGER losses data."""

impact_item = event_item.clone()
impact_item.id = f"{STAC_IMPACT_ID_PREFIX}{impact_item.id.replace(STAC_EVENT_ID_PREFIX, '')}-{impact_type}-{iso2}"
impact_item.id = (
f"{impact_item.id.replace(STAC_EVENT_ID_PREFIX, STAC_IMPACT_ID_PREFIX)}-{impact_type}-{self.iso2_to_iso3(iso2)}"
)

# Set title and description
title_prefix = "Estimated Fatalities" if impact_type == "fatalities" else "Estimated Economic Losses"
Expand Down
21 changes: 21 additions & 0 deletions pystac_monty/validators/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,24 @@ class EmpiricalEconomic(BaseModel):
class EmpiricalValidator(BaseModelWithExtra):
empirical_fatality: EmpiricalFatality
empirical_economic: EmpiricalEconomic


class AlertBin(BaseModelWithExtra):
color: str
min: float
max: float
probability: float


class AlertDesc(BaseModelWithExtra):
type: str
units: str
gvalue: float
summary: bool
level: str
bins: List[AlertBin]


class AlertValidator(BaseModelWithExtra):
fatality: AlertDesc
economic: AlertDesc
Loading