-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbase.py
More file actions
164 lines (128 loc) · 4.44 KB
/
base.py
File metadata and controls
164 lines (128 loc) · 4.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""Base data source abstractions for EcoTrack data pipeline."""
from __future__ import annotations
import abc
import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, AsyncIterator, Generic, TypeVar
import httpx
T = TypeVar("T")
class DataFormat(str, Enum):
    """Supported data formats for source payloads.

    Mixes in ``str`` so members compare equal to their plain string
    values (e.g. ``DataFormat.CSV == "csv"``), which makes them safe to
    use directly in URLs, file suffixes, and serialized metadata.
    """

    GEOJSON = "geojson"
    COG = "cog"  # Cloud Optimized GeoTIFF
    ZARR = "zarr"
    NETCDF = "netcdf"
    CSV = "csv"
    PARQUET = "parquet"
    STAC = "stac"  # presumably SpatioTemporal Asset Catalog items — confirm with sources
    JSON = "json"
    GRIB2 = "grib2"
@dataclass
class DataSourceConfig:
    """Configuration for a data source.

    Consumed by :class:`DataSource`; only ``base_url``, ``api_key`` and
    ``timeout_seconds`` are used by the base class itself — the other
    fields are presumably read by concrete subclasses (not visible here).
    """

    # Human-readable identifier for the source.
    name: str
    # Root URL that all requests are resolved against.
    base_url: str
    # Optional credential; when set, it is sent as a Bearer token.
    api_key: str | None = None
    # Client-side throttle target — enforcement not visible in this file.
    rate_limit_per_second: float = 1.0
    # Per-request HTTP timeout, in seconds.
    timeout_seconds: float = 30.0
    # Retry budget for failed requests — enforcement not visible in this file.
    max_retries: int = 3
    # Local directory for cached downloads (default_factory keeps the
    # Path instance per-config rather than shared across instances).
    cache_dir: Path = field(default_factory=lambda: Path("data/cache"))
    # Formats this source is expected to produce.
    formats: list[DataFormat] = field(default_factory=list)
@dataclass
class FetchResult:
    """Result of a data fetch operation.

    One instance represents a single fetched chunk; it is the unit
    passed through ``validate`` and ``transform`` on :class:`DataSource`.
    """

    # Name of the producing source (see DataSourceConfig.name).
    source: str
    # When the chunk was fetched.
    timestamp: datetime
    # Raw payload; concrete type depends on ``format``.
    data: Any
    # Format of ``data``.
    format: DataFormat
    # Size of the raw payload in bytes.
    size_bytes: int
    # Integrity digest — presumably produced by
    # DataSource.compute_checksum (SHA-256 hex); confirm in subclasses.
    checksum: str
    # Source-specific extras; default_factory avoids a shared dict.
    metadata: dict[str, Any] = field(default_factory=dict)
class DataSource(abc.ABC, Generic[T]):
    """Abstract base class for all data sources.

    Provides common HTTP client management, checksum computation,
    and defines the fetch → validate → transform contract that
    every concrete source must implement.

    Type Parameters:
        T: The domain model type produced by :meth:`transform`.
    """

    def __init__(self, config: DataSourceConfig) -> None:
        self.config = config
        # Created lazily by _get_client; stays None until first use.
        self._client: httpx.AsyncClient | None = None

    async def _get_client(self) -> httpx.AsyncClient:
        """Return a lazily-initialised :class:`httpx.AsyncClient`.

        The client is configured with the source's *base_url*, optional
        ``Authorization`` header, and request timeout from config.
        A fresh client is built when none exists yet or the previous
        one has been closed (e.g. after :meth:`close`).
        """
        if self._client is None or self._client.is_closed:
            headers: dict[str, str] = {}
            if self.config.api_key:
                headers["Authorization"] = f"Bearer {self.config.api_key}"
            self._client = httpx.AsyncClient(
                base_url=self.config.base_url,
                headers=headers,
                timeout=self.config.timeout_seconds,
            )
        return self._client

    @abc.abstractmethod
    async def fetch(
        self,
        bbox: tuple[float, float, float, float] | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[FetchResult]:
        """Fetch data from the source.

        Args:
            bbox: Optional bounding box ``(min_lon, min_lat, max_lon, max_lat)``.
            start_time: Optional temporal range start.
            end_time: Optional temporal range end.
            **kwargs: Source-specific parameters.

        Yields:
            :class:`FetchResult` instances for each fetched data chunk.
        """
        # The unreachable ``yield`` marks this stub as an *async generator*,
        # so the abstract signature matches subclass implementations that
        # ``yield`` results (callers iterate with ``async for`` and do not
        # ``await`` the call). Without it, this declaration is a plain
        # coroutine and type checkers see a conflicting contract.
        ...  # pragma: no cover
        yield  # type: ignore[misc]  # pragma: no cover

    @abc.abstractmethod
    async def validate(self, result: FetchResult) -> bool:
        """Validate fetched data for completeness and correctness.

        Args:
            result: The :class:`FetchResult` to validate.

        Returns:
            ``True`` if the result passes all validation checks.
        """
        ...  # pragma: no cover

    @abc.abstractmethod
    async def transform(self, result: FetchResult) -> list[T]:
        """Transform raw data into domain models.

        Args:
            result: The :class:`FetchResult` to transform.

        Returns:
            A list of domain model instances of type *T*.
        """
        ...  # pragma: no cover

    async def close(self) -> None:
        """Close the underlying HTTP client if open."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()

    @staticmethod
    def compute_checksum(data: bytes) -> str:
        """Compute a SHA-256 hex-digest for *data*.

        Args:
            data: Raw bytes to hash.

        Returns:
            Lowercase hex-encoded SHA-256 digest.
        """
        return hashlib.sha256(data).hexdigest()

    async def __aenter__(self) -> DataSource[T]:
        return self

    async def __aexit__(self, *args: object) -> None:
        # Ensures the HTTP client is released when used as a context manager.
        await self.close()
# Public API of this module, controlling ``from <module> import *``.
__all__ = [
    "DataFormat",
    "DataSourceConfig",
    "FetchResult",
    "DataSource",
]