Skip to content

Commit c92af11

Browse files
Guillaume De Saint MartinGuillaumeDSM
Guillaume De Saint Martin
authored andcommitted
[Storage] update authenticated data on portfolio and trade update
1 parent fe3acd9 commit c92af11

File tree

3 files changed

+72
-1
lines changed

3 files changed

+72
-1
lines changed

octobot_trading/storage/abstract_storage.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
#
1414
# You should have received a copy of the GNU Lesser General Public
1515
# License along with this library
16+
import asyncio
1617
import octobot_commons.display as commons_display
18+
import octobot_commons.logging as logging
1719

1820
import octobot_trading.exchange_channel as exchanges_channel
1921

@@ -24,6 +26,7 @@ class AbstractStorage:
2426
LIVE_CHANNEL = None
2527
IS_HISTORICAL = True
2628
HISTORY_TABLE = None
29+
DEBOUNCE_DURATION = 10
2730

2831
def __init__(self, exchange_manager, plot_settings: commons_display.PlotSettings,
2932
use_live_consumer_in_backtesting=None, is_historical=None):
@@ -34,6 +37,7 @@ def __init__(self, exchange_manager, plot_settings: commons_display.PlotSettings
3437
or self.USE_LIVE_CONSUMER_IN_BACKTESTING
3538
self.is_historical = is_historical or self.IS_HISTORICAL
3639
self.enabled = True
40+
self._update_task = None
3741

3842
def should_register_live_consumer(self):
3943
return self.IS_LIVE_CONSUMER and \
@@ -73,6 +77,8 @@ async def enable(self, enabled):
7377
async def stop(self, clear=True):
7478
if self.consumer is not None:
7579
await self.consumer.stop()
80+
if self._update_task is not None and not self._update_task.done():
81+
self._update_task.cancel()
7682
if clear:
7783
self.consumer = None
7884
self.exchange_manager = None
@@ -81,6 +87,24 @@ async def store_history(self):
8187
if self.enabled:
8288
await self._store_history()
8389

90+
async def trigger_debounced_update_auth_data(self):
91+
if self.exchange_manager.is_backtesting:
92+
# no interest in backtesting data
93+
return
94+
if self._update_task is not None and not self._update_task.done():
95+
self._update_task.cancel()
96+
self._update_task = asyncio.create_task(self._waiting_update_auth_data())
97+
98+
async def _waiting_update_auth_data(self):
99+
try:
100+
await asyncio.sleep(self.DEBOUNCE_DURATION)
101+
await self._update_auth_data()
102+
except Exception as err:
103+
logging.get_logger(self.__class__.__name__).exception(err, True, f"Error when updating auth data: {err}")
104+
105+
async def _update_auth_data(self):
106+
pass
107+
84108
async def _live_callback(self, *args, **kwargs):
85109
raise NotImplementedError(f"_live_callback not implemented for {self.__class__.__name__}")
86110

octobot_trading/storage/portfolio_storage.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,19 @@
1414
# You should have received a copy of the GNU Lesser General Public
1515
# License along with this library
1616
import octobot_commons.enums as commons_enums
17+
import octobot_commons.authentication as authentication
1718
import octobot_commons.constants as commons_constants
1819
import octobot_commons.databases as commons_databases
20+
import octobot_commons.tree as commons_tree
1921

2022
import octobot_trading.storage.abstract_storage as abstract_storage
23+
import octobot_trading.personal_data.portfolios.history as portfolio_history
2124

2225

2326
class PortfolioStorage(abstract_storage.AbstractStorage):
2427
IS_LIVE_CONSUMER = False
2528
IS_HISTORICAL = True
29+
PRICE_INIT_TIMEOUT = 30
2630

2731
async def store_history(self):
2832
if not self.enabled:
@@ -32,13 +36,44 @@ async def store_history(self):
3236
portfolio_manager.historical_portfolio_value_manager
3337
metadata = hist_portfolio_values_manager.get_metadata()
3438
# replace the whole table to ensure consistency
39+
history = hist_portfolio_values_manager.get_dict_historical_values()
3540
await portfolio_db.upsert(commons_enums.RunDatabases.METADATA.value, metadata, None, uuid=1)
3641
await portfolio_db.replace_all(
3742
commons_enums.RunDatabases.HISTORICAL_PORTFOLIO_VALUE.value,
38-
hist_portfolio_values_manager.get_dict_historical_values(),
43+
history,
3944
cache=False
4045
)
4146
await portfolio_db.flush()
47+
await self.trigger_debounced_update_auth_data()
48+
49+
async def _update_auth_data(self):
50+
hist_portfolio_values_manager = self.exchange_manager.exchange_personal_data. \
51+
portfolio_manager.historical_portfolio_value_manager
52+
authenticator = authentication.Authenticator.instance()
53+
history = hist_portfolio_values_manager.get_dict_historical_values()
54+
if history and authenticator.is_initialized():
55+
if hist_portfolio_values_manager.portfolio_manager.portfolio_value_holder.initializing_symbol_prices_pairs:
56+
for symbol in hist_portfolio_values_manager.portfolio_manager.portfolio_value_holder.initializing_symbol_prices_pairs:
57+
await commons_tree.EventProvider.instance().wait_for_event(
58+
self.exchange_manager.bot_id,
59+
commons_tree.get_exchange_path(
60+
self.exchange_manager.exchange_name,
61+
commons_enums.InitializationEventExchangeTopics.PRICE.value,
62+
symbol=symbol,
63+
),
64+
self.PRICE_INIT_TIMEOUT
65+
)
66+
await authenticator.update_portfolio(
67+
history[-1][portfolio_history.HistoricalAssetValue.VALUES_KEY],
68+
history[0][portfolio_history.HistoricalAssetValue.VALUES_KEY],
69+
hist_portfolio_values_manager.portfolio_manager.reference_market,
70+
hist_portfolio_values_manager.ending_portfolio,
71+
{
72+
history_val[portfolio_history.HistoricalAssetValue.TIMESTAMP_KEY]: history_val[portfolio_history.HistoricalAssetValue.VALUES_KEY]
73+
for history_val in history
74+
},
75+
hist_portfolio_values_manager.portfolio_manager.portfolio_value_holder.current_crypto_currencies_values
76+
)
4277

4378
def get_db(self):
4479
return commons_databases.RunDatabasesProvider.instance().get_historical_portfolio_value_db(

octobot_trading/storage/trades_storage.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# License along with this library
1616
import octobot_commons.channels_name as channels_name
1717
import octobot_commons.enums as commons_enums
18+
import octobot_commons.authentication as authentication
1819
import octobot_commons.databases as commons_databases
1920

2021
import octobot_trading.enums as enums
@@ -46,6 +47,17 @@ async def _live_callback(
4647
self.plot_settings.mode
4748
)
4849
)
50+
await self.trigger_debounced_update_auth_data()
51+
52+
async def _update_auth_data(self):
53+
authenticator = authentication.Authenticator.instance()
54+
history = [
55+
trade
56+
for trade in self.exchange_manager.exchange_personal_data.trades_manager.trades.values()
57+
if trade.status is not enums.OrderStatus.CANCELED
58+
]
59+
if history and authenticator.is_initialized():
60+
await authenticator.update_trades(history)
4961

5062
async def _store_history(self):
5163
await self._get_db().log_many(

0 commit comments

Comments
 (0)