Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 22 additions & 54 deletions pystac_monty/sources/gidd.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from pystac_monty.hazard_profiles import MontyHazardProfiles
from pystac_monty.sources.common import MontyDataSource, MontyDataTransformer
from pystac_monty.sources.utils import IDMCUtils

STAC_EVENT_ID_PREFIX = "idmc-gidd-event-"
STAC_IMPACT_ID_PREFIX = "idmc-gidd-impact-"
Expand Down Expand Up @@ -53,6 +54,11 @@ def make_items(self) -> List[Item]:
items = []
# Create event items
event_items = self.make_source_event_items()
# Get the latest item based on id(the last occurrence)
# and get rid of duplicate items at event level
event_items_unique = {item.id: item for item in event_items}
event_items = list(event_items_unique.values())

items.extend(event_items)
# Create impact items
impact_items = self.make_impact_items()
Expand Down Expand Up @@ -111,7 +117,7 @@ def make_source_event_item(self, data: dict) -> Item:
enddate = pytz.utc.localize(datetime.fromisoformat(enddate_str))

item = Item(
id=f'{STAC_EVENT_ID_PREFIX}{properties["ID"]}',
id=f"{STAC_EVENT_ID_PREFIX}{properties['ID']}",
geometry=geometry,
bbox=bbox,
datetime=startdate,
Expand Down Expand Up @@ -140,7 +146,7 @@ def make_source_event_item(self, data: dict) -> Item:
properties["Hazard type"],
properties["Hazard sub type"],
)
monty.hazard_codes = self.map_gidd_to_hazard_codes(hazard=hazard_tuple)
monty.hazard_codes = IDMCUtils.hazard_codes_mapping(hazard=hazard_tuple)
monty.compute_and_set_correlation_id(hazard_profiles=self.hazard_profiles)

item.set_collection(self.get_event_collection())
Expand Down Expand Up @@ -171,17 +177,15 @@ def make_impact_items(self) -> List[Item]:
startdate = pytz.utc.localize(datetime.fromisoformat(startdate_str))
enddate = pytz.utc.localize(datetime.fromisoformat(enddate_str))

impact_type = properties.get("Figure category", "displaced")

impact_item.id = (
impact_item.id.replace(STAC_EVENT_ID_PREFIX, STAC_IMPACT_ID_PREFIX)
+ "-"
+ str(properties["ID"])
+ "-"
+ "displaced"
impact_item.id.replace(STAC_EVENT_ID_PREFIX, STAC_IMPACT_ID_PREFIX) + str(properties["ID"]) + "-" + impact_type
)

impact_item.datetime = startdate
impact_item.properties["title"] = (
f"{properties.get('Figure category')}-{properties.get('Figure unit')} " f"for {properties.get('Event name')}"
f"{properties.get('Figure category')}-{properties.get('Figure unit')} for {properties.get('Event name')}"
)
impact_item.properties.update(
{
Expand All @@ -198,62 +202,24 @@ def make_impact_items(self) -> List[Item]:

impact_item.set_collection(self.get_impact_collection())
monty = MontyExtension.ext(impact_item)
monty.impact_detail = self.get_impact_details(properties)
monty.impact_detail = self.get_impact_details(properties, impact_type=impact_type)
items.append(impact_item)

return items

def get_impact_details(self, gidd_src_item: dict) -> ImpactDetail:
def get_impact_details(self, gidd_src_item: dict, impact_type: str) -> ImpactDetail:
"""Returns the impact details related to displacement"""
category, category_type = IDMCUtils.mappings.get(
impact_type, (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.INTERNALLY_DISPLACED_PERSONS)
)
return ImpactDetail(
category=MontyImpactExposureCategory.ALL_PEOPLE,
type=MontyImpactType.INTERNALLY_DISPLACED_PERSONS,
category=category,
type=category_type,
value=gidd_src_item["Total figures"],
unit="count",
estimate_type=MontyEstimateType.PRIMARY,
)

def map_gidd_to_hazard_codes(self, hazard: tuple) -> List[str]:
"""
Map gidd hazards to UNDRR-ISC 2020 Hazard Codes

Args:
hazard: Tuple of (category, subcategory, type, subtype)

Returns:
List of hazard codes
"""
hazard = tuple(item.lower() if item else item for item in hazard)
hazard_mapping = {
("geophysical", "geophysical", "earthquake", "earthquake"): ["nat-geo-ear-gro"],
("geophysical", "geophysical", "earthquake", "tsunami"): ["nat-geo-ear-tsu"],
("geophysical", "geophysical", "mass movement", "dry mass movement"): ["nat-geo-mmd-lan"],
("geophysical", "geophysical", "mass movement", "sinkhole"): ["nat-geo-mmd-sub"],
("geophysical", "geophysical", "volcanic activity", "volcanic activity"): ["nat-geo-vol-vol"],
("mixed disasters", "mixed disasters", "mixed disasters", "mixed disasters"): ["mix-mix-mix-mix"],
("weather related", "climatological", "desertification", "desertification"): ["EN0006", "nat-geo-env-des"],
("weather related", "climatological", "drought", "drought"): ["nat-cli-dro-dro"],
("weather related", "climatological", "erosion", "erosion"): ["EN0019", "nat-geo-env-soi"],
("weather related", "climatological", "salinisation", "salinization"): ["EN0007", "nat-geo-env-slr"],
("weather related", "climatological", "sea level rise", "sea level rise"): ["EN0023", "nat-geo-env-slr"],
("weather related", "climatological", "wildfire", "wildfire"): ["nat-cli-wil-wil"],
("weather related", "hydrological", "flood", "dam release flood"): ["tec-mis-col-col"],
("weather related", "hydrological", "flood", "flood"): ["nat-hyd-flo-flo"],
("weather related", "hydrological", "mass movement", "avalanche"): ["nat-hyd-mmw-ava"],
("weather related", "hydrological", "mass movement", "landslide/wet mass movement"): ["nat-hyd-mmw-lan"],
("weather related", "hydrological", "wave action", "rogue wave"): ["nat-hyd-wav-rog"],
("weather related", "meteorological", "extreme temperature", "cold wave"): ["nat-met-ext-col"],
("weather related", "meteorological", "extreme temperature", "heat wave"): ["nat-met-ext-hea"],
("weather related", "meteorological", "storm", "hailstorm"): ["nat-met-sto-hai"],
("weather related", "meteorological", "storm", "sand/dust storm"): ["nat-met-sto-san"],
("weather related", "meteorological", "storm", "storm surge"): ["nat-met-sto-sur"],
("weather related", "meteorological", "storm", "storm"): ["nat-met-sto-sto"],
("weather related", "meteorological", "storm", "tornado"): ["nat-met-sto-tor"],
("weather related", "meteorological", "storm", "typhoon/hurricane/cyclone"): ["nat-met-sto-tro"],
("weather related", "meteorological", "storm", "winter storm/blizzard"): ["nat-met-sto-bli"],
}
return hazard_mapping.get(hazard, [hazard[-1]])

def check_and_get_gidd_data(self) -> List[Dict[str, Any]]:
"""
Validate the source fields
Expand All @@ -270,7 +236,9 @@ def check_and_get_gidd_data(self) -> List[Dict[str, Any]]:
disaster_data = []
for item in data:
item_properties = item.get("properties", {})
if not item_properties.get("Figure cause") == "Conflict": # skip conflict data
if (
IDMCUtils.DisplacementType(item_properties.get("Figure cause")) == IDMCUtils.DisplacementType.DISASTER_TYPE
): # skip conflict data
required_properties = ["Event ID", "ISO3", "Event start date"]
missing_properties = [field for field in required_properties if field not in item_properties]
if missing_properties:
Expand Down
81 changes: 9 additions & 72 deletions pystac_monty/sources/idu.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List

import pytz
Expand All @@ -20,6 +19,7 @@
)
from pystac_monty.hazard_profiles import MontyHazardProfiles
from pystac_monty.sources.common import MontyDataSource, MontyDataTransformer
from pystac_monty.sources.utils import IDMCUtils

logger = logging.getLogger(__name__)

Expand All @@ -29,30 +29,6 @@
STAC_IMPACT_ID_PREFIX = "idmc-idu-impact-"


class DisplacementType(Enum):
"""Displacement Types"""

DISASTER_TYPE = "Disaster"
CONFLICT_TYPE = "Conflict"
OTHER_TYPE = "Other"


class ImpactMappings:
"""All Impact Mappings"""

# TODO: For other types e.g. FORCED_TO_FLEE, IN_RELIEF_CAMP, DESTROYED_HOUSING,
# PARTIALLY_DESTROYED_HOUSING, UNINHABITABLE_HOUSING, RETURNS, MULTIPLE_OR_OTHER
# Handle them later.
mappings = {
"evacuated": (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.EVACUATED),
"displaced": (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.INTERNALLY_DISPLACED_PERSONS),
"relocated": (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.RELOCATED),
"sheltered": (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.EMERGENCY_SHELTERED),
"homeless": (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.HOMELESS),
"affected": (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.TOTAL_AFFECTED),
}


@dataclass
class IDUDataSource(MontyDataSource):
"""IDU Data directly from the source"""
Expand Down Expand Up @@ -120,7 +96,7 @@ def make_source_event_item(self, data: dict) -> Item:
enddate = pytz.utc.localize(datetime.datetime.fromisoformat(enddate_str))

item = Item(
id=f'{STAC_EVENT_ID_PREFIX}{data["event_id"]}',
id=f"{STAC_EVENT_ID_PREFIX}{data['event_id']}",
geometry=geometry,
bbox=bbox,
datetime=startdate,
Expand All @@ -146,49 +122,14 @@ def make_source_event_item(self, data: dict) -> Item:
monty = MontyExtension.ext(item)
monty.episode_number = episode_number
monty.country_codes = [data["iso3"]]
monty.hazard_codes = self.map_idu_to_hazard_codes(hazard=hazard_tuple)
monty.hazard_codes = IDMCUtils.hazard_codes_mapping(hazard=hazard_tuple)
monty.compute_and_set_correlation_id(hazard_profiles=self.hazard_profiles)

return item

def map_idu_to_hazard_codes(self, hazard: tuple) -> list[str]:
"""Map IDU hazards to UNDRR-ISC 2020 Hazard Codes"""
hazard = tuple((item.lower() if item else item for item in hazard))
hazard_mapping = {
("geophysical", "geophysical", "earthquake", "earthquake"): ["nat-geo-ear-gro"],
("geophysical", "geophysical", "earthquake", "tsunami"): ["nat-geo-ear-tsu"],
("geophysical", "geophysical", "mass movement", "dry mass movement"): ["nat-geo-mmd-lan"],
("geophysical", "geophysical", "mass movement", "sinkhole"): ["nat-geo-mmd-sub"],
("geophysical", "geophysical", "volcanic activity", "volcanic activity"): ["nat-geo-vol-vol"],
("mixed disasters", "mixed disasters", "mixed disasters", "mixed disasters"): ["mix-mix-mix-mix"],
("weather related", "climatological", "desertification", "desertification"): ["EN0006", "nat-geo-env-des"],
("weather related", "climatological", "drought", "drought"): ["nat-cli-dro-dro"],
("weather related", "climatological", "erosion", "erosion"): ["EN0019", "nat-geo-env-soi"],
("weather related", "climatological", "salinisation", "salinization"): ["EN0007", "nat-geo-env-slr"],
("weather related", "climatological", "sea level rise", "sea level rise"): ["EN0023", "nat-geo-env-slr"],
("weather related", "climatological", "wildfire", "wildfire"): ["nat-cli-wil-wil"],
("weather related", "hydrological", "flood", "dam release flood"): ["tec-mis-col-col"],
("weather related", "hydrological", "flood", "flood"): ["nat-hyd-flo-flo"],
("weather related", "hydrological", "mass movement", "avalanche"): ["nat-hyd-mmw-ava"],
("weather related", "hydrological", "mass movement", "landslide/wet mass movement"): ["nat-hyd-mmw-lan"],
("weather related", "hydrological", "wave action", "rogue wave"): ["nat-hyd-wav-rog"],
("weather related", "meteorological", "extreme temperature", "cold wave"): ["nat-met-ext-col"],
("weather related", "meteorological", "extreme temperature", "heat wave"): ["nat-met-ext-hea"],
("weather related", "meteorological", "storm", "hailstorm"): ["nat-met-sto-hai"],
("weather related", "meteorological", "storm", "sand/dust storm"): ["nat-met-sto-san"],
("weather related", "meteorological", "storm", "storm surge"): ["nat-met-sto-sur"],
("weather related", "meteorological", "storm", "storm"): ["nat-met-sto-sto"],
("weather related", "meteorological", "storm", "tornado"): ["nat-met-sto-tor"],
("weather related", "meteorological", "storm", "typhoon/hurricane/cyclone"): ["nat-met-sto-tro"],
("weather related", "meteorological", "storm", "winter storm/blizzard"): ["nat-met-sto-bli"],
}
if hazard not in hazard_mapping:
raise KeyError(f"Hazard {hazard} not found.")
return hazard_mapping.get(hazard)

def _get_impact_type_from_desc(self, description: str):
"""Get impact type from description using regex"""
keywords = list(ImpactMappings.mappings.keys())
keywords = list(IDMCUtils.mappings.keys())
# Get the first match
match = re.findall(r"\((.*?)\)", description)
# Use the first item only
Expand Down Expand Up @@ -228,15 +169,11 @@ def make_impact_items(self) -> List[Item]:
items.append(impact_item)
return items

def _get_impact_type(self, impact_type: str):
"""Get the impact related details"""
return ImpactMappings.mappings.get(
impact_type, (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.INTERNALLY_DISPLACED_PERSONS)
)

def get_impact_details(self, idu_src_item: dict, impact_type: str):
"""Returns the impact details related to displacement"""
category, category_type = self._get_impact_type(impact_type=impact_type)
category, category_type = IDMCUtils.mappings.get(
impact_type, (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.INTERNALLY_DISPLACED_PERSONS)
)
return ImpactDetail(
category=category,
type=category_type,
Expand All @@ -256,11 +193,11 @@ def check_and_get_idu_data(self) -> list[Any]:
return []

for item in idu_data:
if item["displacement_type"] not in DisplacementType._value2member_map_:
if item["displacement_type"] not in IDMCUtils.DisplacementType._value2member_map_:
logging.error("Unknown displacement type: {item['displacement_type']} found. Ignore the datapoint.")
continue
# Get the Disaster type data only
if DisplacementType(item["displacement_type"]) == DisplacementType.DISASTER_TYPE:
if IDMCUtils.DisplacementType(item["displacement_type"]) == IDMCUtils.DisplacementType.DISASTER_TYPE:
missing_fields = [field for field in required_fields if field not in item]
if missing_fields:
raise ValueError(f"Missing required fields {missing_fields}.")
Expand Down
Loading
Loading