@@ -31,7 +31,7 @@ use databento::{
3131use indexmap:: IndexMap ;
3232use nautilus_core:: { UnixNanos , consts:: NAUTILUS_USER_AGENT , time:: AtomicTime } ;
3333use nautilus_model:: {
34- data:: { Bar , Data , InstrumentStatus , OrderBookDepth10 , QuoteTick , TradeTick } ,
34+ data:: { Bar , Data , InstrumentStatus , OrderBookDelta , OrderBookDepth10 , QuoteTick , TradeTick } ,
3535 enums:: BarAggregation ,
3636 identifiers:: { InstrumentId , Symbol , Venue } ,
3737 instruments:: InstrumentAny ,
@@ -41,8 +41,8 @@ use nautilus_model::{
4141use crate :: {
4242 common:: get_date_time_range,
4343 decode:: {
44- decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg , decode_record ,
45- decode_statistics_msg, decode_status_msg,
44+ decode_imbalance_msg, decode_instrument_def_msg, decode_mbo_msg , decode_mbp10_msg ,
45+ decode_record , decode_statistics_msg, decode_status_msg,
4646 } ,
4747 symbology:: {
4848 MetadataCache , check_consistent_symbology, decode_nautilus_instrument_id,
@@ -404,6 +404,78 @@ impl DatabentoHistoricalClient {
404404 Ok ( result)
405405 }
406406
407+ /// Fetches order book deltas for the given parameters.
408+ ///
409+ /// # Errors
410+ ///
411+ /// Returns an error if the API request or data processing fails.
412+ pub async fn get_range_order_book_deltas (
413+ & self ,
414+ params : RangeQueryParams ,
415+ ) -> anyhow:: Result < Vec < OrderBookDelta > > {
416+ let symbols: Vec < & str > = params. symbols . iter ( ) . map ( String :: as_str) . collect ( ) ;
417+ check_consistent_symbology ( & symbols) ?;
418+
419+ let first_symbol = params
420+ . symbols
421+ . first ( )
422+ . ok_or_else ( || anyhow:: anyhow!( "No symbols provided" ) ) ?;
423+ let stype_in = infer_symbology_type ( first_symbol) ;
424+ let end = params. end . unwrap_or_else ( || self . clock . get_time_ns ( ) ) ;
425+ let time_range = get_date_time_range ( params. start , end) ?;
426+
427+ let range_params = GetRangeParams :: builder ( )
428+ . dataset ( params. dataset )
429+ . date_time_range ( time_range)
430+ . symbols ( symbols)
431+ . stype_in ( stype_in)
432+ . schema ( dbn:: Schema :: Mbo )
433+ . limit ( params. limit . and_then ( NonZeroU64 :: new) )
434+ . build ( ) ;
435+
436+ let price_precision = params. price_precision . unwrap_or ( Currency :: USD ( ) . precision ) ;
437+
438+ let mut client = self . inner . lock ( ) . await ;
439+ let mut decoder = client
440+ . timeseries ( )
441+ . get_range ( & range_params)
442+ . await
443+ . map_err ( |e| anyhow:: anyhow!( "Failed to get range: {e}" ) ) ?;
444+
445+ let metadata = decoder. metadata ( ) . clone ( ) ;
446+ let mut metadata_cache = MetadataCache :: new ( metadata) ;
447+ let mut result: Vec < OrderBookDelta > = Vec :: new ( ) ;
448+
449+ let mut process_record = |record : dbn:: RecordRef | -> anyhow:: Result < ( ) > {
450+ let sym_map = self
451+ . symbol_venue_map
452+ . read ( )
453+ . map_err ( |e| anyhow:: anyhow!( "symbol_venue_map lock poisoned: {e}" ) ) ?;
454+ let instrument_id = decode_nautilus_instrument_id (
455+ & record,
456+ & mut metadata_cache,
457+ & self . publisher_venue_map ,
458+ & sym_map,
459+ ) ?;
460+
461+ if let Some ( msg) = record. get :: < dbn:: MboMsg > ( ) {
462+ let ( delta, _trade) =
463+ decode_mbo_msg ( msg, instrument_id, price_precision, None , false ) ?;
464+ if let Some ( delta) = delta {
465+ result. push ( delta) ;
466+ }
467+ }
468+
469+ Ok ( ( ) )
470+ } ;
471+
472+ while let Ok ( Some ( msg) ) = decoder. decode_record :: < dbn:: MboMsg > ( ) . await {
473+ process_record ( dbn:: RecordRef :: from ( msg) ) ?;
474+ }
475+
476+ Ok ( result)
477+ }
478+
407479 /// Fetches trade ticks for the given parameters.
408480 ///
409481 /// # Errors
0 commit comments