Skip to content

Commit 06f9219

Browse files
authored
Merge pull request #49 from rodekruis/dev
Dev
2 parents 9e0f681 + 55cdff7 commit 06f9219

File tree

5 files changed

+31
-18
lines changed

5 files changed

+31
-18
lines changed

retrievalpipeline/config/somalia.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def drought_data_request(self) -> EcmwfDataRequest:
7878
self.datetime_config.recent_start_year, self.datetime_config.recent_end_year + 1
7979
)
8080
],
81-
"month": ["01", "07"],
81+
"month": [f"{m:02d}" for m in range(1, 13)],
8282
"time": ["00:00"],
8383
"data_format": "netcdf",
8484
# "download_format": "zip",
@@ -97,7 +97,7 @@ def extreme_heat_data_request(self) -> EcmwfDataRequest:
9797
"year": [
9898
f"{year}"
9999
for year in range(
100-
self.datetime_config.recent_start_year, self.datetime_config.baseline_end_year + 1
100+
self.datetime_config.recent_start_year, self.datetime_config.recent_end_year + 1
101101
)
102102
],
103103
"month": [f"{m:02d}" for m in range(1, 13)],

retrievalpipeline/extract/extractors/api_json_extractor.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from collections.abc import Iterable, Iterator
23
from typing import Any
34

@@ -46,13 +47,26 @@ def get_data(
4647
Returns:
4748
list of pydantic models (of type specified in the initiation).
4849
"""
49-
try:
50-
response = self.get_response(api_url, params, headers)
51-
json_data = self.get_data_from_response(response)
52-
except ApiResponseError as err:
53-
logger.warning(err)
54-
return []
55-
return list(self.validate_and_parse(json_data))
50+
max_retries = 5
51+
sleep_seconds = 10
52+
53+
for attempt in range(1, max_retries + 1):
54+
try:
55+
response = self.get_response(api_url, params, headers)
56+
json_data = self.get_data_from_response(response)
57+
return list(self.validate_and_parse(json_data))
58+
except ApiResponseError as err:
59+
if attempt < max_retries:
60+
logger.warning(
61+
f"""Attempt {attempt}/{max_retries} failed for {api_url}: {err}.
62+
Retrying in {sleep_seconds} seconds""",
63+
)
64+
time.sleep(sleep_seconds)
65+
continue
66+
else:
67+
logger.warning(f"All {max_retries} attempts failed for {api_url}: {err}")
68+
69+
return []
5670

5771
def get_response(
5872
self, api_url: str, params: dict[str, str] | None = None, headers: dict[str, str] | None = None

retrievalpipeline/extract/extractors/ecmwf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ async def extract(self) -> None:
7474
logger.info(f"{self.name}: unzipped {zip_file.name}")
7575

7676
async def _wait_on_results(
77-
self, remote: Remote, offset: float = 0.1, factor: float = 1.3, max_waiting_time: int = 100
77+
self, remote: Remote, offset: float = 0.1, factor: float = 1.3, max_waiting_time: int = 3600
7878
) -> None:
7979
"""Wait till job is done.
8080

retrievalpipeline/extract/extractors/ipc.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,11 @@ def get_available_data(self, data_type: str) -> gpd.GeoDataFrame:
188188
def get_metadata(self, anl_id: str) -> pd.DataFrame:
189189
"""Collect metadata from IPC based on analysis ID."""
190190
metadata_frames = []
191-
for iso2 in self.country_codes_iso2:
192-
try:
193-
metadata = self.get_available_metadata_iso2(iso2, self.types, [anl_id])
194-
metadata_frames.append(metadata)
195-
except Exception as e:
196-
logger.warning(f"Failed to get metadata for {iso2}: {e}")
191+
try:
192+
metadata = self.get_available_metadata_iso2(self.country_codes_iso2, self.types, [anl_id])
193+
metadata_frames.append(metadata)
194+
except Exception as e:
195+
logger.warning(f"Failed to get metadata for {self.country_codes_iso2}: {e}")
197196

198197
return pd.concat(metadata_frames, ignore_index=True)
199198

run_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,15 @@ def construct_pipeline(run_id: str, iso3: str) -> ETLPipeline:
8181

8282
extractors = [
8383
ChirpsExtractor(
84-
name="Drought Extractor",
84+
name="Flood Extractor",
8585
storage=storage,
8686
path_to_output=f"{path_to_bronze_global}/chirps/rainfall/",
8787
data_request=config.chirps_data_request,
8888
),
8989
EcmwfExtractor(
9090
name="Drought Extractor",
9191
storage=storage,
92-
path_to_output=f"{path_to_bronze}/ecmwf/rainfall/",
92+
path_to_output=f"{path_to_bronze}/ecmwf/rainfall/drought.zip",
9393
data_request=config.drought_data_request,
9494
),
9595
EcmwfExtractor(

0 commit comments

Comments
 (0)