Dev #45 (Merged)

Changes from all commits
10 changes: 5 additions & 5 deletions retrievalpipeline/config/somalia.py
@@ -33,7 +33,7 @@
class DateTimeConfig:
current_month: ClassVar[int] = datetime.datetime.now().month
current_year: ClassVar[int] = datetime.datetime.now().year
recency_duration: ClassVar[int] = 1 # years
recency_duration: ClassVar[int] = 2 # years
recent_end_year: ClassVar[int] = current_year
recent_start_year: ClassVar[int] = recent_end_year - recency_duration
baseline_end_year: ClassVar[int] = 2021
@@ -75,7 +75,7 @@ def drought_data_request(self) -> EcmwfDataRequest:
"year": [
f"{y}"
for y in range(
self.datetime_config.baseline_start_year, self.datetime_config.recent_end_year + 1
self.datetime_config.recent_start_year, self.datetime_config.recent_end_year + 1
)
],
"month": ["01", "07"],
@@ -97,7 +97,7 @@ def extreme_heat_data_request(self) -> EcmwfDataRequest:
"year": [
f"{year}"
for year in range(
self.datetime_config.baseline_start_year, self.datetime_config.baseline_end_year + 1
self.datetime_config.recent_start_year, self.datetime_config.baseline_end_year + 1
)
],
"month": [f"{m:02d}" for m in range(1, 13)],
@@ -117,8 +117,8 @@ def chirps_data_request(self) -> ChirpsDataRequest:
"years": [
f"{year}"
for year in range(
self.datetime_config.baseline_start_year,
self.datetime_config.trend_end_year + 1,
self.datetime_config.recent_start_year,
self.datetime_config.recent_end_year + 1,
)
],
},
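For reference, a minimal sketch of the year window these requests now cover after the recency_duration bump from 1 to 2 years (field names mirror DateTimeConfig in the hunk above; concrete years depend on the current date, and the same window applies to the syria.py changes below):

import datetime

# Sketch only: mirrors the DateTimeConfig fields shown in the diff above.
current_year = datetime.datetime.now().year              # e.g. 2025
recency_duration = 2                                      # years (was 1)
recent_end_year = current_year
recent_start_year = recent_end_year - recency_duration    # e.g. 2023

# The drought and CHIRPS requests now iterate over this recent window
# instead of starting at baseline_start_year:
years = [f"{y}" for y in range(recent_start_year, recent_end_year + 1)]
print(years)  # e.g. ['2023', '2024', '2025']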
10 changes: 5 additions & 5 deletions retrievalpipeline/config/syria.py
@@ -75,7 +75,7 @@ def drought_data_request(self) -> EcmwfDataRequest:
"year": [
f"{y}"
for y in range(
self.datetime_config.baseline_start_year,
self.datetime_config.recent_start_year,
self.datetime_config.recent_end_year + 1,
)
],
@@ -98,8 +98,8 @@ def extreme_heat_data_request(self) -> EcmwfDataRequest:
"year": [
f"{year}"
for year in range(
self.datetime_config.baseline_start_year,
self.datetime_config.baseline_end_year + 1,
self.datetime_config.recent_start_year,
self.datetime_config.recent_end_year + 1,
)
],
"month": [f"{m:02d}" for m in range(1, 13)],
@@ -121,8 +121,8 @@ def chirps_data_request(self) -> ChirpsDataRequest:
"years": [
f"{year}"
for year in range(
self.datetime_config.baseline_start_year,
self.datetime_config.trend_end_year + 1,
self.datetime_config.recent_start_year,
self.datetime_config.recent_end_year + 1,
)
],
},
43 changes: 26 additions & 17 deletions retrievalpipeline/extract/extractors/chirps.py
@@ -1,5 +1,5 @@
import asyncio
from collections.abc import Iterator
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any, TypedDict
from urllib.parse import urljoin
@@ -38,27 +38,32 @@ def __init__(
self.path_to_output = Path(path_to_output)
self.data_specification = data_request["data_specification"]

def _is_file_complete(self, url: str, local_path: Path) -> bool:
def _get_content_length(self, url: str) -> int | None:
"""Return content length of remote file, or None if unavailable."""
resp = requests.head(url, timeout=30)
resp.raise_for_status()
remote_content_length = resp.headers.get("Content-Length")
if remote_content_length is not None:
return int(remote_content_length)
return None

async def _is_file_complete(self, url: str, local_path: Path) -> bool:
"""Return True if local_path exists and its size matches the remote file.

If anything goes wrong while checking, return False so the file is re-downloaded.
"""

resp = requests.head(url, timeout=30)
resp.raise_for_status()
content_length = resp.headers.get("Content-Length")
remote_content_length = self._get_content_length(url)

# If server doesn't give us a length, we can't verify – assume OK.
if content_length is None:
if remote_content_length is None:
return True

remote_size = int(content_length)
local_size = local_path.stat().st_size

if local_size != remote_size:
local_size = await self.storage.get_size(str(local_path))
if local_size != remote_content_length:
logger.warning(
f"CHIRPS file size mismatch for {local_path.name}: "
f"local={local_size}, remote={remote_size}. Will re-download."
f"local={local_size}, remote={remote_content_length}. Will re-download."
)
return False
else:
@@ -74,12 +74,16 @@ def _get_file_urls(self) -> list[str]:

file_urls: list[str] = []
for dtype in data_types:
if dtype in ["monthly", "pentads", "dekads"]:
if dtype == "monthly":
for year in data_years:
path = f"{version}/{dtype}/{region}/{fmt}/by_year/"
filename = f"chirps-v3.0.{year}.{dtype}.nc"
file_urls.append(self._build_url(path + filename))

elif dtype in ["pentads", "dekads"]:
for year in data_years:
path = f"{version}/{dtype}/{region}/{fmt}/"
filename = f"chirps-v3.0.{year}.{dtype}.nc"
file_urls.append(self._build_url(path + filename))
elif dtype == "annual":
path = f"{version}/{dtype}/{region}/{fmt}/"
filename = f"chirps-v3.0.{dtype}.nc"
@@ -94,7 +103,7 @@ def _get_file_urls(self) -> list[str]:
def _build_url(self, path: str) -> str:
return urljoin(ChirpsConstant.CHIRPS_API_URL, path)

def get_urls_and_paths(self, output_dir: Path, files: list[str]) -> Iterator[tuple[str, str]]:
async def get_urls_and_paths(self, output_dir: Path, files: list[str]) -> AsyncIterator[tuple[str, str]]:
"""
Map each remote URL to a local path.

@@ -154,11 +163,11 @@ def get_urls_and_paths(self, output_dir: Path, files: list[str]) -> Iterator[tuple[str, str]]:
if (dtype == "annual") or (year == latest_year):
yield url, str(local_path)
continue
if year < latest_year and self._is_file_complete(url, local_path):
if year < latest_year and await self._is_file_complete(url, local_path):
continue

# Safety net: if we reach here and file is missing or incomplete, yield it
if not self._is_file_complete(url, local_path):
if not await self._is_file_complete(url, local_path):
yield url, str(local_path)
# If the file is complete and we got here (e.g. weird naming), we silently skip it.

@@ -171,7 +180,7 @@ async def extract(self) -> None:
3. Let Storage handle the parallel downloads.
"""
file_urls = self._get_file_urls()
urls_paths = self.get_urls_and_paths(self.path_to_output, file_urls)
urls_paths = [pair async for pair in self.get_urls_and_paths(self.path_to_output, file_urls)]

async with asyncio.TaskGroup() as tg:
tg.create_task(self.storage.download_multiple(urls_paths))
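A self-contained sketch of the pattern this refactor introduces (names below are illustrative, not from the repo): get_urls_and_paths becomes an async generator so it can await the storage-backed completeness check per file, and extract() drains it with an async list comprehension before scheduling downloads.

import asyncio
from collections.abc import AsyncIterator

async def is_complete(url: str, local_path: str) -> bool:
    # Stand-in for _is_file_complete: in the extractor this compares
    # storage.get_size(local_path) against the remote Content-Length.
    return False

async def urls_and_paths(files: list[str]) -> AsyncIterator[tuple[str, str]]:
    for name in files:
        url = f"https://example.org/{name}"
        local_path = f"/tmp/{name}"
        if await is_complete(url, local_path):
            continue  # already fully downloaded, skip it
        yield url, local_path

async def main() -> None:
    # Same consumption pattern as extract(): drain the async generator
    # into a list, then hand the pairs to the downloader.
    pairs = [pair async for pair in urls_and_paths(["a.nc", "b.nc"])]
    print(pairs)

asyncio.run(main())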
13 changes: 12 additions & 1 deletion retrievalpipeline/storage/azure_blob_storage.py
@@ -35,7 +35,7 @@ def __init__(
self.blob_service_client_config = _get_blob_service_client_config()

async def store_chunks(
self, chunks: AsyncIterator[bytes], destination: str, overwrite: bool = False
self, chunks: AsyncIterator[bytes], destination: str, overwrite: bool = True
) -> None:
"""Store data at destination.

@@ -165,6 +165,17 @@ async def get_parquet_chunks(self, source: str) -> pd.DataFrame:
blob_io = io.BytesIO(blob_bytes)
return pd.read_parquet(blob_io, engine="pyarrow")

async def get_size(self, source: str) -> int | None:
"""Return size in bytes of a blob, or None if it doesn't exist or size unknown."""
async with BlobServiceClient(**self.blob_service_client_config) as blob_service_client:
container_client = blob_service_client.get_container_client(self.container)
blob_client = container_client.get_blob_client(source)
try:
props = await blob_client.get_blob_properties()
return int(props.size) if getattr(props, "size", None) is not None else None
except Exception:
return None


def _get_blob_service_client_config() -> dict[str, str]:
"""Read environment variables from env_file and return dict with account_url and credential"""
8 changes: 8 additions & 0 deletions retrievalpipeline/storage/base.py
@@ -196,6 +196,14 @@ async def read_parquet(self, source: str) -> pd.DataFrame:
df = pd.read_parquet(parquet_buffer, engine="pyarrow")
return df

@abstractmethod
async def get_size(self, source: str) -> int | None:
"""Return size in bytes of the file at `source` or None if unknown/missing.

Implementations should return None when the size cannot be determined.
"""
pass

async def store_parquet(self, df: pd.DataFrame, destination: str) -> None:
"""Store dataframe as parquet.

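To make the None contract of the new abstract get_size concrete, a small illustrative example of the caller side (DummyStorage is a stand-in, not part of the repo): a known size comes back as an int, anything that cannot be determined comes back as None and should be treated as incomplete, which is how the CHIRPS extractor uses it.

import asyncio

class DummyStorage:
    """Illustrative stand-in that honours the get_size contract."""

    def __init__(self, sizes: dict[str, int]) -> None:
        self._sizes = sizes

    async def get_size(self, source: str) -> int | None:
        # None when the size cannot be determined (e.g. missing file).
        return self._sizes.get(source)

async def main() -> None:
    storage = DummyStorage({"chirps-v3.0.2023.monthly.nc": 1_048_576})
    for name in ("chirps-v3.0.2023.monthly.nc", "missing.nc"):
        size = await storage.get_size(name)
        if size is None:
            print(f"{name}: size unknown, re-download")
        else:
            print(f"{name}: {size} bytes")

asyncio.run(main())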
8 changes: 8 additions & 0 deletions retrievalpipeline/storage/local_storage.py
@@ -47,6 +47,14 @@ async def read_data(self, source: str) -> bytes:
async with aiofiles.open(target, "rb") as file:
return await file.read()

async def get_size(self, source: str) -> int | None:
"""Return size in bytes of a local file, or None if it doesn't exist."""
target = Path(source)
try:
return target.stat().st_size
except Exception:
return None

async def list_files(self, prefix: str = "", suffix: str = "") -> list[str]:
"""List files in local storage matching criteria.
