-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathingestors.py
More file actions
49 lines (37 loc) · 1.49 KB
/
ingestors.py
File metadata and controls
49 lines (37 loc) · 1.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import datetime
from abc import ABC, abstractmethod
from typing import List
from mercado_bitcoin.apis import DaySummaryApi
class DataIngestor(ABC):
def __init__(self, writer, coins: List[str], default_start_date: datetime.date) -> None:
self.default_start_date = default_start_date
self.coins = coins
self.writer = writer
self._checkpoint = self._load_checkpoint()
@property
def _checkpoint_filename(self) -> str:
return f"{self.__class__.__name__}.checkpoint"
def _write_checkpoint(self):
with open(self._checkpoint_filename, "w") as f:
f.write(f"{self._checkpoint}")
def _load_checkpoint(self) -> datetime.date:
try:
with open(self._checkpoint_filename, "r") as f:
return datetime.datetime.strptime(f.read(), "%Y-%m-%d").date()
except FileNotFoundError:
return self.default_start_date
def _update_checkpoint(self, value):
self._checkpoint = value
self._write_checkpoint()
@abstractmethod
def ingest(self) -> None:
pass
class DaySummaryIngestor(DataIngestor):
def ingest(self) -> None:
date = self._load_checkpoint()
if date < datetime.date.today():
for coin in self.coins:
api = DaySummaryApi(coin=coin)
data = api.get_data(date=date)
self.writer(coin=coin, api=api.type).write(data)
self._update_checkpoint(date + datetime.timedelta(days=1))