Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 86 additions & 3 deletions nautilus_trader/adapters/interactive_brokers/client/market_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import BookOrder
from nautilus_trader.model.data import IndexPriceUpdate
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
Expand Down Expand Up @@ -263,6 +264,52 @@ async def unsubscribe_ticks(self, instrument_id: InstrumentId, tick_type: str) -
name = (str(instrument_id), tick_type)
await self._unsubscribe(name, self._eclient.cancelTickByTickData)

async def subscribe_index_market_data(
self,
instrument_id: InstrumentId,
contract: IBContract,
generic_tick_list: str = "",
) -> None:
"""
Subscribe to index market data for a specified instrument using reqMktData. This
method is used for index contracts that don't support reqTickByTickData (^SPX.CBOE for example).
Note: Per interactive brokers some CME exchange indexes do support reqTickByTickData.

Parameters
----------
instrument_id : InstrumentId
The identifier of the instrument for which to subscribe.
contract : IBContract
The contract details for the instrument.
generic_tick_list : str
A comma-separated list of generic tick types to request.

"""
name = (str(instrument_id), "index_market_data")
await self._subscribe(
name,
self._eclient.reqMktData,
self._eclient.cancelMktData,
contract,
generic_tick_list,
False, # snapshot
False, # regulatory_snapshot
[], # mktDataOptions
)

async def unsubscribe_index_market_data(self, instrument_id: InstrumentId) -> None:
"""
Unsubscribes from index market data for a specified instrument.

Parameters
----------
instrument_id : InstrumentId
The identifier of the instrument for which to unsubscribe.

"""
name = (str(instrument_id), "index_market_data")
await self._unsubscribe(name, self._eclient.cancelMktData)

async def subscribe_market_data(
self,
instrument_id: InstrumentId,
Expand Down Expand Up @@ -794,11 +841,15 @@ async def process_tick_price(
)
return

# IB tick types: 0=BID_SIZE, 1=BID_PRICE, 2=ASK_PRICE, 3=ASK_SIZE
# IB tick types: 0=BID_SIZE, 1=BID_PRICE, 2=ASK_PRICE, 3=ASK_SIZE, 4=LAST_PRICE
self._subscription_tick_data[req_id][tick_type] = price

# Check if we have both bid and ask prices to create a quote tick
await self._try_create_quote_tick_from_market_data(subscription, req_id)
if subscription.name[1] == "index_market_data":
# Create an index price tick
await self._try_create_index_price_tick_from_market_data(subscription, req_id)
else:
# Check if we have both bid and ask prices to create a quote tick
await self._try_create_quote_tick_from_market_data(subscription, req_id)

async def process_tick_size(
self,
Expand Down Expand Up @@ -881,6 +932,38 @@ async def _try_create_quote_tick_from_market_data(

await self._handle_data(quote_tick)

async def _try_create_index_price_tick_from_market_data(
self,
subscription: Subscription,
req_id: int,
) -> None:
if req_id not in self._subscription_tick_data:
return

tick_data = self._subscription_tick_data[req_id]

price = tick_data.get(4) # IB tick type 4 = LAST_PRICE for index market data

if price is not None:
instrument_id = InstrumentId.from_str(subscription.name[0])
instrument = self._cache.instrument(instrument_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check for None here, log an error and return (common pattern in other adapters). This is clearer than an attribute error on a NoneType later.

ts_event = self._clock.timestamp_ns()
price_magnifier = (
self._instrument_provider.get_price_magnifier(instrument_id)
if self._instrument_provider
else 1
)
converted_price = ib_price_to_nautilus_price(price, price_magnifier)

index_price_update = IndexPriceUpdate(
instrument_id=instrument_id,
value=instrument.make_price(converted_price),
ts_event=ts_event,
ts_init=ts_event,
)

await self._handle_data(index_price_update)

async def process_realtime_bar(
self,
*,
Expand Down
19 changes: 19 additions & 0 deletions nautilus_trader/adapters/interactive_brokers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from nautilus_trader.data.messages import RequestTradeTicks
from nautilus_trader.data.messages import SubscribeBars
from nautilus_trader.data.messages import SubscribeData
from nautilus_trader.data.messages import SubscribeIndexPrices
from nautilus_trader.data.messages import SubscribeInstrument
from nautilus_trader.data.messages import SubscribeInstrumentClose
from nautilus_trader.data.messages import SubscribeInstruments
Expand All @@ -49,6 +50,7 @@
from nautilus_trader.data.messages import SubscribeTradeTicks
from nautilus_trader.data.messages import UnsubscribeBars
from nautilus_trader.data.messages import UnsubscribeData
from nautilus_trader.data.messages import UnsubscribeIndexPrices
from nautilus_trader.data.messages import UnsubscribeInstrument
from nautilus_trader.data.messages import UnsubscribeInstrumentClose
from nautilus_trader.data.messages import UnsubscribeInstruments
Expand Down Expand Up @@ -169,6 +171,20 @@ async def _subscribe_instrument(self, command: SubscribeInstrument) -> None:
"implement the `_subscribe_instrument` coroutine", # pragma: no cover
)

async def _subscribe_index_prices(self, command: SubscribeIndexPrices) -> None:
contract = self.instrument_provider.contract.get(command.instrument_id)
if not contract:
self._log.error(
f"Cannot subscribe to index prices for {command.instrument_id}: instrument not found",
)
return

await self._client.subscribe_index_market_data(
instrument_id=command.instrument_id,
contract=contract,
generic_tick_list="", # Empty for basic price updates
)

async def _subscribe_order_book_deltas(self, command: SubscribeOrderBook) -> None:
if command.book_type == BookType.L3_MBO:
self._log.error(
Expand Down Expand Up @@ -285,6 +301,9 @@ async def _unsubscribe_instrument(self, command: UnsubscribeInstrument) -> None:
"implement the `_unsubscribe_instrument` coroutine", # pragma: no cover
)

async def _unsubscribe_index_prices(self, command: UnsubscribeIndexPrices) -> None:
await self._client.unsubscribe_index_market_data(command.instrument_id)

async def _unsubscribe_order_book_deltas(self, command: UnsubscribeOrderBook) -> None:
is_smart_depth = command.params.get("is_smart_depth", True)
await self._client.unsubscribe_order_book(
Expand Down
Loading