diff --git a/crates/adapters/databento/src/historical.rs b/crates/adapters/databento/src/historical.rs index c79b22b1d4df..70bd338aa546 100644 --- a/crates/adapters/databento/src/historical.rs +++ b/crates/adapters/databento/src/historical.rs @@ -31,7 +31,7 @@ use databento::{ use indexmap::IndexMap; use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime}; use nautilus_model::{ - data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick}, + data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick}, enums::BarAggregation, identifiers::{InstrumentId, Symbol, Venue}, instruments::InstrumentAny, @@ -41,8 +41,8 @@ use nautilus_model::{ use crate::{ common::get_date_time_range, decode::{ - decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record, - decode_statistics_msg, decode_status_msg, + decode_imbalance_msg, decode_instrument_def_msg, decode_mbo_msg, decode_mbp10_msg, + decode_record, decode_statistics_msg, decode_status_msg, }, symbology::{ MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id, @@ -404,6 +404,78 @@ impl DatabentoHistoricalClient { Ok(result) } + /// Fetches order book deltas for the given parameters. + /// + /// # Errors + /// + /// Returns an error if the API request or data processing fails. + pub async fn get_range_order_book_deltas( + &self, + params: RangeQueryParams, + ) -> anyhow::Result<Vec<OrderBookDelta>> { + let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect(); + check_consistent_symbology(&symbols)?; + + let first_symbol = params + .symbols + .first() + .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?; + let stype_in = infer_symbology_type(first_symbol); + let end = params.end.unwrap_or_else(|| self.clock.get_time_ns()); + let time_range = get_date_time_range(params.start, end)?; + + let range_params = GetRangeParams::builder() + .dataset(params.dataset) + .date_time_range(time_range) + .symbols(symbols) + .stype_in(stype_in) + .schema(dbn::Schema::Mbo) + .limit(params.limit.and_then(NonZeroU64::new)) + .build(); + + let price_precision = params.price_precision.unwrap_or(Currency::USD().precision); + + let mut client = self.inner.lock().await; + let mut decoder = client + .timeseries() + .get_range(&range_params) + .await + .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?; + + let metadata = decoder.metadata().clone(); + let mut metadata_cache = MetadataCache::new(metadata); + let mut result: Vec<OrderBookDelta> = Vec::new(); + + let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> { + let sym_map = self + .symbol_venue_map + .read() + .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?; + let instrument_id = decode_nautilus_instrument_id( + &record, + &mut metadata_cache, + &self.publisher_venue_map, + &sym_map, + )?; + + if let Some(msg) = record.get::<dbn::MboMsg>() { + let (delta, _trade) = + decode_mbo_msg(msg, instrument_id, price_precision, None, false)?; + if let Some(delta) = delta { + result.push(delta); + } + } + + Ok(()) + }; + + while let Ok(Some(msg)) = decoder.decode_record::<dbn::MboMsg>().await { + process_record(dbn::RecordRef::from(msg))?; + } + + Ok(result) + } + /// Fetches trade ticks for the given parameters.
/// /// # Errors diff --git a/crates/adapters/databento/src/python/historical.rs b/crates/adapters/databento/src/python/historical.rs index adaa18890010..7c0c09deaa82 100644 --- a/crates/adapters/databento/src/python/historical.rs +++ b/crates/adapters/databento/src/python/historical.rs @@ -282,6 +282,42 @@ impl DatabentoHistoricalClient { }) } + #[pyo3(name = "get_range_order_book_deltas")] + #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))] + #[allow(clippy::too_many_arguments)] + fn py_get_range_order_book_deltas<'py>( + &self, + py: Python<'py>, + dataset: String, + instrument_ids: Vec<InstrumentId>, + start: u64, + end: Option<u64>, + limit: Option<u64>, + price_precision: Option<u8>, + ) -> PyResult<Bound<'py, PyAny>> { + let inner = self.inner.clone(); + let symbols = inner + .prepare_symbols_from_instrument_ids(&instrument_ids) + .map_err(to_pyvalue_err)?; + + let params = RangeQueryParams { + dataset, + symbols, + start: start.into(), + end: end.map(Into::into), + limit, + price_precision, + }; + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let deltas = inner + .get_range_order_book_deltas(params) + .await + .map_err(to_pyvalue_err)?; + Python::attach(|py| deltas.into_py_any(py)) + }) + } + #[pyo3(name = "get_range_imbalance")] #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))] #[allow(clippy::too_many_arguments)] diff --git a/crates/model/src/ffi/orderbook/book.rs b/crates/model/src/ffi/orderbook/book.rs index 5a51415b0cc7..1494fe8d072c 100644 --- a/crates/model/src/ffi/orderbook/book.rs +++ b/crates/model/src/ffi/orderbook/book.rs @@ -164,6 +164,30 @@ pub extern "C" fn orderbook_apply_deltas(book: &mut OrderBook_API, deltas: &Orde } } +/// Creates an `OrderBookDeltas` snapshot from the current order book state. +/// +/// This is the reverse operation of `orderbook_apply_deltas`: it converts the current book state +/// back into a snapshot format with a `Clear` delta followed by `Add` deltas for all orders. +/// +/// # Parameters +/// +/// * `book` - The order book to convert. +/// * `ts_event` - UNIX timestamp (nanoseconds) when the book event occurred. +/// * `ts_init` - UNIX timestamp (nanoseconds) when the instance was created. +/// +/// # Returns +/// +/// An `OrderBookDeltas_API` containing a snapshot of the current order book state.
+#[unsafe(no_mangle)] +pub extern "C" fn orderbook_to_snapshot_deltas( + book: &OrderBook_API, + ts_event: u64, + ts_init: u64, +) -> OrderBookDeltas_API { + use nautilus_core::UnixNanos; + OrderBookDeltas_API::new(book.to_deltas(UnixNanos::from(ts_event), UnixNanos::from(ts_init))) +} + #[unsafe(no_mangle)] pub extern "C" fn orderbook_apply_depth(book: &mut OrderBook_API, depth: &OrderBookDepth10) { if let Err(e) = book.apply_depth_unchecked(depth) { diff --git a/crates/model/src/orderbook/book.rs b/crates/model/src/orderbook/book.rs index a7da07faf3f3..40f1cc938a5b 100644 --- a/crates/model/src/orderbook/book.rs +++ b/crates/model/src/orderbook/book.rs @@ -28,7 +28,7 @@ use super::{ }; use crate::{ data::{BookOrder, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick}, - enums::{BookAction, BookType, OrderSide, OrderSideSpecified, OrderStatus}, + enums::{BookAction, BookType, OrderSide, OrderSideSpecified, OrderStatus, RecordFlag}, identifiers::InstrumentId, orderbook::{ BookIntegrityError, InvalidBookOperation, @@ -375,6 +375,84 @@ impl OrderBook { Ok(()) } + /// Creates an `OrderBookDeltas` snapshot from the current order book state. + /// + /// This is the reverse operation of `apply_deltas`: it converts the current book state + /// back into a snapshot format with a `Clear` delta followed by `Add` deltas for all orders. + /// + /// # Parameters + /// + /// * `ts_event` - UNIX timestamp (nanoseconds) when the book event occurred. + /// * `ts_init` - UNIX timestamp (nanoseconds) when the instance was created. + /// + /// # Returns + /// + /// An `OrderBookDeltas` containing a snapshot of the current order book state. + #[must_use] + pub fn to_deltas(&self, ts_event: UnixNanos, ts_init: UnixNanos) -> OrderBookDeltas { + let mut deltas = Vec::new(); + + // Add clear delta first + deltas.push(OrderBookDelta::clear( + self.instrument_id, + self.sequence, + ts_event, + ts_init, + )); + + // Count total orders to determine which one should have F_LAST flag + let total_orders = self.bids(None).map(|level| level.len()).sum::<usize>() + + self.asks(None).map(|level| level.len()).sum::<usize>(); + + let mut order_count = 0; + + // Add bid orders + for level in self.bids(None) { + for order in level.iter() { + order_count += 1; + let flags = if order_count == total_orders { + RecordFlag::F_SNAPSHOT as u8 | RecordFlag::F_LAST as u8 + } else { + RecordFlag::F_SNAPSHOT as u8 + }; + + deltas.push(OrderBookDelta::new( + self.instrument_id, + BookAction::Add, + *order, + flags, + self.sequence, + ts_event, + ts_init, + )); + } + } + + // Add ask orders + for level in self.asks(None) { + for order in level.iter() { + order_count += 1; + let flags = if order_count == total_orders { + RecordFlag::F_SNAPSHOT as u8 | RecordFlag::F_LAST as u8 + } else { + RecordFlag::F_SNAPSHOT as u8 + }; + + deltas.push(OrderBookDelta::new( + self.instrument_id, + BookAction::Add, + *order, + flags, + self.sequence, + ts_event, + ts_init, + )); + } + } + + OrderBookDeltas::new(self.instrument_id, deltas) + } + /// Replaces current book state with a depth snapshot.
/// /// # Errors diff --git a/examples/backtest/notebooks/databento_download.py b/examples/backtest/notebooks/databento_download.py index 4d04cb80bafc..577664296055 100644 --- a/examples/backtest/notebooks/databento_download.py +++ b/examples/backtest/notebooks/databento_download.py @@ -6,7 +6,7 @@ # extension: .py # format_name: percent # format_version: '1.3' -# jupytext_version: 1.17.3 +# jupytext_version: 1.18.1 # kernelspec: # display_name: Python 3 (ipykernel) # language: python @@ -118,6 +118,14 @@ end=time_object_to_dt(end_time_1), ) +# %% +node.download_data( + "request_order_book_deltas", + instrument_id=InstrumentId.from_str("ESM4.XCME"), + start=time_object_to_dt("2024-05-08T10:00:00"), + end=time_object_to_dt("2024-05-08T10:00:01"), +) + # %% # # Clean up node.dispose() diff --git a/examples/backtest/notebooks/databento_test_order_book_deltas.py b/examples/backtest/notebooks/databento_test_order_book_deltas.py new file mode 100644 index 000000000000..fba2f9af1b9b --- /dev/null +++ b/examples/backtest/notebooks/databento_test_order_book_deltas.py @@ -0,0 +1,198 @@ +# --- +# jupyter: +# jupytext: +# formats: py:percent +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 1.19.0 +# kernelspec: +# display_name: Python 3 (ipykernel) +# language: python +# name: python3 +# --- + +# %% [markdown] +# ## imports + +# %% +# Note: Use the jupytext python extension to be able to open this python file in jupyter as a notebook + +# %% + +from nautilus_trader.adapters.databento.data_utils import databento_data +from nautilus_trader.adapters.databento.data_utils import load_catalog +from nautilus_trader.backtest.node import BacktestNode +from nautilus_trader.common.enums import LogColor +from nautilus_trader.config import BacktestDataConfig +from nautilus_trader.config import BacktestEngineConfig +from nautilus_trader.config import BacktestRunConfig +from nautilus_trader.config import BacktestVenueConfig +from nautilus_trader.config import DataEngineConfig +from nautilus_trader.config import ImportableStrategyConfig +from nautilus_trader.config import LoggingConfig +from nautilus_trader.config import StrategyConfig +from nautilus_trader.model.data import OrderBookDeltas +from nautilus_trader.model.identifiers import InstrumentId +from nautilus_trader.persistence.config import DataCatalogConfig +from nautilus_trader.trading.strategy import Strategy + + +# %% [markdown] +# ## parameters + +# %% +# Set the data path for Databento data +# import nautilus_trader.adapters.databento.data_utils as db_data_utils +# DATA_PATH = "/path/to/your/data" # Use your own value here +# db_data_utils.DATA_PATH = DATA_PATH +# A valid databento key can be entered here (or as an env variable of the same name) + +# DATABENTO_API_KEY = None +# db_data_utils.init_databento_client(DATABENTO_API_KEY) + +catalog_folder = "order_book_deltas_catalog" +catalog = load_catalog(catalog_folder) + +future_symbols = ["ESM4"] + +start_time = "2024-05-08T00:00:00" +end_time = "2024-05-08T00:00:02" + +order_book_deltas = databento_data( + future_symbols, + start_time, + end_time, + "mbo", + "orderbooks", + catalog_folder, + as_legacy_cython=True, + load_databento_files_if_exist=True, +) + +# deltas = catalog.order_book_deltas() +# deltas_batched = OrderBookDeltas.batch(deltas) +# len(deltas_batched) + +# %% [markdown] +# ## strategy + + +# %% +class TestOrderBookDeltasConfig(StrategyConfig, frozen=True): + symbol_id: InstrumentId + + +class 
TestOrderBookDeltasStrategy(Strategy): + def __init__(self, config: TestOrderBookDeltasConfig): + super().__init__(config=config) + self._deltas_count = 0 + + def on_start(self): + self.request_instrument(self.config.symbol_id) + self.subscribe_order_book_deltas(self.config.symbol_id) + + def on_order_book_deltas(self, deltas): + if self._deltas_count % 50 == 0: + order_book = self.cache.order_book(self.config.symbol_id) + self.user_log(f"{order_book}, ts_init={deltas.ts_init}") + + self._deltas_count += 1 + + def user_log(self, msg, color=LogColor.GREEN): + self.log.warning(f"{msg}", color=color) + + def on_stop(self): + order_book = self.cache.order_book(self.config.symbol_id) + self.user_log(f"{order_book}") + + +# %% [markdown] +# ## backtest node + +# %% +# BacktestEngineConfig + +strategies = [ + ImportableStrategyConfig( + strategy_path=TestOrderBookDeltasStrategy.fully_qualified_name(), + config_path=TestOrderBookDeltasConfig.fully_qualified_name(), + config={ + "symbol_id": InstrumentId.from_str(f"{future_symbols[0]}.XCME"), + }, + ), +] + +logging = LoggingConfig( + bypass_logging=False, + log_colors=True, + log_level="WARN", + log_level_file="WARN", + log_directory=".", + log_file_format=None, # "json" or None + log_file_name="databento_order_book_deltas", + clear_log_file=True, + print_config=False, + use_pyo3=False, +) + +catalogs = [ + DataCatalogConfig( + path=catalog.path, + ), +] + +data_engine = DataEngineConfig( + buffer_deltas=True, +) + +engine_config = BacktestEngineConfig( + strategies=strategies, + logging=logging, + catalogs=catalogs, + data_engine=data_engine, +) + +# BacktestRunConfig + +data = [] +data.append( + BacktestDataConfig( + data_cls=OrderBookDeltas, + catalog_path=catalog.path, + instrument_id=InstrumentId.from_str(f"{future_symbols[0]}.XCME"), + start_time=start_time, + end_time=end_time, + ), +) + +venues = [ + BacktestVenueConfig( + name="XCME", + oms_type="NETTING", + account_type="MARGIN", + base_currency="USD", + starting_balances=["1_000_000 USD"], + ), +] + +configs = [ + BacktestRunConfig( + engine=engine_config, + data=[], # data, + venues=venues, + chunk_size=None, # use None when loading custom data + start=start_time, + end=end_time, + ), +] + +node = BacktestNode(configs=configs) + +# %% +results = node.run() + +# %% + +# %% diff --git a/nautilus_trader/adapters/databento/data.py b/nautilus_trader/adapters/databento/data.py index 729d6e0df170..7dbb1b2c92a6 100644 --- a/nautilus_trader/adapters/databento/data.py +++ b/nautilus_trader/adapters/databento/data.py @@ -43,6 +43,7 @@ from nautilus_trader.data.messages import RequestData from nautilus_trader.data.messages import RequestInstrument from nautilus_trader.data.messages import RequestInstruments +from nautilus_trader.data.messages import RequestOrderBookDeltas from nautilus_trader.data.messages import RequestOrderBookDepth from nautilus_trader.data.messages import RequestQuoteTicks from nautilus_trader.data.messages import RequestTradeTicks @@ -68,12 +69,15 @@ from nautilus_trader.model.data import Bar from nautilus_trader.model.data import DataType from nautilus_trader.model.data import InstrumentStatus +from nautilus_trader.model.data import OrderBookDelta +from nautilus_trader.model.data import OrderBookDeltas from nautilus_trader.model.data import OrderBookDepth10 from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick from nautilus_trader.model.data import capsule_to_data from nautilus_trader.model.enums import BarAggregation from 
nautilus_trader.model.enums import BookType +from nautilus_trader.model.enums import RecordFlag from nautilus_trader.model.enums import bar_aggregation_to_str from nautilus_trader.model.identifiers import ClientId from nautilus_trader.model.identifiers import InstrumentId @@ -784,6 +788,8 @@ async def _request(self, request: RequestData) -> None: await self._request_statistics(request.data_type, request.id) elif request.data_type.type == OrderBookDepth10: await self._request_order_book_depth(request) + elif request.data_type.type == OrderBookDeltas: + await self._request_order_book_deltas(request) else: raise NotImplementedError( f"Cannot request {request.data_type.type} (not implemented)", @@ -1148,6 +1154,65 @@ async def _request_order_book_depth(self, request: RequestOrderBookDepth) -> Non params=request.params, ) + async def _request_order_book_deltas(self, request: RequestOrderBookDeltas) -> None: + dataset: Dataset = self._loader.get_dataset_for_venue(request.instrument_id.venue) + start, end = await self._resolve_time_range_for_request(dataset, request.start, request.end) + + if request.limit > 0: + self._log.warning( + f"Databento does not support `limit` parameter for order book deltas, " + f"ignoring limit={request.limit}", + ) + + self._log.info( + f"Requesting {request.instrument_id} order book deltas data: start={start}, end={end}", + LogColor.BLUE, + ) + + # Request MBO data directly from the historical API + pyo3_deltas = await self._http_client.get_range_order_book_deltas( + dataset=dataset, + instrument_ids=[instrument_id_to_pyo3(request.instrument_id)], + start=start.value, + end=end.value, + ) + deltas_list = OrderBookDelta.from_pyo3_list(pyo3_deltas) + + # Group deltas into OrderBookDeltas objects by sequence and F_LAST flag + deltas: list[OrderBookDeltas] = [] + current_group: list[OrderBookDelta] = [] + + for delta in deltas_list: + current_group.append(delta) + + # Check if this is the last delta in an event (F_LAST flag) + if delta.flags & RecordFlag.F_LAST: + deltas.append( + OrderBookDeltas( + instrument_id=request.instrument_id, + deltas=current_group.copy(), + ), + ) + current_group.clear() + + # Handle any remaining deltas without F_LAST flag + if current_group: + deltas.append( + OrderBookDeltas( + instrument_id=request.instrument_id, + deltas=current_group, + ), + ) + + self._handle_order_book_deltas( + instrument_id=request.instrument_id, + deltas=deltas, + correlation_id=request.id, + start=request.start, + end=request.end, + params=request.params, + ) + def _handle_msg_pyo3( self, record: object, diff --git a/nautilus_trader/backtest/data_client.pyx b/nautilus_trader/backtest/data_client.pyx index 13ec5fcdd193..0f76a2648b03 100644 --- a/nautilus_trader/backtest/data_client.pyx +++ b/nautilus_trader/backtest/data_client.pyx @@ -238,7 +238,7 @@ cdef class BacktestMarketDataClient(MarketDataClient): return self._add_subscription_order_book_deltas(command.instrument_id) - # Do nothing else for backtest + self._msgbus.send(endpoint="BacktestEngine.execute", msg=command) cpdef void subscribe_order_book_depth(self, SubscribeOrderBook command): Condition.not_none(command.instrument_id, "instrument_id") diff --git a/nautilus_trader/backtest/engine.pyx b/nautilus_trader/backtest/engine.pyx index 167f4f8b3a0f..49841b3b98d8 100644 --- a/nautilus_trader/backtest/engine.pyx +++ b/nautilus_trader/backtest/engine.pyx @@ -916,8 +916,8 @@ cdef class BacktestEngine: self._log.info(f"Added {data_name} stream generator") cpdef void _handle_data_command(self, DataCommand 
command): - if not(command.data_type.type in [Bar, QuoteTick, TradeTick, OrderBookDepth10] - or type(command) in [SubscribeData, UnsubscribeData, SubscribeInstruments, UnsubscribeInstruments]): + if not(command.data_type.type in [Bar, QuoteTick, TradeTick, OrderBookDelta, OrderBookDeltas, OrderBookDepth10] + or isinstance(command, (SubscribeData, UnsubscribeData, SubscribeInstruments, UnsubscribeInstruments))): return if isinstance(command, SubscribeData): diff --git a/nautilus_trader/backtest/node.py b/nautilus_trader/backtest/node.py index 12e513cdc3b6..68be5a167831 100644 --- a/nautilus_trader/backtest/node.py +++ b/nautilus_trader/backtest/node.py @@ -330,6 +330,7 @@ def download_data( "request_quote_ticks", "request_trade_ticks", "request_order_book_depth", + "request_order_book_deltas", ] if request_function not in compatible_request_functions: diff --git a/nautilus_trader/common/actor.pxd b/nautilus_trader/common/actor.pxd index 345192b06ed4..aa34fc1f4c7c 100644 --- a/nautilus_trader/common/actor.pxd +++ b/nautilus_trader/common/actor.pxd @@ -29,6 +29,7 @@ from nautilus_trader.core.uuid cimport UUID4 from nautilus_trader.data.messages cimport DataCommand from nautilus_trader.data.messages cimport DataResponse from nautilus_trader.data.messages cimport RequestData +from nautilus_trader.data.messages cimport RequestOrderBookDeltas from nautilus_trader.indicators.base cimport Indicator from nautilus_trader.model.book cimport OrderBook from nautilus_trader.model.data cimport Bar @@ -40,6 +41,7 @@ from nautilus_trader.model.data cimport IndexPriceUpdate from nautilus_trader.model.data cimport InstrumentClose from nautilus_trader.model.data cimport InstrumentStatus from nautilus_trader.model.data cimport MarkPriceUpdate +from nautilus_trader.model.data cimport OrderBookDeltas from nautilus_trader.model.data cimport OrderBookDepth10 from nautilus_trader.model.data cimport QuoteTick from nautilus_trader.model.data cimport TradeTick @@ -162,6 +164,7 @@ cdef class Actor(Component): ClientId client_id=*, bint managed=*, bint pyo3_conversion=*, + int interval_ms=*, dict[str, object] params=*, ) cpdef void subscribe_order_book_depth( @@ -173,6 +176,7 @@ cdef class Actor(Component): bint managed=*, bint pyo3_conversion=*, bint update_catalog=*, + int interval_ms=*, dict[str, object] params=*, ) cpdef void subscribe_order_book_at_interval( @@ -264,6 +268,19 @@ cdef class Actor(Component): UUID4 request_id=*, dict[str, object] params=*, ) + cpdef UUID4 request_order_book_deltas( + self, + InstrumentId instrument_id, + datetime start, + datetime end=*, + int limit=*, + ClientId client_id=*, + callback=*, + bint update_catalog=*, + bint join_request=*, + UUID4 request_id=*, + dict[str, object] params=*, + ) cpdef UUID4 request_order_book_depth( self, InstrumentId instrument_id, @@ -352,7 +369,8 @@ cdef class Actor(Component): cpdef void handle_instrument(self, Instrument instrument) cpdef void handle_order_book(self, OrderBook order_book) - cpdef void handle_order_book_deltas(self, deltas) + cpdef void handle_order_book_deltas(self, deltas, bint historical=*) + cpdef void handle_historical_order_book_deltas(self, OrderBookDeltas deltas) cpdef void handle_historical_order_book_depth(self, OrderBookDepth10 depth) cpdef void handle_order_book_depth(self, OrderBookDepth10 depth, bint historical=*) cpdef void handle_historical_quote_tick(self, QuoteTick tick) @@ -378,6 +396,7 @@ cdef class Actor(Component): cpdef void _handle_instruments_response(self, DataResponse response) cpdef void 
_handle_quote_ticks_response(self, DataResponse response) cpdef void _handle_trade_ticks_response(self, DataResponse response) + cpdef void _handle_order_book_deltas_response(self, DataResponse response) cpdef void _handle_order_book_depth_response(self, DataResponse response) cpdef void _handle_order_book_snapshot_response(self, DataResponse response) cpdef void _handle_bars_response(self, DataResponse response) diff --git a/nautilus_trader/common/actor.pyx b/nautilus_trader/common/actor.pyx index c1b7a89ad175..b0f16dee858e 100644 --- a/nautilus_trader/common/actor.pyx +++ b/nautilus_trader/common/actor.pyx @@ -64,6 +64,7 @@ from nautilus_trader.data.messages cimport RequestData from nautilus_trader.data.messages cimport RequestInstrument from nautilus_trader.data.messages cimport RequestInstruments from nautilus_trader.data.messages cimport RequestJoin +from nautilus_trader.data.messages cimport RequestOrderBookDeltas from nautilus_trader.data.messages cimport RequestOrderBookDepth from nautilus_trader.data.messages cimport RequestOrderBookSnapshot from nautilus_trader.data.messages cimport RequestQuoteTicks @@ -1409,6 +1410,7 @@ cdef class Actor(Component): ClientId client_id = None, bint managed = True, bint pyo3_conversion = False, + int interval_ms = 0, dict[str, object] params = None, ): """ @@ -1434,11 +1436,19 @@ cdef class Actor(Component): pyo3_conversion : bool, default False If received deltas should be converted to `nautilus_pyo3.OrderBookDeltas` prior to being passed to the `on_order_book_deltas` handler. + interval_ms : int, default 0 + The order book snapshot interval (milliseconds). A value of 0 means no interval snapshots. params : dict[str, Any], optional Additional parameters potentially used by a specific client. + Raises + ------ + ValueError + If `interval_ms` is negative (< 0). + """ Condition.not_none(instrument_id, "instrument_id") + Condition.not_negative(interval_ms, "interval_ms") Condition.is_true(self.trader_id is not None, "The actor has not been registered") if pyo3_conversion: @@ -1459,7 +1469,7 @@ cdef class Actor(Component): book_type=book_type, depth=depth, managed=managed, - interval_ms=0, + interval_ms=interval_ms, client_id=client_id, venue=instrument_id.venue, command_id=UUID4(), @@ -1477,6 +1487,7 @@ cdef class Actor(Component): bint managed = True, bint pyo3_conversion = False, bint update_catalog = False, + int interval_ms = 0, dict[str, object] params = None, ): """ @@ -1502,11 +1513,19 @@ cdef class Actor(Component): update_catalog : bool, default False Whether to update a catalog with the received data. Only useful when downloading data during a backtest. + interval_ms : int, default 0 + The order book snapshot interval (milliseconds). A value of 0 means no interval snapshots. params : dict[str, Any], optional Additional parameters potentially used by a specific client. + Raises + ------ + ValueError + If `interval_ms` is negative (< 0). 
+ """ Condition.not_none(instrument_id, "instrument_id") + Condition.not_negative(interval_ms, "interval_ms") Condition.is_true(self.trader_id is not None, "The actor has not been registered") if pyo3_conversion: @@ -1528,7 +1547,7 @@ cdef class Actor(Component): book_type=book_type, depth=depth, managed=managed, - interval_ms=0, + interval_ms=interval_ms, client_id=client_id, venue=instrument_id.venue, command_id=UUID4(), @@ -3276,6 +3295,102 @@ cdef class Actor(Component): return used_request_id + cpdef UUID4 request_order_book_deltas( + self, + InstrumentId instrument_id, + datetime start, + datetime end = None, + int limit = 0, + ClientId client_id = None, + callback: Callable[[UUID4], None] | None = None, + bint update_catalog = False, + bint join_request = False, + UUID4 request_id = None, + dict[str, object] params = None, + ): + """ + Request historical `OrderBookDeltas` data. + + Once the response is received, the order book deltas data is forwarded from the message bus + to the `on_historical_data` handler. + + If the request fails, then an error is logged. + + Parameters + ---------- + instrument_id : InstrumentId + The instrument ID for the order book deltas request. + start : datetime + The start datetime (UTC) of request time range (inclusive). + end : datetime, optional + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. + limit : int, optional + The limit on the amount of deltas received. + client_id : ClientId, optional + The specific client ID for the command. + If None, it will be inferred from the venue in the instrument ID. + callback : Callable[[UUID4], None], optional + The registered callback, to be called with the request ID when the response has completed processing. + update_catalog : bool, default False + If the data catalog should be updated with the received data. + join_request: bool, optional, default to False + If a request should be joined and sorted with another one by using request_join. + request_id : UUID4, optional + The UUID to use for the request ID. If `None`, a new UUID will be generated. + params : dict[str, Any], optional + Additional parameters potentially used by a specific client. + + Returns + ------- + UUID4 + The `request_id` for the request. + + Raises + ------ + ValueError + If the instrument_id is None. + TypeError + If callback is not None and not of type Callable. 
+ + """ + Condition.is_true(self.trader_id is not None, "The actor has not been registered") + Condition.not_none(instrument_id, "instrument_id") + Condition.callable_or_none(callback, "callback") + + start, end = self._validate_datetime_range(start, end) + + used_params = {} + used_params["update_catalog"] = update_catalog + used_params["join_request"] = join_request + if params: + used_params.update(params) + + cdef UUID4 used_request_id = request_id if request_id else UUID4() + cdef RequestOrderBookDeltas request = RequestOrderBookDeltas( + instrument_id=instrument_id, + start=start, + end=end, + limit=limit, + client_id=client_id, + venue=instrument_id.venue, + callback=self._handle_order_book_deltas_response, + request_id=used_request_id, + ts_init=self._clock.timestamp_ns(), + params=used_params, + ) + self._requests[used_request_id] = request + self._pending_requests[used_request_id] = callback + + self._msgbus.subscribe( + topic=self._topic_cache.get_deltas_topic(instrument_id, historical=True), + handler=self.handle_historical_order_book_deltas, + ) + + self._send_data_req(request) + + return used_request_id + cpdef UUID4 request_quote_ticks( self, InstrumentId instrument_id, @@ -3979,7 +4094,10 @@ cdef class Actor(Component): except Exception as e: self._log.exception(f"Error on handling {repr(instrument)}", e) - cpdef void handle_order_book_deltas(self, deltas): + cpdef void handle_historical_order_book_deltas(self, OrderBookDeltas deltas): + self.handle_order_book_deltas(deltas, True) + + cpdef void handle_order_book_deltas(self, deltas, bint historical=False): """ Handle the given order book deltas. @@ -3991,6 +4109,8 @@ cdef class Actor(Component): ---------- deltas : OrderBookDeltas or nautilus_pyo3.OrderBookDeltas The order book deltas received. + historical : bool, default False + If True, treats the data as historical. 
Warnings -------- System method (not intended to be called by user code). """ Condition.not_none(deltas, "deltas") if OrderBookDeltas in self._pyo3_conversion_types: deltas = deltas.to_pyo3() - if self._fsm.state == ComponentState.RUNNING: + if historical: + self.handle_historical_data(deltas) + elif self._fsm.state == ComponentState.RUNNING: try: self.on_order_book_deltas(deltas) except Exception as e: @@ -4516,6 +4638,30 @@ cdef class Actor(Component): self._finish_response(response.correlation_id) + cpdef void _handle_order_book_deltas_response(self, DataResponse response): + cdef RequestOrderBookDeltas request = self._requests.pop(response.correlation_id, None) + if request is not None: + self._msgbus.unsubscribe( + topic=self._topic_cache.get_deltas_topic(request.instrument_id, historical=True), + handler=self.handle_historical_order_book_deltas, + ) + + cdef int length = response.params.get("data_count", 0) if response.params else 0 + cdef InstrumentId instrument_id = request.instrument_id if request is not None else None + + if length > 0: + if instrument_id is not None: + self._log.info(f"Received data for {instrument_id}") + else: + self._log.info(f"Received data") + else: + if instrument_id is not None: + self._log.warning(f"Received data with no deltas for {instrument_id}") + else: + self._log.warning(f"Received data with no deltas") + + self._finish_response(response.correlation_id) + cpdef void _handle_order_book_snapshot_response(self, DataResponse response): cdef RequestOrderBookSnapshot request = self._requests.pop(response.correlation_id, None) if request is not None: diff --git a/nautilus_trader/core/includes/model.h b/nautilus_trader/core/includes/model.h index 231430b9dd0e..a1a861ea6d83 100644 --- a/nautilus_trader/core/includes/model.h +++ b/nautilus_trader/core/includes/model.h @@ -3065,6 +3065,26 @@ void orderbook_apply_delta(struct OrderBook_API *book, const struct OrderBookDel void orderbook_apply_deltas(struct OrderBook_API *book, const struct OrderBookDeltas_API *deltas); +/** + * Creates an `OrderBookDeltas` snapshot from the current order book state. + * + * This is the reverse operation of `orderbook_apply_deltas`: it converts the current book state + * back into a snapshot format with a `Clear` delta followed by `Add` deltas for all orders. + * + * # Parameters + * + * * `book` - The order book to convert. + * * `ts_event` - UNIX timestamp (nanoseconds) when the book event occurred. + * * `ts_init` - UNIX timestamp (nanoseconds) when the instance was created. + * + * # Returns + * + * An `OrderBookDeltas_API` containing a snapshot of the current order book state. + */ +struct OrderBookDeltas_API orderbook_to_snapshot_deltas(const struct OrderBook_API *book, + uint64_t ts_event, + uint64_t ts_init); + void orderbook_apply_depth(struct OrderBook_API *book, const struct OrderBookDepth10_t *depth); CVec orderbook_bids(struct OrderBook_API *book); diff --git a/nautilus_trader/core/nautilus_pyo3.pyi b/nautilus_trader/core/nautilus_pyo3.pyi index 2bb82901bda6..3be39b942ef9 100644 --- a/nautilus_trader/core/nautilus_pyo3.pyi +++ b/nautilus_trader/core/nautilus_pyo3.pyi @@ -6786,6 +6786,15 @@ class DatabentoHistoricalClient: end: int | None = None, depth: int | None = None, ) -> list[OrderBookDepth10]: ... + async def get_range_order_book_deltas( + self, + dataset: str, + instrument_ids: list[InstrumentId], + start: int, + end: int | None = None, + limit: int | None = None, + price_precision: int | None = None, + ) -> list[OrderBookDelta]: ...
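For orientation, here is a minimal sketch of calling the historical endpoint stubbed above directly through the pyo3 client. It assumes an already-initialized nautilus_pyo3.DatabentoHistoricalClient; the "GLBX.MDP3" dataset name is an assumption (the CME Globex dataset implied by the ESM4.XCME instrument used in the example notebook in this diff), and the time range simply mirrors that notebook.

import pandas as pd

from nautilus_trader.core import nautilus_pyo3


async def fetch_mbo_deltas(client):
    # `client` is assumed to be a configured nautilus_pyo3.DatabentoHistoricalClient
    instrument_id = nautilus_pyo3.InstrumentId.from_str("ESM4.XCME")
    start_ns = pd.Timestamp("2024-05-08T10:00:00", tz="UTC").value  # UNIX nanoseconds, per `start: int`
    end_ns = pd.Timestamp("2024-05-08T10:00:01", tz="UTC").value

    # Returns a flat list of OrderBookDelta; grouping into OrderBookDeltas events
    # on the F_LAST flag is handled by the adapter's data client elsewhere in this diff
    return await client.get_range_order_book_deltas(
        dataset="GLBX.MDP3",
        instrument_ids=[instrument_id],
        start=start_ns,
        end=end_ns,
    )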
async def get_range_imbalance( self, dataset: str, diff --git a/nautilus_trader/core/rust/model.pxd b/nautilus_trader/core/rust/model.pxd index 3e45a51aaecb..58b52969a257 100644 --- a/nautilus_trader/core/rust/model.pxd +++ b/nautilus_trader/core/rust/model.pxd @@ -2058,6 +2058,24 @@ cdef extern from "../includes/model.h": void orderbook_apply_deltas(OrderBook_API *book, const OrderBookDeltas_API *deltas); + # Creates an `OrderBookDeltas` snapshot from the current order book state. + # + # This is the reverse operation of `orderbook_apply_deltas`: it converts the current book state + # back into a snapshot format with a `Clear` delta followed by `Add` deltas for all orders. + # + # # Parameters + # + # * `book` - The order book to convert. + # * `ts_event` - UNIX timestamp (nanoseconds) when the book event occurred. + # * `ts_init` - UNIX timestamp (nanoseconds) when the instance was created. + # + # # Returns + # + # An `OrderBookDeltas_API` containing a snapshot of the current order book state. + OrderBookDeltas_API orderbook_to_snapshot_deltas(const OrderBook_API *book, + uint64_t ts_event, + uint64_t ts_init); + void orderbook_apply_depth(OrderBook_API *book, const OrderBookDepth10_t *depth); CVec orderbook_bids(OrderBook_API *book); diff --git a/nautilus_trader/data/client.pxd b/nautilus_trader/data/client.pxd index 1ba0e48d1569..219384884d6f 100644 --- a/nautilus_trader/data/client.pxd +++ b/nautilus_trader/data/client.pxd @@ -23,6 +23,7 @@ from nautilus_trader.data.messages cimport RequestBars from nautilus_trader.data.messages cimport RequestData from nautilus_trader.data.messages cimport RequestInstrument from nautilus_trader.data.messages cimport RequestInstruments +from nautilus_trader.data.messages cimport RequestOrderBookDeltas from nautilus_trader.data.messages cimport RequestOrderBookSnapshot from nautilus_trader.data.messages cimport RequestQuoteTicks from nautilus_trader.data.messages cimport RequestTradeTicks @@ -171,6 +172,7 @@ cdef class MarketDataClient(DataClient): cpdef void request_instrument(self, RequestInstrument request) cpdef void request_instruments(self, RequestInstruments request) cpdef void request_order_book_snapshot(self, RequestOrderBookSnapshot request) + cpdef void request_order_book_deltas(self, RequestOrderBookDeltas request) cpdef void request_quote_ticks(self, RequestQuoteTicks request) cpdef void request_trade_ticks(self, RequestTradeTicks request) cpdef void request_bars(self, RequestBars request) @@ -183,3 +185,4 @@ cdef class MarketDataClient(DataClient): cpdef void _handle_trade_ticks(self, InstrumentId instrument_id, list ticks, UUID4 correlation_id, datetime start, datetime end, dict[str, object] params) cpdef void _handle_bars(self, BarType bar_type, list bars, UUID4 correlation_id, datetime start, datetime end, dict[str, object] params) cpdef void _handle_order_book_depths(self, InstrumentId instrument_id, list depths, UUID4 correlation_id, datetime start, datetime end, dict[str, object] params) + cpdef void _handle_order_book_deltas(self, InstrumentId instrument_id, list deltas, UUID4 correlation_id, datetime start, datetime end, dict[str, object] params) diff --git a/nautilus_trader/data/client.pyx b/nautilus_trader/data/client.pyx index 47d24eafe5de..c4bebbac6cfb 100644 --- a/nautilus_trader/data/client.pyx +++ b/nautilus_trader/data/client.pyx @@ -27,6 +27,7 @@ from nautilus_trader.data.messages cimport RequestBars from nautilus_trader.data.messages cimport RequestData
from nautilus_trader.data.messages cimport RequestInstrument from nautilus_trader.data.messages cimport RequestInstruments +from nautilus_trader.data.messages cimport RequestOrderBookDeltas from nautilus_trader.data.messages cimport RequestOrderBookSnapshot from nautilus_trader.data.messages cimport RequestQuoteTicks from nautilus_trader.data.messages cimport RequestTradeTicks @@ -55,6 +56,7 @@ from nautilus_trader.data.messages cimport UnsubscribeOrderBook from nautilus_trader.data.messages cimport UnsubscribeQuoteTicks from nautilus_trader.data.messages cimport UnsubscribeTradeTicks from nautilus_trader.model.data cimport BarType +from nautilus_trader.model.data cimport OrderBookDeltas from nautilus_trader.model.data cimport OrderBookDepth10 from nautilus_trader.model.data cimport QuoteTick from nautilus_trader.model.data cimport TradeTick @@ -1052,6 +1054,21 @@ cdef class MarketDataClient(DataClient): "You can implement by overriding the `request_order_book_snapshot` method for this client." ) + cpdef void request_order_book_deltas(self, RequestOrderBookDeltas request): + """ + Request historical `OrderBookDeltas` data. + + Parameters + ---------- + request : RequestOrderBookDeltas + The message for the data request. + + """ + self._log.error( # pragma: no cover + f"Cannot request `OrderBookDeltas` data for {request.instrument_id}: not implemented. " # pragma: no cover + f"You can implement by overriding the `request_order_book_deltas` method for this client", # pragma: no cover # noqa + ) + cpdef void request_quote_ticks(self, RequestQuoteTicks request): """ Request historical `QuoteTick` data. @@ -1124,6 +1141,9 @@ cdef class MarketDataClient(DataClient): def _handle_order_book_depths_py(self, InstrumentId instrument_id, list depths, UUID4 correlation_id, datetime start, datetime end, dict[str, object] params = None): self._handle_order_book_depths(instrument_id, depths, correlation_id, start, end, params) + def _handle_order_book_deltas_py(self, InstrumentId instrument_id, list deltas, UUID4 correlation_id, datetime start, datetime end, dict[str, object] params = None): + self._handle_order_book_deltas(instrument_id, deltas, correlation_id, start, end, params) + def _handle_data_response_py(self, DataType data_type, data, UUID4 correlation_id, datetime start, datetime end, dict[str, object] params = None): self._handle_data_response(data_type, data, correlation_id, start, end, params) @@ -1227,3 +1247,19 @@ cdef class MarketDataClient(DataClient): ) self._msgbus.send(endpoint="DataEngine.response", msg=response) + + cpdef void _handle_order_book_deltas(self, InstrumentId instrument_id, list deltas, UUID4 correlation_id, datetime start, datetime end, dict[str, object] params): + cdef DataResponse response = DataResponse( + client_id=self.id, + venue=instrument_id.venue, + data_type=DataType(OrderBookDeltas, metadata=({"instrument_id": instrument_id})), + data=deltas, + correlation_id=correlation_id, + response_id=UUID4(), + start=start, + end=end, + ts_init=self._clock.timestamp_ns(), + params=params, + ) + + self._msgbus.send(endpoint="DataEngine.response", msg=response) diff --git a/nautilus_trader/data/engine.pxd b/nautilus_trader/data/engine.pxd index c2f1ebdf65c4..67bd3280d859 100644 --- a/nautilus_trader/data/engine.pxd +++ b/nautilus_trader/data/engine.pxd @@ -36,6 +36,7 @@ from nautilus_trader.data.messages cimport RequestData from nautilus_trader.data.messages cimport RequestInstrument from nautilus_trader.data.messages cimport RequestInstruments from 
nautilus_trader.data.messages cimport RequestJoin +from nautilus_trader.data.messages cimport RequestOrderBookDeltas from nautilus_trader.data.messages cimport RequestOrderBookDepth from nautilus_trader.data.messages cimport RequestOrderBookSnapshot from nautilus_trader.data.messages cimport RequestQuoteTicks @@ -226,6 +227,8 @@ cdef class DataEngine(Component): cpdef void _handle_request_instrument(self, DataClient client, RequestInstrument request) cpdef void _handle_request_order_book_snapshot(self, DataClient client, RequestOrderBookSnapshot request) cpdef void _handle_request_order_book_depth(self, DataClient client, RequestOrderBookDepth request) + cpdef void _handle_order_book_deltas_request(self, DataClient client, RequestOrderBookDeltas request) + cpdef void _handle_order_book_deltas_snapshot_replay(self, DataResponse response) cpdef tuple _bound_dates(self, RequestData request) cpdef void _date_range_client_request(self, DataClient client, RequestData request) cpdef void _handle_date_range_request(self, DataClient client, RequestData request) diff --git a/nautilus_trader/data/engine.pyx b/nautilus_trader/data/engine.pyx index 76ef26d6a60a..d107a1678d5c 100644 --- a/nautilus_trader/data/engine.pyx +++ b/nautilus_trader/data/engine.pyx @@ -61,8 +61,11 @@ from nautilus_trader.core.datetime cimport unix_nanos_to_dt from nautilus_trader.core.rust.core cimport NANOSECONDS_IN_MILLISECOND from nautilus_trader.core.rust.core cimport NANOSECONDS_IN_SECOND from nautilus_trader.core.rust.core cimport millis_to_nanos +from nautilus_trader.core.rust.model cimport BookAction from nautilus_trader.core.rust.model cimport BookType +from nautilus_trader.core.rust.model cimport OrderBookDeltas_API from nautilus_trader.core.rust.model cimport PriceType +from nautilus_trader.core.rust.model cimport orderbook_to_snapshot_deltas from nautilus_trader.core.uuid cimport UUID4 from nautilus_trader.data.aggregation cimport BarAggregator from nautilus_trader.data.aggregation cimport RenkoBarAggregator @@ -87,6 +90,7 @@ from nautilus_trader.data.messages cimport RequestData from nautilus_trader.data.messages cimport RequestInstrument from nautilus_trader.data.messages cimport RequestInstruments from nautilus_trader.data.messages cimport RequestJoin +from nautilus_trader.data.messages cimport RequestOrderBookDeltas from nautilus_trader.data.messages cimport RequestOrderBookDepth from nautilus_trader.data.messages cimport RequestOrderBookSnapshot from nautilus_trader.data.messages cimport RequestQuoteTicks @@ -137,8 +141,6 @@ from nautilus_trader.model.identifiers cimport ClientId from nautilus_trader.model.identifiers cimport ComponentId from nautilus_trader.model.identifiers cimport InstrumentId from nautilus_trader.model.identifiers cimport Venue -from nautilus_trader.model.identifiers cimport generic_spread_id_to_list -from nautilus_trader.model.identifiers cimport is_generic_spread_id from nautilus_trader.model.instruments.base cimport Instrument from nautilus_trader.model.instruments.synthetic cimport SyntheticInstrument from nautilus_trader.model.objects cimport Price @@ -985,7 +987,6 @@ cdef class DataEngine(Component): if command.interval_ms > 0: key = (command.instrument_id, command.interval_ms) - if key not in self._order_book_intervals: self._order_book_intervals[key] = [] @@ -1483,6 +1484,8 @@ cdef class DataEngine(Component): self._handle_request_order_book_snapshot(client, request) elif isinstance(request, RequestOrderBookDepth): self._handle_request_order_book_depth(client, request) + elif 
isinstance(request, RequestOrderBookDeltas): + self._handle_order_book_deltas_request(client, request) elif isinstance(request, RequestQuoteTicks): self._handle_request_quote_ticks(client, request) elif isinstance(request, RequestTradeTicks): @@ -1532,6 +1535,17 @@ cdef class DataEngine(Component): cpdef void _handle_request_order_book_depth(self, DataClient client, RequestOrderBookDepth request): self._handle_date_range_request(client, request) + cpdef void _handle_order_book_deltas_request(self, DataClient client, RequestOrderBookDeltas request): + # Store original start_date only if not already present (for long requests) + if request.start is not None: + request.params["original_start_date"] = request.start + + # Floor to start of UTC day (optional, default True) + if request.params.get("from_day_start", True): + request.start = request.start.floor(freq="d") + + self._handle_date_range_request(client, request) + cpdef void _handle_request_quote_ticks(self, DataClient client, RequestQuoteTicks request): self._handle_date_range_request(client, request) @@ -1624,6 +1638,8 @@ cdef class DataEngine(Component): client.request_trade_ticks(request) elif isinstance(request, RequestOrderBookDepth): client.request_order_book_depth(request) + elif isinstance(request, RequestOrderBookDeltas): + client.request_order_book_deltas(request) else: try: client.request(request) @@ -1704,6 +1720,14 @@ cdef class DataEngine(Component): start=ts_start, end=ts_end, ) + elif isinstance(request, RequestOrderBookDeltas): + batched = request.params.get("batched", True) + data = catalog.order_book_deltas( + instrument_ids=[str(request.instrument_id)], + start=ts_start, + end=ts_end, + batched=batched, + ) elif type(request) is RequestData: filter_expr = request.params.get("filter_expr") if filter_expr: @@ -1735,11 +1759,9 @@ cdef class DataEngine(Component): if isinstance(request, RequestInstruments) or isinstance(request, RequestInstrument): only_last = request.params.get("only_last", True) - if only_last: # Retains only the latest instrument record per instrument_id, based on the most recent ts_init last_instrument = {} - for instrument in data: if instrument.id not in last_instrument: last_instrument[instrument.id] = instrument @@ -2036,6 +2058,7 @@ cdef class DataEngine(Component): list[OrderBookDelta] buffer_deltas = None bint is_last_delta = False InstrumentId instrument_id = delta.instrument_id + OrderBookDelta last_delta = None if self._buffer_deltas: buffer_deltas = self._buffered_deltas_map.get(instrument_id) if buffer_deltas is None: @@ -2058,7 +2081,7 @@ cdef class DataEngine(Component): else: deltas = OrderBookDeltas( instrument_id=instrument_id, - deltas=[delta] + deltas=[delta], ) self._msgbus.publish_c( topic=self._topic_cache.get_deltas_topic(instrument_id, historical), @@ -2067,33 +2090,55 @@ cdef class DataEngine(Component): cpdef void _handle_order_book_deltas(self, OrderBookDeltas deltas, bint historical = False): cdef: - OrderBookDeltas deltas_to_publish = None - list[OrderBookDelta] buffer_deltas = None - bint is_last_delta = False InstrumentId instrument_id = deltas.instrument_id + list[OrderBookDelta] buffer_deltas = None + OrderBookDeltas all_deltas = None + OrderBookDelta delta + bint has_last_flag = False + uint64_t i + uint64_t f_last_index = 0 + uint64_t deltas_len = len(deltas.deltas) + if self._buffer_deltas: buffer_deltas = self._buffered_deltas_map.get(instrument_id) if buffer_deltas is None: buffer_deltas = [] self._buffered_deltas_map[instrument_id] = buffer_deltas - for delta in 
deltas.deltas: - buffer_deltas.append(delta) + # Find the index of the first delta with F_LAST flag + for i in range(deltas_len): + delta = deltas.deltas[i] + if delta.flags & RecordFlag.F_LAST: + has_last_flag = True + f_last_index = i + break - is_last_delta = delta.flags & RecordFlag.F_LAST - if is_last_delta: - deltas_to_publish = OrderBookDeltas( - instrument_id=instrument_id, - deltas=buffer_deltas, - ) - self._msgbus.publish_c( - topic=self._topic_cache.get_deltas_topic(instrument_id, historical), - msg=deltas_to_publish, - ) - buffer_deltas.clear() + if has_last_flag: + # Add deltas up to and including the one with F_LAST to buffer + for i in range(f_last_index + 1): + buffer_deltas.append(deltas.deltas[i]) + + # Publish all buffered deltas and clear buffer + all_deltas = OrderBookDeltas( + instrument_id=instrument_id, + deltas=buffer_deltas, + ) + self._msgbus.publish_c( + topic=self._topic_cache.get_deltas_topic(instrument_id, historical), + msg=all_deltas, + ) + buffer_deltas.clear() + + # Add any remaining deltas (after F_LAST) to buffer for next batch + for i in range(f_last_index + 1, deltas_len): + buffer_deltas.append(deltas.deltas[i]) + else: + # No F_LAST flag, add all deltas to buffer + for i in range(deltas_len): + buffer_deltas.append(deltas.deltas[i]) else: self._msgbus.publish_c( - topic=self._topic_cache.get_deltas_topic(instrument_id, historical), + topic=self._topic_cache.get_deltas_topic(deltas.instrument_id, historical), msg=deltas, ) @@ -2259,6 +2304,10 @@ cdef class DataEngine(Component): if grouped_response.params.get("disable_historical_cache", False): self._disable_historical_cache = True + # Handle snapshot forward replay for order book deltas + if grouped_response.data_type.type == OrderBookDeltas: + self._handle_order_book_deltas_snapshot_replay(grouped_response) + cdef: bint query_past_data = response.params.get("subscription_name") is None Data data @@ -2352,7 +2401,7 @@ cdef class DataEngine(Component): cdef: uint64_t start = response.start.value if response.start is not None else 0 - cdef int first_index = 0 + int first_index = 0 if start: for i in range(data_len): if response.data[i].ts_init >= start: @@ -2425,6 +2474,110 @@ cdef class DataEngine(Component): for instrument in instruments: self._handle_instrument(instrument) + cpdef void _handle_order_book_deltas_snapshot_replay(self, DataResponse response): + """ + Handle snapshot forward replay for order book deltas. + + If the data at the start of a UTC day is a snapshot, move the snapshot forward + by playing order book deltas until the first delta with ts_init >= "original_start_date". 
+ """ + cdef: + OrderBookDelta delta = None + OrderBookDelta last_applied = None + OrderBookDeltas deltas_obj + OrderBookDeltas snapshot_deltas = None + OrderBookDeltas_API snapshot_deltas_api + OrderBook order_book + Instrument instrument + InstrumentId instrument_id + uint64_t original_start_ns + uint64_t snapshot_ts + bint stop = False + list[OrderBookDeltas] filtered_data = [] + list[OrderBookDelta] before_deltas + list[OrderBookDelta] after_deltas + + original_start_date = response.params.get("original_start_date") + if original_start_date is None or not response.data: + return + + # Check if first deltas at start of UTC day is a snapshot + deltas_obj = response.data[0] + if not deltas_obj.deltas: + return + + delta = deltas_obj.deltas[0] + if not (delta.flags & RecordFlag.F_SNAPSHOT): + return + + # Check if first delta is at start of UTC day + first_delta_dt = unix_nanos_to_dt(delta.ts_init) + start_of_utc_day = first_delta_dt.replace(hour=0, minute=0, second=0, microsecond=0) + if first_delta_dt != start_of_utc_day: + return + + # Apply the initial snapshot and deltas up to original_start_date, similar to _update_order_book + instrument_id = delta.instrument_id + instrument = self._cache.instrument(instrument_id) + if instrument is None: + self._log.warning(f"Instrument {instrument_id} not found in cache, skipping snapshot replay") + return + + book_type = response.params.get("book_type", BookType.L2_MBP) + order_book = OrderBook(instrument_id, book_type) + original_start_ns = dt_to_unix_nanos(original_start_date) + + if original_start_ns <= delta.ts_init: + return + + # Apply snapshot and deltas until first delta >= original_start_ns + for deltas_obj in response.data: + if stop: + filtered_data.append(deltas_obj) + continue + + before_deltas = [] + after_deltas = [] + for delta in deltas_obj.deltas: + if not stop: + before_deltas.append(delta) + if delta.ts_init >= original_start_ns: + stop = True + last_applied = delta + else: + after_deltas.append(delta) + + if before_deltas: + order_book.apply(OrderBookDeltas( + instrument_id=instrument_id, + deltas=before_deltas, + )) + if last_applied is None: + last_applied = before_deltas[-1] + + if stop: + # Create the evolved snapshot + snapshot_ts = last_applied.ts_init + if snapshot_ts < original_start_ns: + snapshot_ts = original_start_ns + + snapshot_deltas = order_book.to_deltas_c(snapshot_ts, snapshot_ts) + filtered_data.append(snapshot_deltas) + + if after_deltas: + filtered_data.append(OrderBookDeltas( + instrument_id=instrument_id, + deltas=after_deltas, + )) + + # If we exhausted all data without reaching original_start_ns, create snapshot from end state + if not stop and last_applied is not None: + snapshot_ts = max(last_applied.ts_init, original_start_ns) + snapshot_deltas = order_book.to_deltas_c(snapshot_ts, snapshot_ts) + filtered_data.append(snapshot_deltas) + + response.data = filtered_data + cpdef void _update_order_book(self, Data data): cdef OrderBook order_book = self._cache.order_book(data.instrument_id) if order_book is None: diff --git a/nautilus_trader/data/messages.pxd b/nautilus_trader/data/messages.pxd index e9393d260ed0..355f9a6356dd 100644 --- a/nautilus_trader/data/messages.pxd +++ b/nautilus_trader/data/messages.pxd @@ -180,6 +180,10 @@ cdef class RequestOrderBookDepth(RequestData): """The maximum depth for the order book depths.\n\n:returns: `int`""" +cdef class RequestOrderBookDeltas(RequestData): + pass + + cdef class RequestQuoteTicks(RequestData): pass diff --git a/nautilus_trader/data/messages.pyx 
b/nautilus_trader/data/messages.pyx index f7f90c3df4ac..c5801559e10b 100644 --- a/nautilus_trader/data/messages.pyx +++ b/nautilus_trader/data/messages.pyx @@ -476,7 +476,7 @@ cdef class SubscribeOrderBook(SubscribeData): datetime start: datetime | None, datetime end: datetime | None, callback: Callable[[Any], None] | None, - ) -> RequestOrderBookDepth: + ) -> RequestOrderBookDepth | RequestOrderBookDeltas: """ Convert this subscribe message to a request message. @@ -492,32 +492,52 @@ cdef class SubscribeOrderBook(SubscribeData): Returns ------- - RequestOrderBookDepth + RequestOrderBookDepth | RequestOrderBookDeltas The converted request message. - """ - if self.data_type.type != OrderBookDepth10: - raise ValueError( - f"Cannot convert SubscribeOrderBook with data_type {self.data_type.type} to RequestOrderBookDepth. " - f"Only OrderBookDepth10 subscriptions can be converted to historical requests." - ) + Raises + ------ + ValueError + If the data type is neither OrderBookDepth10 nor OrderBookDelta. + """ params = self.params.copy() if self.params else {} params["subscription_name"] = f"{self.data_type.type.__name__}.{self.instrument_id}" - - return RequestOrderBookDepth( - instrument_id=self.instrument_id, - start=start, - end=end, - limit=0, - depth=self.depth if self.depth > 0 else 10, - client_id=self.client_id, - venue=self.venue, - callback=callback, - request_id=UUID4(), - ts_init=self.ts_init, - params=params, - correlation_id=self.id, - ) + params["book_type"] = self.book_type + + if self.data_type.type == OrderBookDepth10: + return RequestOrderBookDepth( + instrument_id=self.instrument_id, + start=start, + end=end, + limit=0, + depth=self.depth if self.depth > 0 else 10, + client_id=self.client_id, + venue=self.venue, + callback=callback, + request_id=UUID4(), + ts_init=self.ts_init, + params=params, + correlation_id=self.id, + ) + elif self.data_type.type == OrderBookDelta: + return RequestOrderBookDeltas( + instrument_id=self.instrument_id, + start=start, + end=end, + limit=0, + client_id=self.client_id, + venue=self.venue, + callback=callback, + request_id=UUID4(), + ts_init=self.ts_init, + params=params, + correlation_id=self.id, + ) + else: + raise ValueError( + f"Cannot convert SubscribeOrderBook with data_type {self.data_type.type} to a request. " + f"Only OrderBookDelta and OrderBookDepth10 subscriptions can be converted to historical requests." + ) cdef class SubscribeQuoteTicks(SubscribeData): @@ -2445,6 +2465,112 @@ cdef class RequestOrderBookDepth(RequestData): ) +cdef class RequestOrderBookDeltas(RequestData): + """ + Represents a request for historical `OrderBookDeltas` data. + + Parameters + ---------- + instrument_id : InstrumentId + The instrument ID for the request. + start : datetime + The start datetime (UTC) of request time range (inclusive). + end : datetime + The end datetime (UTC) of request time range. + The inclusiveness depends on individual data client implementation. + limit : int + The limit on the amount of deltas received. + client_id : ClientId or ``None`` + The data client ID for the request. + venue : Venue or ``None`` + The venue for the request. + callback : Callable[[Any], None] + The delegate to call with the data. + request_id : UUID4 + The request ID. + ts_init : uint64_t + UNIX timestamp (nanoseconds) when the object was initialized. + params : dict[str, object] + Additional parameters for the request. + + Raises + ------ + ValueError + If both `client_id` and `venue` are both ``None`` (not enough routing info). 
+ + """ + + def __init__( + self, + InstrumentId instrument_id not None, + datetime start : datetime | None, + datetime end : datetime | None, + int limit, + ClientId client_id: ClientId | None, + Venue venue: Venue | None, + callback: Callable[[Any], None] | None, + UUID4 request_id not None, + uint64_t ts_init, + dict[str, object] params: dict | None, + UUID4 correlation_id = None, + ) -> None: + super().__init__( + DataType(OrderBookDeltas), + instrument_id, + start, + end, + limit, + client_id, + venue, + callback, + request_id, + ts_init, + params, + correlation_id, + ) + + def with_dates(self, datetime start, datetime end, uint64_t ts_init, callback: Callable[[Any], None] | None = None): + return RequestOrderBookDeltas( + instrument_id=self.instrument_id, + start=start, + end=end, + limit=self.limit, + client_id=self.client_id, + venue=self.venue, + callback=callback, + request_id=UUID4(), + ts_init=ts_init, + params=self.params.copy(), + correlation_id=self.id, + ) + + def __str__(self) -> str: + return ( + f"{type(self).__name__}(" + f"instrument_id={self.instrument_id}, " + f"start={self.start}, " + f"end={self.end}, " + f"limit={self.limit}, " + f"client_id={self.client_id}, " + f"venue={self.venue}, " + f"data_type={self.data_type}{form_params_str(self.params)})" + ) + + def __repr__(self) -> str: + return ( + f"{type(self).__name__}(" + f"instrument_id={self.instrument_id}, " + f"start={self.start}, " + f"end={self.end}, " + f"limit={self.limit}, " + f"client_id={self.client_id}, " + f"venue={self.venue}, " + f"callback={self.callback}, " + f"id={self.id}, " + f"correlation_id={self.correlation_id}{form_params_str(self.params)})" + ) + + cdef class RequestQuoteTicks(RequestData): """ Represents a request for quote ticks. diff --git a/nautilus_trader/execution/engine.pyx b/nautilus_trader/execution/engine.pyx index 01c9344a8c77..5f3bcf956a79 100644 --- a/nautilus_trader/execution/engine.pyx +++ b/nautilus_trader/execution/engine.pyx @@ -1222,6 +1222,7 @@ cdef class ExecutionEngine(Component): f"Order with {event.client_order_id!r} " f"not found in the cache to apply {event}" ) + if event.venue_order_id is None: self._log.error( f"Cannot apply event to any order: " diff --git a/nautilus_trader/live/data_client.py b/nautilus_trader/live/data_client.py index 72707302e683..abf312042b5a 100644 --- a/nautilus_trader/live/data_client.py +++ b/nautilus_trader/live/data_client.py @@ -41,6 +41,7 @@ from nautilus_trader.data.messages import RequestData from nautilus_trader.data.messages import RequestInstrument from nautilus_trader.data.messages import RequestInstruments +from nautilus_trader.data.messages import RequestOrderBookDeltas from nautilus_trader.data.messages import RequestOrderBookDepth from nautilus_trader.data.messages import RequestOrderBookSnapshot from nautilus_trader.data.messages import RequestQuoteTicks @@ -880,6 +881,18 @@ def request_order_book_depth(self, request: RequestOrderBookDepth) -> None: log_msg=f"request: order_book_depth {request.instrument_id}", ) + def request_order_book_deltas(self, request: RequestOrderBookDeltas) -> None: + time_range_str = format_utc_timerange(request.start, request.end) + limit_str = f" limit={request.limit}" if request.limit != 0 else "" + self._log.info( + f"Request {request.instrument_id} order_book_deltas{time_range_str}{limit_str}", + LogColor.BLUE, + ) + self.create_task( + self._request_order_book_deltas(request), + log_msg=f"request: order_book_deltas {request.instrument_id}", + ) + 
############################################################################ # Coroutines to implement ############################################################################ @@ -1063,6 +1076,11 @@ async def _request_order_book_depth(self, request: RequestOrderBookDepth) -> Non "implement the `_request_order_book_depth` coroutine", # pragma: no cover ) + async def _request_order_book_deltas(self, request: RequestOrderBookDeltas) -> None: + raise NotImplementedError( # pragma: no cover + "implement the `_request_order_book_deltas` coroutine", # pragma: no cover + ) + async def cancel_pending_tasks(self, timeout_secs: float = 5.0) -> None: """ Cancel all pending tasks and await their cancellation. diff --git a/nautilus_trader/model/book.pxd b/nautilus_trader/model/book.pxd index 4d932a02591b..ba3b4b0cca39 100644 --- a/nautilus_trader/model/book.pxd +++ b/nautilus_trader/model/book.pxd @@ -68,6 +68,7 @@ cdef class OrderBook(Data): cpdef void update_quote_tick(self, QuoteTick tick) cpdef void update_trade_tick(self, TradeTick tick) cpdef QuoteTick to_quote_tick(self) + cpdef OrderBookDeltas to_deltas_c(self, uint64_t ts_event, uint64_t ts_init) cpdef str pprint(self, int num_levels=*) diff --git a/nautilus_trader/model/book.pyx b/nautilus_trader/model/book.pyx index 5a36e43a873d..794c9be2780b 100644 --- a/nautilus_trader/model/book.pyx +++ b/nautilus_trader/model/book.pyx @@ -32,6 +32,7 @@ from nautilus_trader.core.rust.model cimport BookLevel_API from nautilus_trader.core.rust.model cimport BookOrder_t from nautilus_trader.core.rust.model cimport BookType from nautilus_trader.core.rust.model cimport OrderBook_API +from nautilus_trader.core.rust.model cimport OrderBookDeltas_API from nautilus_trader.core.rust.model cimport OrderSide from nautilus_trader.core.rust.model cimport OrderType from nautilus_trader.core.rust.model cimport Price_t @@ -75,6 +76,7 @@ from nautilus_trader.core.rust.model cimport orderbook_reset from nautilus_trader.core.rust.model cimport orderbook_sequence from nautilus_trader.core.rust.model cimport orderbook_simulate_fills from nautilus_trader.core.rust.model cimport orderbook_spread +from nautilus_trader.core.rust.model cimport orderbook_to_snapshot_deltas from nautilus_trader.core.rust.model cimport orderbook_ts_last from nautilus_trader.core.rust.model cimport orderbook_update from nautilus_trader.core.rust.model cimport orderbook_update_count @@ -840,6 +842,11 @@ cdef class OrderBook(Data): ts_init=self.ts_last, ) + cpdef OrderBookDeltas to_deltas_c(self, uint64_t ts_event, uint64_t ts_init): + cdef OrderBookDeltas obj = OrderBookDeltas.__new__(OrderBookDeltas) + obj._mem = orderbook_to_snapshot_deltas(&self._mem, ts_event, ts_init) + return obj + cpdef str pprint(self, int num_levels=3): """ Return a string representation of the order book in a human-readable table format. 
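Note (not part of the patch): the sketch below round-trips book state through the new to_deltas_c added above, illustrating the evolved-snapshot form (a Clear delta followed by Add deltas) that the data engine's snapshot replay emits. It assumes the TestDataStubs and TestInstrumentProvider fixtures used by the engine tests later in this patch.

    # Sketch only: exercises OrderBook.to_deltas_c under assumed test-kit fixtures.
    from nautilus_trader.model.book import OrderBook
    from nautilus_trader.model.data import OrderBookDeltas
    from nautilus_trader.model.enums import BookAction, BookType
    from nautilus_trader.test_kit.providers import TestInstrumentProvider
    from nautilus_trader.test_kit.stubs.data import TestDataStubs

    instrument = TestInstrumentProvider.ethusdt_binance()
    book = OrderBook(instrument.id, BookType.L2_MBP)

    # Evolve the book with a delta, as the data engine does while replaying
    # from the UTC day-start snapshot up to original_start_ns.
    delta = TestDataStubs.order_book_delta(instrument_id=instrument.id)
    book.apply(OrderBookDeltas(instrument_id=instrument.id, deltas=[delta]))

    # Convert the current state back into snapshot form: a Clear delta followed
    # by Add deltas, which replaces the raw day-start snapshot in the response.
    snapshot = book.to_deltas_c(book.ts_last, book.ts_last)
    assert snapshot.deltas[0].action == BookAction.CLEAR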
diff --git a/nautilus_trader/persistence/catalog/parquet.py b/nautilus_trader/persistence/catalog/parquet.py index 2c3710155399..25f48ca5cad1 100644 --- a/nautilus_trader/persistence/catalog/parquet.py +++ b/nautilus_trader/persistence/catalog/parquet.py @@ -1631,7 +1631,6 @@ def _query_rust( # Gather data data = [] - for chunk in result: data.extend(capsule_to_list(chunk)) diff --git a/nautilus_trader/test_kit/mocks/data.py b/nautilus_trader/test_kit/mocks/data.py index 563e392d4718..75010689038a 100644 --- a/nautilus_trader/test_kit/mocks/data.py +++ b/nautilus_trader/test_kit/mocks/data.py @@ -24,6 +24,7 @@ from nautilus_trader.data.messages import RequestBars from nautilus_trader.data.messages import RequestInstrument from nautilus_trader.data.messages import RequestInstruments +from nautilus_trader.data.messages import RequestOrderBookDeltas from nautilus_trader.data.messages import RequestOrderBookDepth from nautilus_trader.data.messages import RequestQuoteTicks from nautilus_trader.data.messages import RequestTradeTicks @@ -31,6 +32,7 @@ from nautilus_trader.data.messages import SubscribeQuoteTicks from nautilus_trader.data.messages import SubscribeTradeTicks from nautilus_trader.model.data import Bar +from nautilus_trader.model.data import OrderBookDeltas from nautilus_trader.model.data import OrderBookDepth10 from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick @@ -85,6 +87,7 @@ def __init__( self.trade_ticks: list[TradeTick] = [] self.bars: list[Bar] = [] self.order_book_depths: list[OrderBookDepth10] = [] + self.order_book_deltas: list[OrderBookDeltas] = [] def request_instrument(self, request: RequestInstrument) -> None: self._handle_instrument( @@ -145,6 +148,16 @@ def request_order_book_depth(self, request: RequestOrderBookDepth) -> None: request.params, ) + def request_order_book_deltas(self, request: RequestOrderBookDeltas) -> None: + self._handle_order_book_deltas( + request.instrument_id, + self.order_book_deltas, + request.id, + request.start, + request.end, + request.params, + ) + def subscribe_quote_ticks(self, command: SubscribeQuoteTicks) -> None: """Subscribe to quote ticks - mock implementation that just tracks the subscription.""" self._add_subscription_quote_ticks(command.instrument_id) diff --git a/tests/acceptance_tests/test_backtest.py b/tests/acceptance_tests/test_backtest.py index d18ee64a96db..0f0edd628ac0 100644 --- a/tests/acceptance_tests/test_backtest.py +++ b/tests/acceptance_tests/test_backtest.py @@ -896,7 +896,7 @@ def test_run_market_maker(self): self.engine.run() # Assert - assert self.engine.kernel.msgbus.sent_count == 23_688 + assert self.engine.kernel.msgbus.sent_count == 23_689 assert self.engine.kernel.msgbus.pub_count == 26_806 assert self.engine.iteration == 8_198 account = self.engine.portfolio.account(self.venue) diff --git a/tests/test_data/databento/order_book_deltas_catalog/databento/orderbooks_definition.dbn.zst b/tests/test_data/databento/order_book_deltas_catalog/databento/orderbooks_definition.dbn.zst new file mode 100644 index 000000000000..f506e9856948 Binary files /dev/null and b/tests/test_data/databento/order_book_deltas_catalog/databento/orderbooks_definition.dbn.zst differ diff --git a/tests/test_data/databento/order_book_deltas_catalog/databento/orderbooks_mbo_2024-05-08T00-00-00_2024-05-08T00-00-02.dbn.zst b/tests/test_data/databento/order_book_deltas_catalog/databento/orderbooks_mbo_2024-05-08T00-00-00_2024-05-08T00-00-02.dbn.zst new file mode 100644 index 
000000000000..a1d63382a5e3 Binary files /dev/null and b/tests/test_data/databento/order_book_deltas_catalog/databento/orderbooks_mbo_2024-05-08T00-00-00_2024-05-08T00-00-02.dbn.zst differ diff --git a/tests/unit_tests/data/test_engine.py b/tests/unit_tests/data/test_engine.py index 389d78320318..ebf108d0038f 100644 --- a/tests/unit_tests/data/test_engine.py +++ b/tests/unit_tests/data/test_engine.py @@ -36,6 +36,7 @@ from nautilus_trader.data.messages import RequestData from nautilus_trader.data.messages import RequestInstrument from nautilus_trader.data.messages import RequestInstruments +from nautilus_trader.data.messages import RequestOrderBookDeltas from nautilus_trader.data.messages import RequestOrderBookDepth from nautilus_trader.data.messages import RequestOrderBookSnapshot from nautilus_trader.data.messages import RequestQuoteTicks @@ -3737,6 +3738,376 @@ def test_request_order_book_depth_reaches_client(self): assert len(depths_received) == 1 # Depth should flow through message bus assert depths_received[0] == depth + def test_request_order_book_deltas_reaches_client(self): + # Arrange + self.data_engine.register_client(self.mock_market_data_client) + from nautilus_trader.test_kit.stubs.data import TestDataStubs + + deltas = OrderBookDeltas( + instrument_id=ETHUSDT_BINANCE.id, + deltas=[TestDataStubs.order_book_delta(instrument_id=ETHUSDT_BINANCE.id)], + ) + self.mock_market_data_client.order_book_deltas = [deltas] + + # Subscribe to order book deltas on message bus + deltas_received = [] + topic = f"historical.data.book.deltas.{ETHUSDT_BINANCE.venue}.{ETHUSDT_BINANCE.id.symbol.topic()}" + self.msgbus.subscribe(topic=topic, handler=deltas_received.append) + + handler = [] + request = RequestOrderBookDeltas( + instrument_id=ETHUSDT_BINANCE.id, + start=None, + end=None, + limit=0, + client_id=None, # Will route to the Binance venue + venue=ETHUSDT_BINANCE.venue, + callback=handler.append, + request_id=UUID4(), + ts_init=self.clock.timestamp_ns(), + params={"update_catalog": False}, + ) + + # Act + self.msgbus.request(endpoint="DataEngine.request", request=request) + + # Assert + assert self.data_engine.request_count == 1 + assert len(handler) == 1 + assert handler[0].data == [] # Response data should be empty + assert len(deltas_received) == 1 # Deltas should flow through message bus + assert deltas_received[0] == deltas + + def test_request_order_book_deltas_with_start_date_floors_to_day_start(self): + # Arrange + self.data_engine.register_client(self.mock_market_data_client) + + start_date = pd.Timestamp("2024-01-15T14:30:00", tz="UTC") + handler = [] + request = RequestOrderBookDeltas( + instrument_id=ETHUSDT_BINANCE.id, + start=start_date, + end=None, + limit=0, + client_id=None, + venue=ETHUSDT_BINANCE.venue, + callback=handler.append, + request_id=UUID4(), + ts_init=self.clock.timestamp_ns(), + params={"update_catalog": False}, + ) + + # Act + self.msgbus.request(endpoint="DataEngine.request", request=request) + + # Assert + assert self.data_engine.request_count == 1 + # Verify that original_start_date is stored in params + assert "original_start_date" in request.params + assert request.params["original_start_date"] == start_date + # Verify that start date was floored to start of day + assert request.start == pd.Timestamp("2024-01-15T00:00:00", tz="UTC") + + def test_request_order_book_deltas_with_from_day_start_false_preserves_start_date(self): + # Arrange + self.data_engine.register_client(self.mock_market_data_client) + + start_date = pd.Timestamp("2024-01-15T14:30:00", 
tz="UTC") + handler = [] + request = RequestOrderBookDeltas( + instrument_id=ETHUSDT_BINANCE.id, + start=start_date, + end=None, + limit=0, + client_id=None, + venue=ETHUSDT_BINANCE.venue, + callback=handler.append, + request_id=UUID4(), + ts_init=self.clock.timestamp_ns(), + params={"update_catalog": False, "from_day_start": False}, + ) + + # Act + self.msgbus.request(endpoint="DataEngine.request", request=request) + + # Assert + assert self.data_engine.request_count == 1 + # Verify that original_start_date is stored in params + assert "original_start_date" in request.params + assert request.params["original_start_date"] == start_date + # Verify that start date was NOT floored + assert request.start == start_date + + def test_process_order_book_deltas_with_historical_flag(self): + # Arrange + self.data_engine.register_client(self.binance_client) + self.binance_client.start() + + self.data_engine.process(ETHUSDT_BINANCE) + + historical_handler = [] + live_handler = [] + + self.msgbus.subscribe( + topic="historical.data.book.deltas.BINANCE.ETHUSDT", + handler=historical_handler.append, + ) + self.msgbus.subscribe( + topic="data.book.deltas.BINANCE.ETHUSDT", + handler=live_handler.append, + ) + + subscribe = SubscribeOrderBook( + book_data_type=OrderBookDelta, + client_id=None, + venue=BINANCE, + instrument_id=ETHUSDT_BINANCE.id, + book_type=BookType.L2_MBP, + depth=10, + managed=True, + command_id=UUID4(), + ts_init=self.clock.timestamp_ns(), + ) + + self.data_engine.execute(subscribe) + + deltas = TestDataStubs.order_book_deltas(ETHUSDT_BINANCE.id) + + # Act - Process as historical data via process_historical + # Historical data flows through process_historical, which calls _handle_data with historical=True + self.data_engine.process_historical(deltas) + + # Assert + # Historical data should be published to historical topic + assert len(historical_handler) == 1 + assert historical_handler[0].instrument_id == ETHUSDT_BINANCE.id + assert isinstance(historical_handler[0], OrderBookDeltas) + # Live handler should not receive historical data + assert len(live_handler) == 0 + + def test_process_order_book_deltas_with_f_last_flag_in_middle_of_batch(self): + # Arrange + self.data_engine.register_client(self.binance_client) + self.binance_client.start() + + self.data_engine.process(ETHUSDT_BINANCE) + + handler = [] + self.msgbus.subscribe( + topic="data.book.deltas.BINANCE.ETHUSDT", + handler=handler.append, + ) + + subscribe = SubscribeOrderBook( + book_data_type=OrderBookDelta, + client_id=ClientId(BINANCE.value), + venue=BINANCE, + instrument_id=ETHUSDT_BINANCE.id, + book_type=BookType.L3_MBO, + depth=5, + managed=True, + command_id=UUID4(), + ts_init=self.clock.timestamp_ns(), + ) + + self.data_engine.execute(subscribe) + + # Create deltas with F_LAST flag in the middle + delta1 = TestDataStubs.order_book_delta( + instrument_id=ETHUSDT_BINANCE.id, + flags=0, + ts_init=1, + ) + delta2 = TestDataStubs.order_book_delta( + instrument_id=ETHUSDT_BINANCE.id, + flags=RecordFlag.F_LAST, + ts_init=2, + ) + delta3 = TestDataStubs.order_book_delta( + instrument_id=ETHUSDT_BINANCE.id, + flags=0, + ts_init=3, + ) + + deltas_with_f_last = OrderBookDeltas( + instrument_id=ETHUSDT_BINANCE.id, + deltas=[delta1, delta2, delta3], + ) + + # Act + self.data_engine.process(deltas_with_f_last) + + # Assert + # When buffering is disabled, all deltas are published immediately + # When buffering is enabled, only deltas up to and including F_LAST are published + # Since default config has buffer_deltas=False, all deltas 
should be published + assert len(handler) == 1 + assert handler[0].instrument_id == ETHUSDT_BINANCE.id + assert isinstance(handler[0], OrderBookDeltas) + # All deltas should be published when buffering is disabled + assert len(handler[0].deltas) == 3 + assert handler[0].deltas[0] == delta1 + assert handler[0].deltas[1] == delta2 + assert handler[0].deltas[2] == delta3 + + def test_handle_order_book_deltas_snapshot_replay_with_snapshot_at_day_start(self): + # Arrange + self.data_engine.register_client(self.binance_client) + self.binance_client.start() + + self.data_engine.process(ETHUSDT_BINANCE) + + # Create a snapshot delta at start of UTC day + day_start = pd.Timestamp("2024-01-15T00:00:00", tz="UTC") + day_start_ns = day_start.value + + snapshot_delta = TestDataStubs.order_book_delta( + instrument_id=ETHUSDT_BINANCE.id, + flags=RecordFlag.F_SNAPSHOT, + ts_init=int(day_start_ns), + ) + + # Create deltas after the original start date + original_start = pd.Timestamp("2024-01-15T10:00:00", tz="UTC") + original_start_ns = original_start.value + + delta1 = TestDataStubs.order_book_delta( + instrument_id=ETHUSDT_BINANCE.id, + flags=0, + ts_init=int(day_start_ns + 1_000_000_000), # 1 second after day start + ) + delta2 = TestDataStubs.order_book_delta( + instrument_id=ETHUSDT_BINANCE.id, + flags=0, + ts_init=int(original_start_ns + 1_000_000_000), # After original start + ) + + deltas_obj1 = OrderBookDeltas( + instrument_id=ETHUSDT_BINANCE.id, + deltas=[snapshot_delta, delta1], + ) + + deltas_obj2 = OrderBookDeltas( + instrument_id=ETHUSDT_BINANCE.id, + deltas=[delta2], + ) + + response = DataResponse( + client_id=None, + venue=BINANCE, + data_type=DataType(OrderBookDeltas), + data=[deltas_obj1, deltas_obj2], + correlation_id=UUID4(), + response_id=UUID4(), + start=day_start, + end=pd.Timestamp("2024-01-15T23:59:59", tz="UTC"), + ts_init=self.clock.timestamp_ns(), + params={ + "original_start_date": original_start, + "book_type": BookType.L2_MBP, + }, + ) + + # Act + self.data_engine._handle_order_book_deltas_snapshot_replay(response) + + # Assert + # Response data should be filtered and contain evolved snapshot + assert len(response.data) > 0 + # First item should be the evolved snapshot (not the original snapshot) + first_item = response.data[0] + assert isinstance(first_item, OrderBookDeltas) + # Should contain a Clear delta followed by Add deltas (snapshot format) + assert len(first_item.deltas) > 0 + # First delta should be a Clear (snapshot format) + assert first_item.deltas[0].action == BookAction.CLEAR + + def test_handle_order_book_deltas_snapshot_replay_without_snapshot_skips_replay(self): + # Arrange + self.data_engine.register_client(self.binance_client) + self.binance_client.start() + + self.data_engine.process(ETHUSDT_BINANCE) + + # Create deltas without snapshot flag + delta1 = TestDataStubs.order_book_delta( + instrument_id=ETHUSDT_BINANCE.id, + flags=0, + ) + + deltas_obj = OrderBookDeltas( + instrument_id=ETHUSDT_BINANCE.id, + deltas=[delta1], + ) + + response = DataResponse( + client_id=None, + venue=BINANCE, + data_type=DataType(OrderBookDeltas), + data=[deltas_obj], + correlation_id=UUID4(), + response_id=UUID4(), + start=pd.Timestamp("2024-01-15T00:00:00", tz="UTC"), + end=pd.Timestamp("2024-01-15T23:59:59", tz="UTC"), + ts_init=self.clock.timestamp_ns(), + params={ + "original_start_date": pd.Timestamp("2024-01-15T10:00:00", tz="UTC"), + "book_type": BookType.L2_MBP, + }, + ) + + original_data = response.data.copy() + + # Act + 
self.data_engine._handle_order_book_deltas_snapshot_replay(response) + + # Assert + # Data should remain unchanged (no snapshot replay) + assert response.data == original_data + + def test_handle_order_book_deltas_snapshot_replay_without_original_start_date_skips_replay( + self, + ): + # Arrange + self.data_engine.register_client(self.binance_client) + self.binance_client.start() + + self.data_engine.process(ETHUSDT_BINANCE) + + snapshot_delta = TestDataStubs.order_book_delta( + instrument_id=ETHUSDT_BINANCE.id, + flags=RecordFlag.F_SNAPSHOT, + ) + + deltas_obj = OrderBookDeltas( + instrument_id=ETHUSDT_BINANCE.id, + deltas=[snapshot_delta], + ) + + response = DataResponse( + client_id=None, + venue=BINANCE, + data_type=DataType(OrderBookDeltas), + data=[deltas_obj], + correlation_id=UUID4(), + response_id=UUID4(), + start=pd.Timestamp("2024-01-15T00:00:00", tz="UTC"), + end=pd.Timestamp("2024-01-15T23:59:59", tz="UTC"), + ts_init=self.clock.timestamp_ns(), + params={ + "book_type": BookType.L2_MBP, + }, + ) + + original_data = response.data.copy() + + # Act + self.data_engine._handle_order_book_deltas_snapshot_replay(response) + + # Assert + # Data should remain unchanged (no original_start_date in params) + assert response.data == original_data + def test_request_aggregated_bars_with_bars(self): # Arrange loader = DatabentoDataLoader() diff --git a/tests/unit_tests/data/test_messages.py b/tests/unit_tests/data/test_messages.py index c7ff44828162..e15865492687 100644 --- a/tests/unit_tests/data/test_messages.py +++ b/tests/unit_tests/data/test_messages.py @@ -26,7 +26,6 @@ from nautilus_trader.data.messages import SubscribeOrderBook from nautilus_trader.data.messages import UnsubscribeData from nautilus_trader.model.data import DataType -from nautilus_trader.model.data import OrderBookDelta from nautilus_trader.model.data import OrderBookDepth10 from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick @@ -333,30 +332,3 @@ def test_subscribe_order_book_to_request_conversion(self): assert request.correlation_id == command_id assert "subscription_name" in request.params assert request.params["subscription_name"] == "OrderBookDepth10.AUD/USD.SIM" - - def test_subscribe_order_book_to_request_conversion_with_invalid_data_type_raises_error(self): - # Arrange - instrument_id = InstrumentId(Symbol("AUD/USD"), Venue("SIM")) - command_id = UUID4() - - subscribe = SubscribeOrderBook( - instrument_id=instrument_id, - book_data_type=OrderBookDelta, # Invalid data type for order book depth conversion - book_type=BookType.L2_MBP, - client_id=ClientId("TEST"), - venue=instrument_id.venue, - command_id=command_id, - ts_init=self.clock.timestamp_ns(), - depth=10, - ) - - callback = [].append - start = pd.Timestamp("2023-01-01", tz="UTC") - end = pd.Timestamp("2023-01-02", tz="UTC") - - # Act & Assert - with pytest.raises(ValueError) as e: - subscribe.to_request(start=start, end=end, callback=callback) - - assert "Cannot convert SubscribeOrderBook with data_type" in str(e.value) - assert "Only OrderBookDepth10 subscriptions can be converted" in str(e.value)
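Note (not part of the patch): with the negative case above removed, a positive counterpart could cover the new conversion path. The sketch below uses the same stubs as the surrounding test module; it would require keeping the OrderBookDelta import removed above and importing RequestOrderBookDeltas, and the assertions follow the to_request behavior and RequestOrderBookDeltas constructor added in messages.pyx.

    def test_subscribe_order_book_deltas_to_request_conversion(self):
        # Sketch: OrderBookDelta subscriptions now convert to RequestOrderBookDeltas
        # instead of raising.
        # Arrange
        instrument_id = InstrumentId(Symbol("AUD/USD"), Venue("SIM"))
        command_id = UUID4()

        subscribe = SubscribeOrderBook(
            instrument_id=instrument_id,
            book_data_type=OrderBookDelta,
            book_type=BookType.L2_MBP,
            client_id=ClientId("TEST"),
            venue=instrument_id.venue,
            command_id=command_id,
            ts_init=self.clock.timestamp_ns(),
            depth=10,
        )

        start = pd.Timestamp("2023-01-01", tz="UTC")
        end = pd.Timestamp("2023-01-02", tz="UTC")

        # Act
        request = subscribe.to_request(start=start, end=end, callback=[].append)

        # Assert
        assert isinstance(request, RequestOrderBookDeltas)
        assert request.instrument_id == instrument_id
        assert request.correlation_id == command_id
        assert request.params["subscription_name"] == "OrderBookDelta.AUD/USD.SIM"
        assert request.params["book_type"] == BookType.L2_MBP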