22
33import csv
44import io
5+ import itertools
6+ import logging
7+ import typing
58from typing import Dict , List , Union
69
710import pytz
1114from pystac_monty .extension import HazardDetail , MontyEstimateType , MontyExtension
1215from pystac_monty .hazard_profiles import MontyHazardProfiles
1316from pystac_monty .sources .common import MontyDataSource , MontyDataTransformer
17+ from pystac_monty .validators .ibtracs import IBTracsdataValidator
18+
19+ logger = logging .getLogger (__name__ )
20+
1421
1522STAC_EVENT_ID_PREFIX = "ibtracs-event-"
1623STAC_HAZARD_ID_PREFIX = "ibtracs-hazard-"
@@ -47,15 +54,14 @@ def _parse_csv(self) -> List[Dict[str, str]]:
4754 csv_data .append (row )
4855 return csv_data
4956
50- def get_storm_ids (self ) -> List [str ]:
51- """Get a list of unique storm IDs from the data."""
52- data = self .get_data ()
53- return list (set (row .get ("SID" , "" ).strip () for row in data if row .get ("SID" )))
def parse_row_data(self, rows: list[dict]):
    """Validate raw rows and keep only the ones that pass.

    Every row is handed to ``IBTracsdataValidator.validate_event``; a row
    whose validation result is falsy is dropped.

    Args:
        rows: Raw row dictionaries to validate.

    Returns:
        The truthy validation results, in the same order as ``rows``.
    """
    return [
        validated
        for row in rows
        if (validated := IBTracsdataValidator.validate_event(row))
    ]
5965
6066
6167class IBTrACSTransformer (MontyDataTransformer [IBTrACSDataSource ]):
@@ -64,37 +70,41 @@ class IBTrACSTransformer(MontyDataTransformer[IBTrACSDataSource]):
6470 hazard_profiles = MontyHazardProfiles ()
6571 source_name = 'ibtracs'
6672
67- def make_items (self ) -> List [Item ]:
68- """Create STAC Items from IBTrACS data.
73+ # FIXME: This is deprecated
def make_items(self):
    """Return every item produced by ``get_stac_items`` as a list.

    Deprecated entry point: it simply drains the ``get_stac_items``
    generator into memory.
    """
    items = []
    items.extend(self.get_stac_items())
    return items
6976
70- Returns:
71- List of STAC Items (events and hazards)
72- """
73- items = []
def get_stac_items(self) -> typing.Generator[Item, None, None]:
    """Yield STAC items for every storm found in the data source.

    Rows returned by ``self.data_source._parse_csv()`` are grouped by their
    ``SID`` column. For each storm the rows are validated with
    ``self.data_source.parse_row_data``; if ``make_source_event_items``
    returns an item, it is yielded followed by the items from
    ``make_hazard_items``.

    A storm whose processing raises is logged and skipped so that one bad
    storm does not abort the whole run.

    Yields:
        The event item of each storm, followed by its hazard items. Storms
        are visited in ascending ``SID`` order.
    """
    # Group in a single pass instead of sort + groupby: this leaves the
    # list returned by the data source untouched and never compares a
    # missing (None) SID against a string, which would raise TypeError.
    grouped_rows: dict[str, list[dict]] = {}
    for row in self.data_source._parse_csv():
        sid = row.get("SID")
        if not sid:
            # A row without a storm id cannot be attributed to any storm.
            continue
        grouped_rows.setdefault(sid, []).append(row)

    # TODO: Stream the CSV instead of loading it fully, for memory efficiency
    failed_items_count = 0
    total_items_count = 0

    # Sorted keys keep the ascending-SID output order of the sort-based grouping.
    for storm_id in sorted(grouped_rows):
        total_items_count += 1
        try:
            storm_data = self.data_source.parse_row_data(grouped_rows[storm_id])
            if event_item := self.make_source_event_items(storm_id, storm_data):
                yield event_item
                # make_hazard_items is annotated as possibly returning None.
                yield from self.make_hazard_items(event_item, storm_data) or []
        except Exception:
            failed_items_count += 1
            logger.error("Failed to process ibtracs", exc_info=True)

    if failed_items_count:
        logger.warning(
            "Failed to process %s of %s ibtracs storms",
            failed_items_count,
            total_items_count,
        )
89101
90- def make_source_event_items (self , storm_id ) -> List [Item ]:
102+ def make_source_event_items (self , storm_id , storm_data ) -> List [Item ]:
91103 """Create source event items from IBTrACS data.
92104
93105 Returns:
94106 List of event STAC Items
95107 """
96- storm_data = self .data_source .get_storm_data (storm_id )
97-
98108 if not storm_data :
99109 return
100110
@@ -287,7 +297,7 @@ def make_source_event_items(self, storm_id) -> List[Item]:
287297
288298 return item
289299
290- def make_hazard_items (self , event_item : Item ) -> Item :
300+ def make_hazard_items (self , event_item : Item , storm_data ) -> list [ Item ] | None :
291301 """Create hazard items from IBTrACS data.
292302
293303 Args:
@@ -299,7 +309,6 @@ def make_hazard_items(self, event_item: Item) -> Item:
299309 hazard_items = []
300310
301311 storm_id = event_item .id
302- storm_data = self .data_source .get_storm_data (storm_id )
303312
304313 if not storm_data :
305314 return
@@ -360,6 +369,8 @@ def make_hazard_items(self, event_item: Item) -> Item:
360369 except (ValueError , TypeError ):
361370 wind = 0
362371
372+ wind = 0 if wind .strip () == "" else wind
373+
363374 try :
364375 pressure = float (row .USA_PRES if row .USA_PRES else 0 )
365376 except (ValueError , TypeError ):
@@ -559,14 +570,13 @@ def _get_countries_from_track(self, track_geometry: Union[LineString, Point]) ->
559570 if isinstance (track_geometry , LineString ):
560571 for point in track_geometry .coords :
561572 lon , lat = point
562- # country_code = self.geocoder.get_iso3_from_geometry(Point(lon, lat))
563- country_code = "UNK"
573+ country_code = self .geocoder .get_iso3_from_geometry (Point (lon , lat ))
564574 if country_code :
565575 countries .append (country_code )
566576 # For Point, check the single point
567577 elif isinstance (track_geometry , Point ):
568578 lon , lat = track_geometry .x , track_geometry .y
569- # country_code = self.geocoder.get_iso3_from_geometry(track_geometry)
579+ country_code = self .geocoder .get_iso3_from_geometry (track_geometry )
570580 country_code = "UNK"
571581 if country_code :
572582 countries .append (country_code )
0 commit comments