|
| 1 | +import hashlib |
1 | 2 | import json |
2 | 3 | import logging |
3 | | -from typing import Dict, Type, Union |
| 4 | +from typing import Any, Dict, Type, Union |
4 | 5 |
|
5 | 6 | from django.contrib.contenttypes.fields import GenericRelation |
6 | 7 | from django.contrib.contenttypes.models import ContentType |
|
26 | 27 | logger = logging.getLogger(__name__) |
27 | 28 |
|
28 | 29 |
|
| 30 | +def normalize_dict(obj: Any) -> Any: |
| 31 | + if isinstance(obj, dict): |
| 32 | + return {k: normalize_dict(v) for k, v in sorted(obj.items())} |
| 33 | + if isinstance(obj, list): |
| 34 | + return [normalize_dict(i) for i in obj] |
| 35 | + return obj |
| 36 | + |
| 37 | + |
29 | 38 | class IETFReport(models.Model): |
30 | 39 | rrname = LowercaseCharField(max_length=100) |
31 | 40 | rrtype = LowercaseCharField(max_length=100) |
@@ -91,6 +100,7 @@ class BaseDataModel(models.Model): |
91 | 100 | default=dict |
92 | 101 | ) # field for additional information related to a specific analyzer |
93 | 102 | date = models.DateTimeField(default=now) |
| 103 | + fingerprint = models.CharField(max_length=64, db_index=True, blank=True, default="") |
94 | 104 | analyzers_report = GenericRelation( |
95 | 105 | to="analyzers_manager.AnalyzerReport", |
96 | 106 | object_id_field="data_model_object_id", |
@@ -125,6 +135,30 @@ def owner(self) -> User: |
125 | 135 | elif self.jobs.exists(): |
126 | 136 | return self.jobs.first().user |
127 | 137 |
|
| 138 | + def get_content_map(self, data: Dict = None) -> Dict: |
| 139 | + if data is None: |
| 140 | + data = {} |
| 141 | + for field in self._meta.fields: |
| 142 | + name = field.name |
| 143 | + if name in ["id", "date", "fingerprint"]: |
| 144 | + continue |
| 145 | + value = getattr(self, name) |
| 146 | + if hasattr(value, "isoformat"): |
| 147 | + value = value.isoformat() |
| 148 | + data[name] = value |
| 149 | + data.pop("id", None) |
| 150 | + data.pop("date", None) |
| 151 | + data.pop("fingerprint", None) |
| 152 | + data.pop("analyzers_report", None) |
| 153 | + data.pop("jobs", None) |
| 154 | + data.pop("user_events", None) |
| 155 | + return normalize_dict(data) |
| 156 | + |
| 157 | + def generate_fingerprint(self, data: Dict = None) -> str: |
| 158 | + content_map = self.get_content_map(data) |
| 159 | + encoded_data = json.dumps(content_map, sort_keys=True).encode("utf-8") |
| 160 | + return hashlib.sha256(encoded_data).hexdigest() |
| 161 | + |
128 | 162 | def merge(self, other: Union["BaseDataModel", Dict], append: bool = True) -> "BaseDataModel": |
129 | 163 | if not self.pk: |
130 | 164 | raise ValueError("Unable to merge a model that was not saved.") |
|
0 commit comments