Skip to content

Commit 6bd7508

Browse files
committed
Refine handling of OrderBookDeltas
1 parent cbd900f commit 6bd7508

File tree

34 files changed

+1817
-90
lines changed

34 files changed

+1817
-90
lines changed

crates/adapters/databento/src/historical.rs

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use databento::{
3131
use indexmap::IndexMap;
3232
use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
3333
use nautilus_model::{
34-
data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick},
34+
data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
3535
enums::BarAggregation,
3636
identifiers::{InstrumentId, Symbol, Venue},
3737
instruments::InstrumentAny,
@@ -41,8 +41,8 @@ use nautilus_model::{
4141
use crate::{
4242
common::get_date_time_range,
4343
decode::{
44-
decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record,
45-
decode_statistics_msg, decode_status_msg,
44+
decode_imbalance_msg, decode_instrument_def_msg, decode_mbo_msg, decode_mbp10_msg,
45+
decode_record, decode_statistics_msg, decode_status_msg,
4646
},
4747
symbology::{
4848
MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
@@ -404,6 +404,78 @@ impl DatabentoHistoricalClient {
404404
Ok(result)
405405
}
406406

407+
/// Fetches order book deltas for the given parameters.
408+
///
409+
/// # Errors
410+
///
411+
/// Returns an error if the API request or data processing fails.
412+
pub async fn get_range_order_book_deltas(
413+
&self,
414+
params: RangeQueryParams,
415+
) -> anyhow::Result<Vec<OrderBookDelta>> {
416+
let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
417+
check_consistent_symbology(&symbols)?;
418+
419+
let first_symbol = params
420+
.symbols
421+
.first()
422+
.ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
423+
let stype_in = infer_symbology_type(first_symbol);
424+
let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
425+
let time_range = get_date_time_range(params.start, end)?;
426+
427+
let range_params = GetRangeParams::builder()
428+
.dataset(params.dataset)
429+
.date_time_range(time_range)
430+
.symbols(symbols)
431+
.stype_in(stype_in)
432+
.schema(dbn::Schema::Mbo)
433+
.limit(params.limit.and_then(NonZeroU64::new))
434+
.build();
435+
436+
let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
437+
438+
let mut client = self.inner.lock().await;
439+
let mut decoder = client
440+
.timeseries()
441+
.get_range(&range_params)
442+
.await
443+
.map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
444+
445+
let metadata = decoder.metadata().clone();
446+
let mut metadata_cache = MetadataCache::new(metadata);
447+
let mut result: Vec<OrderBookDelta> = Vec::new();
448+
449+
let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
450+
let sym_map = self
451+
.symbol_venue_map
452+
.read()
453+
.map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
454+
let instrument_id = decode_nautilus_instrument_id(
455+
&record,
456+
&mut metadata_cache,
457+
&self.publisher_venue_map,
458+
&sym_map,
459+
)?;
460+
461+
if let Some(msg) = record.get::<dbn::MboMsg>() {
462+
let (delta, _trade) =
463+
decode_mbo_msg(msg, instrument_id, price_precision, None, false)?;
464+
if let Some(delta) = delta {
465+
result.push(delta);
466+
}
467+
}
468+
469+
Ok(())
470+
};
471+
472+
while let Ok(Some(msg)) = decoder.decode_record::<dbn::MboMsg>().await {
473+
process_record(dbn::RecordRef::from(msg))?;
474+
}
475+
476+
Ok(result)
477+
}
478+
407479
/// Fetches trade ticks for the given parameters.
408480
///
409481
/// # Errors

crates/adapters/databento/src/python/historical.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,42 @@ impl DatabentoHistoricalClient {
282282
})
283283
}
284284

285+
#[pyo3(name = "get_range_order_book_deltas")]
286+
#[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
287+
#[allow(clippy::too_many_arguments)]
288+
fn py_get_range_order_book_deltas<'py>(
289+
&self,
290+
py: Python<'py>,
291+
dataset: String,
292+
instrument_ids: Vec<InstrumentId>,
293+
start: u64,
294+
end: Option<u64>,
295+
limit: Option<u64>,
296+
price_precision: Option<u8>,
297+
) -> PyResult<Bound<'py, PyAny>> {
298+
let inner = self.inner.clone();
299+
let symbols = inner
300+
.prepare_symbols_from_instrument_ids(&instrument_ids)
301+
.map_err(to_pyvalue_err)?;
302+
303+
let params = RangeQueryParams {
304+
dataset,
305+
symbols,
306+
start: start.into(),
307+
end: end.map(Into::into),
308+
limit,
309+
price_precision,
310+
};
311+
312+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
313+
let deltas = inner
314+
.get_range_order_book_deltas(params)
315+
.await
316+
.map_err(to_pyvalue_err)?;
317+
Python::attach(|py| deltas.into_py_any(py))
318+
})
319+
}
320+
285321
#[pyo3(name = "get_range_imbalance")]
286322
#[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
287323
#[allow(clippy::too_many_arguments)]

crates/model/src/ffi/orderbook/book.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,31 @@ pub extern "C" fn orderbook_apply_deltas(book: &mut OrderBook_API, deltas: &Orde
164164
}
165165
}
166166

167+
/// Creates an `OrderBookDeltas` snapshot from the current order book state.
168+
///
169+
/// This is the reverse operation of `orderbook_apply_deltas`: it converts the current book state
170+
/// back into a snapshot format with a `Clear` delta followed by `Add` deltas for all orders.
171+
///
172+
/// # Parameters
173+
///
174+
/// * `book` - The order book to convert.
175+
/// * `sequence` - The message sequence number for the snapshot.
176+
/// * `ts_event` - UNIX timestamp (nanoseconds) when the book event occurred.
177+
/// * `ts_init` - UNIX timestamp (nanoseconds) when the instance was created.
178+
///
179+
/// # Returns
180+
///
181+
/// An `OrderBookDeltas_API` containing a snapshot of the current order book state.
182+
#[unsafe(no_mangle)]
183+
pub extern "C" fn orderbook_to_snapshot_deltas(
184+
book: &OrderBook_API,
185+
ts_event: u64,
186+
ts_init: u64,
187+
) -> OrderBookDeltas_API {
188+
use nautilus_core::UnixNanos;
189+
OrderBookDeltas_API::new(book.to_deltas(UnixNanos::from(ts_event), UnixNanos::from(ts_init)))
190+
}
191+
167192
#[unsafe(no_mangle)]
168193
pub extern "C" fn orderbook_apply_depth(book: &mut OrderBook_API, depth: &OrderBookDepth10) {
169194
if let Err(e) = book.apply_depth_unchecked(depth) {

crates/model/src/orderbook/book.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use super::{
2828
};
2929
use crate::{
3030
data::{BookOrder, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick},
31-
enums::{BookAction, BookType, OrderSide, OrderSideSpecified, OrderStatus},
31+
enums::{BookAction, BookType, OrderSide, OrderSideSpecified, OrderStatus, RecordFlag},
3232
identifiers::InstrumentId,
3333
orderbook::{
3434
BookIntegrityError, InvalidBookOperation,
@@ -375,6 +375,84 @@ impl OrderBook {
375375
Ok(())
376376
}
377377

378+
/// Creates an `OrderBookDeltas` snapshot from the current order book state.
379+
///
380+
/// This is the reverse operation of `apply_deltas`: it converts the current book state
381+
/// back into a snapshot format with a `Clear` delta followed by `Add` deltas for all orders.
382+
///
383+
/// # Parameters
384+
///
385+
/// * `ts_event` - UNIX timestamp (nanoseconds) when the book event occurred.
386+
/// * `ts_init` - UNIX timestamp (nanoseconds) when the instance was created.
387+
///
388+
/// # Returns
389+
///
390+
/// An `OrderBookDeltas` containing a snapshot of the current order book state.
391+
#[must_use]
392+
pub fn to_deltas(&self, ts_event: UnixNanos, ts_init: UnixNanos) -> OrderBookDeltas {
393+
let mut deltas = Vec::new();
394+
395+
// Add clear delta first
396+
deltas.push(OrderBookDelta::clear(
397+
self.instrument_id,
398+
self.sequence,
399+
ts_event,
400+
ts_init,
401+
));
402+
403+
// Count total orders to determine which one should have F_LAST flag
404+
let total_orders = self.bids(None).map(|level| level.len()).sum::<usize>()
405+
+ self.asks(None).map(|level| level.len()).sum::<usize>();
406+
407+
let mut order_count = 0;
408+
409+
// Add bid orders
410+
for level in self.bids(None) {
411+
for order in level.iter() {
412+
order_count += 1;
413+
let flags = if order_count == total_orders {
414+
RecordFlag::F_SNAPSHOT as u8 | RecordFlag::F_LAST as u8
415+
} else {
416+
RecordFlag::F_SNAPSHOT as u8
417+
};
418+
419+
deltas.push(OrderBookDelta::new(
420+
self.instrument_id,
421+
BookAction::Add,
422+
*order,
423+
flags,
424+
self.sequence,
425+
ts_event,
426+
ts_init,
427+
));
428+
}
429+
}
430+
431+
// Add ask orders
432+
for level in self.asks(None) {
433+
for order in level.iter() {
434+
order_count += 1;
435+
let flags = if order_count == total_orders {
436+
RecordFlag::F_SNAPSHOT as u8 | RecordFlag::F_LAST as u8
437+
} else {
438+
RecordFlag::F_SNAPSHOT as u8
439+
};
440+
441+
deltas.push(OrderBookDelta::new(
442+
self.instrument_id,
443+
BookAction::Add,
444+
*order,
445+
flags,
446+
self.sequence,
447+
ts_event,
448+
ts_init,
449+
));
450+
}
451+
}
452+
453+
OrderBookDeltas::new(self.instrument_id, deltas)
454+
}
455+
378456
/// Replaces current book state with a depth snapshot.
379457
///
380458
/// # Errors

examples/backtest/notebooks/databento_download.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# extension: .py
77
# format_name: percent
88
# format_version: '1.3'
9-
# jupytext_version: 1.17.3
9+
# jupytext_version: 1.18.1
1010
# kernelspec:
1111
# display_name: Python 3 (ipykernel)
1212
# language: python
@@ -118,6 +118,14 @@
118118
end=time_object_to_dt(end_time_1),
119119
)
120120

121+
# %%
122+
node.download_data(
123+
"request_order_book_deltas",
124+
instrument_id=InstrumentId.from_str("ESM4.XCME"),
125+
start=time_object_to_dt("2024-05-08T10:00:00"),
126+
end=time_object_to_dt("2024-05-08T10:00:01"),
127+
)
128+
121129
# %%
122130
# # Clean up
123131
node.dispose()

0 commit comments

Comments
 (0)