diff --git a/nautilus_trader/adapters/interactive_brokers/client/market_data.py b/nautilus_trader/adapters/interactive_brokers/client/market_data.py index 523ccf707fec..ed572654d0ab 100644 --- a/nautilus_trader/adapters/interactive_brokers/client/market_data.py +++ b/nautilus_trader/adapters/interactive_brokers/client/market_data.py @@ -48,6 +48,7 @@ from nautilus_trader.model.data import Bar from nautilus_trader.model.data import BarType from nautilus_trader.model.data import BookOrder +from nautilus_trader.model.data import IndexPriceUpdate from nautilus_trader.model.data import OrderBookDelta from nautilus_trader.model.data import OrderBookDeltas from nautilus_trader.model.data import QuoteTick @@ -263,6 +264,52 @@ async def unsubscribe_ticks(self, instrument_id: InstrumentId, tick_type: str) - name = (str(instrument_id), tick_type) await self._unsubscribe(name, self._eclient.cancelTickByTickData) + async def subscribe_index_market_data( + self, + instrument_id: InstrumentId, + contract: IBContract, + generic_tick_list: str = "", + ) -> None: + """ + Subscribe to index market data for a specified instrument using reqMktData. This + method is used for index contracts that don't support reqTickByTickData (^SPX.CBOE for example). + Note: Per interactive brokers some CME exchange indexes do support reqTickByTickData. + + Parameters + ---------- + instrument_id : InstrumentId + The identifier of the instrument for which to subscribe. + contract : IBContract + The contract details for the instrument. + generic_tick_list : str + A comma-separated list of generic tick types to request. + + """ + name = (str(instrument_id), "index_market_data") + await self._subscribe( + name, + self._eclient.reqMktData, + self._eclient.cancelMktData, + contract, + generic_tick_list, + False, # snapshot + False, # regulatory_snapshot + [], # mktDataOptions + ) + + async def unsubscribe_index_market_data(self, instrument_id: InstrumentId) -> None: + """ + Unsubscribes from index market data for a specified instrument. + + Parameters + ---------- + instrument_id : InstrumentId + The identifier of the instrument for which to unsubscribe. + + """ + name = (str(instrument_id), "index_market_data") + await self._unsubscribe(name, self._eclient.cancelMktData) + async def subscribe_market_data( self, instrument_id: InstrumentId, @@ -794,11 +841,15 @@ async def process_tick_price( ) return - # IB tick types: 0=BID_SIZE, 1=BID_PRICE, 2=ASK_PRICE, 3=ASK_SIZE + # IB tick types: 0=BID_SIZE, 1=BID_PRICE, 2=ASK_PRICE, 3=ASK_SIZE, 4=LAST_PRICE self._subscription_tick_data[req_id][tick_type] = price - # Check if we have both bid and ask prices to create a quote tick - await self._try_create_quote_tick_from_market_data(subscription, req_id) + if subscription.name[1] == "index_market_data": + # Create an index price tick + await self._try_create_index_price_tick_from_market_data(subscription, req_id) + else: + # Check if we have both bid and ask prices to create a quote tick + await self._try_create_quote_tick_from_market_data(subscription, req_id) async def process_tick_size( self, @@ -881,6 +932,38 @@ async def _try_create_quote_tick_from_market_data( await self._handle_data(quote_tick) + async def _try_create_index_price_tick_from_market_data( + self, + subscription: Subscription, + req_id: int, + ) -> None: + if req_id not in self._subscription_tick_data: + return + + tick_data = self._subscription_tick_data[req_id] + + price = tick_data.get(4) # IB tick type 4 = LAST_PRICE for index market data + + if price is not None: + instrument_id = InstrumentId.from_str(subscription.name[0]) + instrument = self._cache.instrument(instrument_id) + ts_event = self._clock.timestamp_ns() + price_magnifier = ( + self._instrument_provider.get_price_magnifier(instrument_id) + if self._instrument_provider + else 1 + ) + converted_price = ib_price_to_nautilus_price(price, price_magnifier) + + index_price_update = IndexPriceUpdate( + instrument_id=instrument_id, + value=instrument.make_price(converted_price), + ts_event=ts_event, + ts_init=ts_event, + ) + + await self._handle_data(index_price_update) + async def process_realtime_bar( self, *, diff --git a/nautilus_trader/adapters/interactive_brokers/data.py b/nautilus_trader/adapters/interactive_brokers/data.py index 7496bfbfdfe9..70f2fac1387e 100644 --- a/nautilus_trader/adapters/interactive_brokers/data.py +++ b/nautilus_trader/adapters/interactive_brokers/data.py @@ -40,6 +40,7 @@ from nautilus_trader.data.messages import RequestTradeTicks from nautilus_trader.data.messages import SubscribeBars from nautilus_trader.data.messages import SubscribeData +from nautilus_trader.data.messages import SubscribeIndexPrices from nautilus_trader.data.messages import SubscribeInstrument from nautilus_trader.data.messages import SubscribeInstrumentClose from nautilus_trader.data.messages import SubscribeInstruments @@ -49,6 +50,7 @@ from nautilus_trader.data.messages import SubscribeTradeTicks from nautilus_trader.data.messages import UnsubscribeBars from nautilus_trader.data.messages import UnsubscribeData +from nautilus_trader.data.messages import UnsubscribeIndexPrices from nautilus_trader.data.messages import UnsubscribeInstrument from nautilus_trader.data.messages import UnsubscribeInstrumentClose from nautilus_trader.data.messages import UnsubscribeInstruments @@ -169,6 +171,20 @@ async def _subscribe_instrument(self, command: SubscribeInstrument) -> None: "implement the `_subscribe_instrument` coroutine", # pragma: no cover ) + async def _subscribe_index_prices(self, command: SubscribeIndexPrices) -> None: + contract = self.instrument_provider.contract.get(command.instrument_id) + if not contract: + self._log.error( + f"Cannot subscribe to index prices for {command.instrument_id}: instrument not found", + ) + return + + await self._client.subscribe_index_market_data( + instrument_id=command.instrument_id, + contract=contract, + generic_tick_list="", # Empty for basic price updates + ) + async def _subscribe_order_book_deltas(self, command: SubscribeOrderBook) -> None: if command.book_type == BookType.L3_MBO: self._log.error( @@ -285,6 +301,9 @@ async def _unsubscribe_instrument(self, command: UnsubscribeInstrument) -> None: "implement the `_unsubscribe_instrument` coroutine", # pragma: no cover ) + async def _unsubscribe_index_prices(self, command: UnsubscribeIndexPrices) -> None: + await self._client.unsubscribe_index_market_data(command.instrument_id) + async def _unsubscribe_order_book_deltas(self, command: UnsubscribeOrderBook) -> None: is_smart_depth = command.params.get("is_smart_depth", True) await self._client.unsubscribe_order_book(