-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodels.py
85 lines (68 loc) · 2.11 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import hashlib
import json
from datetime import datetime
from enum import Enum
from typing import Optional
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, ConfigDict
class Product(Enum):
    """Product identifiers, each mapped to a fixed integer value.

    Members can be looked up from their integer value (``Product(4)``) and
    render as their bare member name when converted to a string.
    """

    OTHER = 0
    MOBILE_IOS = 1
    MOBILE_ANDROID = 2
    MOBILE_BACKEND = 3
    PORTAL = 4
    PCR = 5
    PDP = 6
    PCA = 7
    PCP = 8
    OHQ = 9
    CLUBS = 10

    def __str__(self) -> str:
        # Render as "PORTAL" rather than the default "Product.PORTAL".
        return self.name
class CustomModel(BaseModel):
    """Base pydantic model with serialization and hashing helpers."""

    model_config = ConfigDict(
        populate_by_name=True,
    )

    def serializable_dict(self, **kwargs):
        """Return the model as a dict containing only JSON-compatible values.

        Keyword arguments are forwarded to ``model_dump`` (previously they
        were accepted but silently ignored); the dumped dict is then passed
        through FastAPI's ``jsonable_encoder``.
        """
        return jsonable_encoder(self.model_dump(**kwargs))

    def json(self, **kwargs):
        """Return the model serialized as a JSON string.

        Keyword arguments are forwarded to ``model_dump_json`` (previously
        they were accepted but silently ignored). Calling with no arguments
        behaves exactly as before.
        """
        return self.model_dump_json(**kwargs)

    def __str__(self):
        """Return the default JSON serialization of the model."""
        return str(self.json())

    def hash_as_key(self):
        """Return the first 16 hex characters of the MD5 of ``str(self)``.

        MD5 is used only as a short fingerprint of the serialized model, not
        for security, so the call is flagged accordingly; the digest itself
        is unaffected by the flag.
        """
        digest = hashlib.md5(str(self).encode(), usedforsecurity=False)
        return digest.hexdigest()[:16]
class RedisEvent(CustomModel):
    """A single key/value pair; each side may be ``bytes`` or ``str``."""

    # Identifier of the entry.
    key: bytes | str
    # Payload associated with ``key``.
    value: bytes | str
class AnalyticsTxn(CustomModel):
    """A batch of key/value events recorded for one product at one time."""

    product: Product
    pennkey: Optional[str] = None
    timestamp: datetime
    data: list[RedisEvent]

    def __init__(self, **data):
        """Build a transaction from keyword data (e.g. a decoded JSON body).

        Pydantic validation in ``super().__init__`` already converts
        ``product`` to a ``Product`` member, each entry of ``data`` to a
        ``RedisEvent`` and fills in ``pennkey``, so those fields are not
        rebuilt by hand. Rebuilding them from the raw input used to raise
        ``TypeError`` when the caller passed already-parsed values (a
        ``datetime`` timestamp or ``RedisEvent`` instances).

        A numeric ``timestamp`` is still interpreted with
        ``datetime.fromtimestamp`` (naive, local time) exactly as before, so
        values derived from it are unchanged for existing callers.
        """
        super().__init__(**data)
        raw_timestamp = data.get("timestamp")
        if isinstance(raw_timestamp, (int, float)):
            self.timestamp = datetime.fromtimestamp(raw_timestamp)

    def get_redis_key(self):
        """Return ``"<product name>.<hash of this transaction>"``."""
        return f"{self.product}.{self.hash_as_key()}"

    def build_redis_data(self) -> list[RedisEvent]:
        """Expand the transaction into one ``RedisEvent`` per contained event.

        Each returned event's key is the transaction key followed by the
        hash of the source event; its value is a JSON object holding the
        product name, pennkey, Unix timestamp, and the source event's key
        (``"datapoint"``) and value.

        Raises:
            UnicodeDecodeError: if a ``bytes`` key or value is not valid
                UTF-8.
        """

        def as_text(raw):
            # json.dumps cannot serialize bytes, which the RedisEvent field
            # types allow; decode them so such events no longer crash.
            return raw.decode() if isinstance(raw, bytes) else raw

        # These are identical for every event, so compute them once instead
        # of re-serializing and re-hashing the transaction per iteration.
        redis_key = self.get_redis_key()
        product_name = str(self.product)
        unix_timestamp = self.timestamp.timestamp()

        return [
            RedisEvent(
                key=f"{redis_key}.{event.hash_as_key()}",
                value=json.dumps(
                    {
                        "product": product_name,
                        "pennkey": self.pennkey,
                        "timestamp": unix_timestamp,
                        "datapoint": as_text(event.key),
                        "value": as_text(event.value),
                    }
                ),
            )
            for event in self.data
        ]