Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 75 additions & 3 deletions crates/adapters/databento/src/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use databento::{
use indexmap::IndexMap;
use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
use nautilus_model::{
data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick},
data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
enums::BarAggregation,
identifiers::{InstrumentId, Symbol, Venue},
instruments::InstrumentAny,
Expand All @@ -41,8 +41,8 @@ use nautilus_model::{
use crate::{
common::get_date_time_range,
decode::{
decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record,
decode_statistics_msg, decode_status_msg,
decode_imbalance_msg, decode_instrument_def_msg, decode_mbo_msg, decode_mbp10_msg,
decode_record, decode_statistics_msg, decode_status_msg,
},
symbology::{
MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
Expand Down Expand Up @@ -404,6 +404,78 @@ impl DatabentoHistoricalClient {
Ok(result)
}

/// Fetches order book deltas for the given parameters.
///
/// # Errors
///
/// Returns an error if the API request or data processing fails.
pub async fn get_range_order_book_deltas(
&self,
params: RangeQueryParams,
) -> anyhow::Result<Vec<OrderBookDelta>> {
let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
check_consistent_symbology(&symbols)?;

let first_symbol = params
.symbols
.first()
.ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
let stype_in = infer_symbology_type(first_symbol);
let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
let time_range = get_date_time_range(params.start, end)?;

let range_params = GetRangeParams::builder()
.dataset(params.dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(stype_in)
.schema(dbn::Schema::Mbo)
.limit(params.limit.and_then(NonZeroU64::new))
.build();

let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);

let mut client = self.inner.lock().await;
let mut decoder = client
.timeseries()
.get_range(&range_params)
.await
.map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;

let metadata = decoder.metadata().clone();
let mut metadata_cache = MetadataCache::new(metadata);
let mut result: Vec<OrderBookDelta> = Vec::new();

let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
let sym_map = self
.symbol_venue_map
.read()
.map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
let instrument_id = decode_nautilus_instrument_id(
&record,
&mut metadata_cache,
&self.publisher_venue_map,
&sym_map,
)?;

if let Some(msg) = record.get::<dbn::MboMsg>() {
let (delta, _trade) =
decode_mbo_msg(msg, instrument_id, price_precision, None, false)?;
if let Some(delta) = delta {
result.push(delta);
}
}

Ok(())
};

while let Ok(Some(msg)) = decoder.decode_record::<dbn::MboMsg>().await {
process_record(dbn::RecordRef::from(msg))?;
}

Ok(result)
}

/// Fetches trade ticks for the given parameters.
///
/// # Errors
Expand Down
36 changes: 36 additions & 0 deletions crates/adapters/databento/src/python/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,42 @@ impl DatabentoHistoricalClient {
})
}

#[pyo3(name = "get_range_order_book_deltas")]
#[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
#[allow(clippy::too_many_arguments)]
fn py_get_range_order_book_deltas<'py>(
&self,
py: Python<'py>,
dataset: String,
instrument_ids: Vec<InstrumentId>,
start: u64,
end: Option<u64>,
limit: Option<u64>,
price_precision: Option<u8>,
) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
let symbols = inner
.prepare_symbols_from_instrument_ids(&instrument_ids)
.map_err(to_pyvalue_err)?;

let params = RangeQueryParams {
dataset,
symbols,
start: start.into(),
end: end.map(Into::into),
limit,
price_precision,
};

pyo3_async_runtimes::tokio::future_into_py(py, async move {
let deltas = inner
.get_range_order_book_deltas(params)
.await
.map_err(to_pyvalue_err)?;
Python::attach(|py| deltas.into_py_any(py))
})
}

#[pyo3(name = "get_range_imbalance")]
#[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
#[allow(clippy::too_many_arguments)]
Expand Down
25 changes: 25 additions & 0 deletions crates/model/src/ffi/orderbook/book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,31 @@ pub extern "C" fn orderbook_apply_deltas(book: &mut OrderBook_API, deltas: &Orde
}
}

/// Creates an `OrderBookDeltas` snapshot from the current order book state.
///
/// This is the reverse operation of `orderbook_apply_deltas`: it converts the current book state
/// back into a snapshot format with a `Clear` delta followed by `Add` deltas for all orders.
///
/// # Parameters
///
/// * `book` - The order book to convert.
/// * `sequence` - The message sequence number for the snapshot.
/// * `ts_event` - UNIX timestamp (nanoseconds) when the book event occurred.
/// * `ts_init` - UNIX timestamp (nanoseconds) when the instance was created.
///
/// # Returns
///
/// An `OrderBookDeltas_API` containing a snapshot of the current order book state.
#[unsafe(no_mangle)]
pub extern "C" fn orderbook_to_snapshot_deltas(
book: &OrderBook_API,
ts_event: u64,
ts_init: u64,
) -> OrderBookDeltas_API {
use nautilus_core::UnixNanos;
OrderBookDeltas_API::new(book.to_deltas(UnixNanos::from(ts_event), UnixNanos::from(ts_init)))
}

#[unsafe(no_mangle)]
pub extern "C" fn orderbook_apply_depth(book: &mut OrderBook_API, depth: &OrderBookDepth10) {
if let Err(e) = book.apply_depth_unchecked(depth) {
Expand Down
80 changes: 79 additions & 1 deletion crates/model/src/orderbook/book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::{
};
use crate::{
data::{BookOrder, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick},
enums::{BookAction, BookType, OrderSide, OrderSideSpecified, OrderStatus},
enums::{BookAction, BookType, OrderSide, OrderSideSpecified, OrderStatus, RecordFlag},
identifiers::InstrumentId,
orderbook::{
BookIntegrityError, InvalidBookOperation,
Expand Down Expand Up @@ -375,6 +375,84 @@ impl OrderBook {
Ok(())
}

/// Creates an `OrderBookDeltas` snapshot from the current order book state.
///
/// This is the reverse operation of `apply_deltas`: it converts the current book state
/// back into a snapshot format with a `Clear` delta followed by `Add` deltas for all orders.
///
/// # Parameters
///
/// * `ts_event` - UNIX timestamp (nanoseconds) when the book event occurred.
/// * `ts_init` - UNIX timestamp (nanoseconds) when the instance was created.
///
/// # Returns
///
/// An `OrderBookDeltas` containing a snapshot of the current order book state.
#[must_use]
pub fn to_deltas(&self, ts_event: UnixNanos, ts_init: UnixNanos) -> OrderBookDeltas {
let mut deltas = Vec::new();

// Add clear delta first
deltas.push(OrderBookDelta::clear(
self.instrument_id,
self.sequence,
ts_event,
ts_init,
));

// Count total orders to determine which one should have F_LAST flag
let total_orders = self.bids(None).map(|level| level.len()).sum::<usize>()
+ self.asks(None).map(|level| level.len()).sum::<usize>();

let mut order_count = 0;

// Add bid orders
for level in self.bids(None) {
for order in level.iter() {
order_count += 1;
let flags = if order_count == total_orders {
RecordFlag::F_SNAPSHOT as u8 | RecordFlag::F_LAST as u8
} else {
RecordFlag::F_SNAPSHOT as u8
};

deltas.push(OrderBookDelta::new(
self.instrument_id,
BookAction::Add,
*order,
flags,
self.sequence,
ts_event,
ts_init,
));
}
}

// Add ask orders
for level in self.asks(None) {
for order in level.iter() {
order_count += 1;
let flags = if order_count == total_orders {
RecordFlag::F_SNAPSHOT as u8 | RecordFlag::F_LAST as u8
} else {
RecordFlag::F_SNAPSHOT as u8
};

deltas.push(OrderBookDelta::new(
self.instrument_id,
BookAction::Add,
*order,
flags,
self.sequence,
ts_event,
ts_init,
));
}
}

OrderBookDeltas::new(self.instrument_id, deltas)
}

/// Replaces current book state with a depth snapshot.
///
/// # Errors
Expand Down
10 changes: 9 additions & 1 deletion examples/backtest/notebooks/databento_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.17.3
# jupytext_version: 1.18.1
# kernelspec:
# display_name: Python 3 (ipykernel)
# language: python
Expand Down Expand Up @@ -118,6 +118,14 @@
end=time_object_to_dt(end_time_1),
)

# %%
node.download_data(
"request_order_book_deltas",
instrument_id=InstrumentId.from_str("ESM4.XCME"),
start=time_object_to_dt("2024-05-08T10:00:00"),
end=time_object_to_dt("2024-05-08T10:00:01"),
)

# %%
# # Clean up
node.dispose()
Loading
Loading