Skip to content

Commit 89b7618

Browse files
committed
Add DataEngine continuous futures requests in Rust
- Add continuous futures parsing, state, and segment dispatch - Aggregate adjusted bars across child bar, trade, and quote responses - Wire request-scoped bar chains and synthetic target instruments - Cover parsing, adjustment math, error cleanup, and request bounds - Document Rust request-path behavior and current scope
1 parent a87d1a1 commit 89b7618

6 files changed

Lines changed: 2493 additions & 67 deletions

File tree

crates/data/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ crate-type = ["rlib"]
2222

2323
[features]
2424
default = []
25-
streaming = ["dep:serde_json", "nautilus-persistence"]
25+
streaming = ["nautilus-persistence"]
2626
ffi = [
2727
"nautilus-common/ffi",
2828
"nautilus-core/ffi",
@@ -65,18 +65,17 @@ indexmap = { workspace = true }
6565
log = { workspace = true }
6666
rust_decimal = { workspace = true }
6767
serde = { workspace = true }
68+
serde_json = { workspace = true }
6869
ustr = { workspace = true }
6970

7071
alloy-primitives = { workspace = true, optional = true }
7172
pyo3 = { workspace = true, optional = true }
7273
pyo3-stub-gen = { workspace = true, optional = true }
73-
serde_json = { workspace = true, optional = true }
7474

7575
[dev-dependencies]
7676
criterion = { workspace = true }
7777
proptest = { workspace = true }
7878
rstest = { workspace = true }
79-
serde_json = { workspace = true }
8079
tokio = { workspace = true, features = ["test-util", "macros", "rt"] }
8180

8281
# Run with `cargo bench -p nautilus-data --bench engine`

crates/data/src/aggregation.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ pub trait BarAggregator: Any + Debug {
110110
/// Sets the weak reference to the aggregator wrapper (for historical mode).
111111
/// Default implementation does nothing, `TimeBarAggregator` overrides.
112112
fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
113+
/// Configures the continuous-future price adjustment for the underlying builder.
114+
fn set_adjustment(&mut self, _adjustment: Decimal, _mode: ContinuousFutureAdjustmentType) {}
113115
}
114116

115117
impl dyn BarAggregator {
@@ -427,6 +429,10 @@ impl BarAggregatorCore {
427429
let bar = self.builder.build(ts_event, ts_init);
428430
(self.handler)(bar);
429431
}
432+
433+
fn set_adjustment(&mut self, adjustment: Decimal, mode: ContinuousFutureAdjustmentType) {
434+
self.builder.set_adjustment(adjustment, mode);
435+
}
430436
}
431437

432438
macro_rules! impl_set_historical_handler {
@@ -437,6 +443,14 @@ macro_rules! impl_set_historical_handler {
437443
};
438444
}
439445

446+
macro_rules! impl_set_adjustment {
447+
() => {
448+
fn set_adjustment(&mut self, adjustment: Decimal, mode: ContinuousFutureAdjustmentType) {
449+
self.core.set_adjustment(adjustment, mode);
450+
}
451+
};
452+
}
453+
440454
/// Provides a means of building tick bars aggregated from quote and trades.
441455
///
442456
/// When received tick count reaches the step threshold of the bar
@@ -485,6 +499,7 @@ impl BarAggregator for TickBarAggregator {
485499
}
486500

487501
impl_set_historical_handler!();
502+
impl_set_adjustment!();
488503

489504
/// Apply the given update to the aggregator.
490505
fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
@@ -557,6 +572,7 @@ impl BarAggregator for TickImbalanceBarAggregator {
557572
}
558573

559574
impl_set_historical_handler!();
575+
impl_set_adjustment!();
560576

561577
/// Apply the given update to the aggregator.
562578
///
@@ -644,6 +660,7 @@ impl BarAggregator for TickRunsBarAggregator {
644660
}
645661

646662
impl_set_historical_handler!();
663+
impl_set_adjustment!();
647664

648665
/// Apply the given update to the aggregator.
649666
///
@@ -738,6 +755,7 @@ impl BarAggregator for VolumeBarAggregator {
738755
}
739756

740757
impl_set_historical_handler!();
758+
impl_set_adjustment!();
741759

742760
/// Apply the given update to the aggregator.
743761
fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
@@ -852,6 +870,7 @@ impl BarAggregator for VolumeImbalanceBarAggregator {
852870
}
853871

854872
impl_set_historical_handler!();
873+
impl_set_adjustment!();
855874

856875
/// Apply the given update to the aggregator.
857876
///
@@ -957,6 +976,7 @@ impl BarAggregator for VolumeRunsBarAggregator {
957976
}
958977

959978
impl_set_historical_handler!();
979+
impl_set_adjustment!();
960980

961981
/// Apply the given update to the aggregator.
962982
///
@@ -1074,6 +1094,7 @@ impl BarAggregator for ValueBarAggregator {
10741094
}
10751095

10761096
impl_set_historical_handler!();
1097+
impl_set_adjustment!();
10771098

10781099
/// Apply the given update to the aggregator.
10791100
fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
@@ -1220,6 +1241,7 @@ impl BarAggregator for ValueImbalanceBarAggregator {
12201241
}
12211242

12221243
impl_set_historical_handler!();
1244+
impl_set_adjustment!();
12231245

12241246
/// Apply the given update to the aggregator.
12251247
///
@@ -1388,6 +1410,7 @@ impl BarAggregator for ValueRunsBarAggregator {
13881410
}
13891411

13901412
impl_set_historical_handler!();
1413+
impl_set_adjustment!();
13911414

13921415
/// Apply the given update to the aggregator.
13931416
///
@@ -1529,6 +1552,7 @@ impl BarAggregator for RenkoBarAggregator {
15291552
}
15301553

15311554
impl_set_historical_handler!();
1555+
impl_set_adjustment!();
15321556

15331557
/// Apply the given update to the aggregator.
15341558
///
@@ -2055,6 +2079,10 @@ impl BarAggregator for TimeBarAggregator {
20552079
fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
20562080
self.start_timer_internal(aggregator_rc);
20572081
}
2082+
2083+
fn set_adjustment(&mut self, adjustment: Decimal, mode: ContinuousFutureAdjustmentType) {
2084+
self.core.set_adjustment(adjustment, mode);
2085+
}
20582086
}
20592087

20602088
fn is_below_min_size(size: f64, precision: u8) -> bool {

0 commit comments

Comments
 (0)