Skip to content

Commit 22e5122

Browse files
committed
Refine DatabentoDataClient MBO/L3 feed handling
1 parent b32e04e commit 22e5122

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

examples/live/databento/databento_subscriber.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def on_order_book_deltas(self, deltas: OrderBookDeltas) -> None:
174174
The order book deltas received.
175175
176176
"""
177-
# self.log.info(repr(deltas), LogColor.CYAN)
177+
self.log.info(repr(deltas), LogColor.CYAN)
178178

179179
def on_order_book(self, order_book: OrderBook) -> None:
180180
"""

nautilus_trader/adapters/databento/data.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ def __init__(
121121
self._loader = loader or DatabentoDataLoader()
122122
self._dataset_ranges: dict[Dataset, tuple[pd.Timestamp, pd.Timestamp]] = {}
123123
self._dataset_ranges_requested: set[Dataset] = set()
124+
self._trade_tick_subscriptions: set[InstrumentId] = set()
124125

125126
# Cache parent symbol index
126127
for dataset, parent_symbols in (config.parent_symbols or {}).items():
@@ -217,6 +218,7 @@ async def _update_dataset_ranges(self) -> None:
217218

218219
async def _buffer_mbo_subscriptions(self) -> None:
219220
try:
221+
self._log.debug("Buffering MBO subscriptions...", LogColor.MAGENTA)
220222
await asyncio.sleep(self._mbo_subscriptions_delay or 0.0)
221223
self._is_buffering_mbo_subscriptions = False
222224

@@ -443,7 +445,7 @@ async def _subscribe_order_book_deltas(
443445
self._buffered_mbo_subscriptions[dataset].append(instrument_id)
444446
return
445447

446-
if self._get_live_client_mbo(dataset) is not None:
448+
if self._live_clients_mbo.get(dataset) is not None:
447449
self._log.error(
448450
f"Cannot subscribe to order book deltas for {instrument_id}, "
449451
"MBO/L3 feed already started.",
@@ -478,6 +480,9 @@ async def _subscribe_order_book_deltas_batch(
478480
if not instrument_ids:
479481
return # No subscribing instrument IDs were loaded in the cache
480482

483+
ids_str = ",".join([i.value for i in instrument_ids])
484+
self._log.info(f"Subscribing to MBO/L3 for {ids_str}.", LogColor.BLUE)
485+
481486
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_ids[0].venue)
482487
live_client = self._get_live_client_mbo(dataset)
483488
future = asyncio.ensure_future(
@@ -492,11 +497,10 @@ async def _subscribe_order_book_deltas_batch(
492497

493498
# Add trade tick subscriptions for all instruments (MBO data includes trades)
494499
for instrument_id in instrument_ids:
495-
self._add_subscription_trade_ticks(instrument_id)
500+
self._trade_tick_subscriptions.add(instrument_id)
496501

497502
future = asyncio.ensure_future(live_client.start(self._handle_record))
498503
self._live_client_futures.add(future)
499-
await future
500504
except asyncio.CancelledError:
501505
self._log.warning(
502506
"`_subscribe_order_book_deltas_batch` was canceled while still pending.",
@@ -553,14 +557,17 @@ async def _subscribe_quote_ticks(self, instrument_id: InstrumentId) -> None:
553557
await future
554558

555559
# Add trade tick subscriptions for instrument (MBP-1 data includes trades)
556-
self._add_subscription_trade_ticks(instrument_id)
560+
self._trade_tick_subscriptions.add(instrument_id)
557561

558562
await self._check_live_client_started(dataset, live_client)
559563
except asyncio.CancelledError:
560564
self._log.warning("`_subscribe_quote_ticks` was canceled while still pending.")
561565

562566
async def _subscribe_trade_ticks(self, instrument_id: InstrumentId) -> None:
563567
try:
568+
if instrument_id in self._trade_tick_subscriptions:
569+
return # Already subscribed (this will save on data costs)
570+
564571
await self._ensure_subscribed_for_instrument(instrument_id)
565572

566573
dataset: Dataset = self._loader.get_dataset_for_venue(instrument_id.venue)

0 commit comments

Comments
 (0)