-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaggregator.py
More file actions
314 lines (257 loc) · 9.86 KB
/
aggregator.py
File metadata and controls
314 lines (257 loc) · 9.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
from dataclasses import replace
from datetime import datetime
from decimal import Decimal
from typing import Any
from common.models.records import RawRecord
from common.utils.helper import sort_records_for_aggregation
class BaseAggregator:
    """Common base class for record aggregators.

    Subclasses implement :meth:`aggregate_lines`. The static helpers cover
    lenient numeric conversion, None-aware comparison, classification of
    trades as coin buys/sells and replacing the time of day of a datetime.
    """

    # Currency codes used to classify trades. They are defined once so that
    # ``_is_coin_buy`` and ``_is_coin_sell`` cannot drift apart.
    _CRYPTO_CURRENCIES = frozenset(
        {
            "BTC",
            "ETH",
            "ADA",
            "SOL2",
            "LUNA2",
            "LUNA3",
            "DFI",
            "BNB",
            "XRP",
            "KFEE",
        }
    )
    _STABLECOINS = frozenset({"USDC", "USDT", "BUSD"})
    _FIAT_CURRENCIES = frozenset({"EUR", "USD"})

    def __init__(self, config=None):
        """Store the optional configuration shared by all aggregators."""
        self.config = config

    def aggregate_lines(self, records: list[RawRecord]) -> list[RawRecord]:
        """Aggregate ``records``; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("Implementation missing")

    @staticmethod
    def safe_float(value):
        """Convert ``value`` to float, returning 0.0 if it is empty or invalid.

        Falsy values (``None``, ``""``, ``0``) yield 0.0 without conversion.
        Values that ``float()`` rejects, either by content (``ValueError``)
        or by type (``TypeError``, e.g. a list), also yield 0.0.
        """
        if not value:
            return 0.0
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    @staticmethod
    def sum_round(value1, value2, decimals=8):
        """Sum two values leniently and round to ``decimals`` places.

        Both operands go through :meth:`safe_float`, so empty or invalid
        operands count as 0.0.
        """
        total = BaseAggregator.safe_float(value1) + BaseAggregator.safe_float(value2)
        return round(total, decimals)

    @staticmethod
    def values_equal(val1: Any, val2: Any) -> bool:
        """Return True if both values are None or compare equal with ``==``.

        A single None never equals a non-None value, regardless of how the
        other operand implements ``__eq__``.
        """
        if val1 is None or val2 is None:
            # Equal only when both sides are None.
            return val1 is None and val2 is None
        return val1 == val2

    @staticmethod
    def _is_coin_buy(record: RawRecord) -> bool:
        """Return True if ``record`` buys a known crypto asset.

        That is the case when ``buy_currency`` is a known crypto currency
        and ``sell_currency`` is a fiat currency or a stablecoin.
        """
        if record.buy_currency not in BaseAggregator._CRYPTO_CURRENCIES:
            return False
        paid_with = record.sell_currency
        return (
            paid_with in BaseAggregator._FIAT_CURRENCIES
            or paid_with in BaseAggregator._STABLECOINS
        )

    @staticmethod
    def _is_coin_sell(record: RawRecord) -> bool:
        """Return True if ``record`` sells a known crypto asset.

        That is the case when ``sell_currency`` is a known crypto currency
        and ``buy_currency`` is a fiat currency or a stablecoin.
        """
        if record.sell_currency not in BaseAggregator._CRYPTO_CURRENCIES:
            return False
        received = record.buy_currency
        return (
            received in BaseAggregator._FIAT_CURRENCIES
            or received in BaseAggregator._STABLECOINS
        )

    @staticmethod
    def _set_time(date_value: datetime, new_time: str) -> datetime:
        """Return ``date_value`` with its time replaced by ``new_time``.

        Args:
            date_value: datetime whose date (and tzinfo, if any) is kept.
            new_time: time of day formatted as ``HH:MM:SS``.

        Returns:
            A datetime on the same day with the given time and zero
            microseconds.

        Raises:
            ValueError: if ``new_time`` does not consist of exactly three
                integer fields or the fields are out of range.
        """
        hours, minutes, seconds = map(int, new_time.split(":"))
        # ``replace`` keeps tzinfo. Building a fresh naive datetime would turn
        # aware timestamps into naive ones, which can no longer be ordered
        # against the untouched aware timestamps of other records.
        return date_value.replace(
            hour=hours,
            minute=minutes,
            second=seconds,
            microsecond=0,
        )
class CoinTrackingAggregator(BaseAggregator):
    """Aggregator that merges neighbouring same-day records with equal keys.

    Consecutive records (after sorting) are merged when they agree on type,
    currencies, exchange, group, comment and calendar date. The merged entry
    carries the summed amounts and a rule-based time of day.
    """

    # Attributes that must match (None equals None) for two neighbouring
    # records to be merged; the calendar date is compared separately.
    _GROUPING_FIELDS = (
        "type",
        "buy_currency",
        "sell_currency",
        "fee_currency",
        "exchange",
        "group",
        "comment",
    )

    def _is_aggregation_applicable(
        self, current_line: RawRecord, next_line: RawRecord
    ) -> bool:
        """Return True if ``current_line`` and ``next_line`` may be merged."""
        same_key = all(
            BaseAggregator.values_equal(
                getattr(current_line, field_name), getattr(next_line, field_name)
            )
            for field_name in self._GROUPING_FIELDS
        )
        # Compare the calendar date only; the time of day is irrelevant.
        return same_key and BaseAggregator.values_equal(
            current_line.datetime.date(), next_line.datetime.date()
        )

    def _adjust_timestamp(self, record: RawRecord) -> RawRecord:
        """Return ``record`` with a time of day derived from its meaning.

        Rules:
        - Deposits are set to 00:01:00
        - Coin buys (type "Trade") are set to 23:55:00
        - Coin sells (type "Trade") are set to 23:56:00

        Any other record is returned unchanged.
        """
        if record.type == "Deposit":
            target_time = "00:01:00"
        elif record.type == "Trade" and self._is_coin_buy(record):
            target_time = "23:55:00"
        elif record.type == "Trade" and self._is_coin_sell(record):
            target_time = "23:56:00"
        else:
            # Default: no change
            return record
        new_datetime = BaseAggregator._set_time(record.datetime, target_time)
        return replace(record, datetime=new_datetime)

    def _adjust_margin_timestamp(self, record: RawRecord) -> RawRecord:
        """Return ``record`` with the margin-specific time rule applied.

        Rules:
        - Margin Profit: set to 00:01:00
        - Margin Fee: shifting by one minute is deliberately deactivated,
          so such records are returned unchanged.
        """
        if record.type == "Margin Profit":
            new_datetime = BaseAggregator._set_time(record.datetime, "00:01:00")
            return replace(record, datetime=new_datetime)
        # Default: no change
        return record

    def _sort_result(self, records: list[RawRecord]) -> list[RawRecord]:
        """Return a new list of ``records`` ordered by their ``datetime``."""
        return sorted(records, key=lambda r: r.datetime)

    def aggregate_lines(self, records: list[RawRecord]) -> list[RawRecord]:
        """Consolidate runs of mergeable records into single daily entries.

        Neighbouring records that satisfy ``_is_aggregation_applicable`` are
        merged into the last record of their run via ``_finalize_aggregation``
        (which also applies ``_adjust_timestamp``). Every emitted record,
        merged or not, passes through ``_adjust_margin_timestamp``; this now
        includes the single-record input, which was previously returned
        untouched. The result is ordered by ``datetime``.

        Note: for more than one record, ``records`` is handed to
        ``sort_records_for_aggregation`` and its return value is ignored,
        so the helper presumably sorts the caller's list in place - confirm
        against the helper.
        """
        if not records:
            return []
        if len(records) > 1:
            # Ensure data is in the correct order
            sort_records_for_aggregation(records)

        result: list[RawRecord] = []
        zero = Decimal("0")
        pending_buy = pending_sell = pending_fee = zero
        # Number of records already folded into the pending sums; the record
        # that closes the run is not included.
        pending_count = 0
        last_index = len(records) - 1

        for index, current in enumerate(records):
            if index < last_index and self._is_aggregation_applicable(
                current, records[index + 1]
            ):
                # Fold this record into the run; the run's last record
                # becomes the carrier of the totals.
                pending_buy += current.buy_amount
                pending_sell += current.sell_amount
                pending_fee += current.fee_amount
                pending_count += 1
                continue

            if pending_count:
                # ``current`` closes a run: attach the accumulated values.
                current = self._finalize_aggregation(
                    current,
                    pending_buy,
                    pending_sell,
                    pending_fee,
                    pending_count + 1,
                )
                pending_buy = pending_sell = pending_fee = zero
                pending_count = 0
            result.append(self._adjust_margin_timestamp(current))

        return self._sort_result(result)

    def _finalize_aggregation(
        self, base_rec: RawRecord, buy: Decimal, sell: Decimal, fee: Decimal, count: int
    ) -> RawRecord:
        """Build the merged record for one run of aggregated records.

        Args:
            base_rec: last record of the run; its own amounts are added to
                the accumulated ones.
            buy: accumulated buy amount of the preceding records in the run.
            sell: accumulated sell amount of the preceding records.
            fee: accumulated fee amount of the preceding records.
            count: total number of records in the run, written into the
                comment.

        Returns:
            A copy of ``base_rec`` with summed amounts, the comment replaced
            by the record count, ``tx_id`` and ``lpn`` cleared, and the time
            of day set by ``_adjust_timestamp``.
        """
        # ``replace`` creates a new instance, leaving ``base_rec`` untouched.
        final_rec = replace(
            base_rec,
            buy_amount=base_rec.buy_amount + buy,
            sell_amount=base_rec.sell_amount + sell,
            fee_amount=base_rec.fee_amount + fee,
            comment=f"Aggregated records: {count}",
            tx_id="",
            lpn="",
        )
        return self._adjust_timestamp(final_rec)
class AggregatorFactory:
    """Factory that maps an export format name to its aggregator."""

    @staticmethod
    def get_aggregator(format: str) -> BaseAggregator:
        """Return a new aggregator for ``format``.

        Only "CoinTracking" is supported.

        Raises:
            ValueError: if ``format`` is not a known format name.
        """
        # Reject unsupported names first so the happy path stays flat.
        if format != "CoinTracking":
            raise ValueError(f"Unknown format: {format}")
        return CoinTrackingAggregator()