Skip to content

Commit 7a6bdca

Browse files
committed
Wire OrderEmulator lifecycle paths
- Start emulator replay through trader startup - Reset and dispose emulator state through kernel lifecycle paths - Reset emulator state in the direct backtest reset path - Skip reentrant emulator order events with clearer logging
1 parent 9521832 commit 7a6bdca

5 files changed

Lines changed: 370 additions & 8 deletions

File tree

crates/backtest/src/engine.rs

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,8 @@ impl BacktestEngine {
891891
self.kernel.risk_engine.borrow_mut().stop();
892892
self.kernel.risk_engine.borrow_mut().reset();
893893

894+
self.kernel.order_emulator.reset();
895+
894896
// Reset trader
895897
if let Err(e) = self.kernel.trader.borrow_mut().reset() {
896898
log::error!("Error resetting trader: {e:?}");
@@ -1840,19 +1842,34 @@ fn log_portfolio_performance(analyzer: &PortfolioAnalyzer) {
18401842

18411843
#[cfg(test)]
18421844
mod tests {
1843-
use nautilus_common::enums::Environment;
1845+
use nautilus_common::{
1846+
enums::Environment,
1847+
messages::{
1848+
data::{DataCommand, UnsubscribeCommand},
1849+
execution::SubmitOrder,
1850+
},
1851+
msgbus::{
1852+
self, MessagingSwitchboard,
1853+
stubs::{TypedIntoMessageSavingHandler, get_typed_into_message_saving_handler},
1854+
},
1855+
};
18441856
use nautilus_execution::engine::SnapshotAnchorer;
18451857
use nautilus_model::{
18461858
data::{Data, InstrumentStatus},
1847-
enums::{AccountType, BookType, MarketStatus, MarketStatusAction, OmsType},
1859+
enums::{
1860+
AccountType, BookType, MarketStatus, MarketStatusAction, OmsType, OrderSide, OrderType,
1861+
TriggerType,
1862+
},
18481863
identifiers::Venue,
18491864
instruments::{
18501865
CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
18511866
},
1852-
types::Money,
1867+
orders::{Order, OrderAny, OrderTestBuilder},
1868+
types::{Money, Price, Quantity},
18531869
};
18541870
use nautilus_system::{KernelEventStore, RegisteredComponents};
18551871
use rstest::*;
1872+
use ustr::Ustr;
18561873

18571874
use super::*;
18581875

@@ -1942,6 +1959,43 @@ mod tests {
19421959
engine
19431960
}
19441961

1962+
fn create_stop_market_order(instrument: &CryptoPerpetual) -> OrderAny {
1963+
OrderTestBuilder::new(OrderType::StopMarket)
1964+
.instrument_id(instrument.id())
1965+
.side(OrderSide::Buy)
1966+
.trigger_price(Price::from("5100.00"))
1967+
.quantity(Quantity::from(1))
1968+
.emulation_trigger(TriggerType::BidAsk)
1969+
.build()
1970+
}
1971+
1972+
fn create_submit_order_command(order: &OrderAny) -> SubmitOrder {
1973+
SubmitOrder::new(
1974+
order.trader_id(),
1975+
None,
1976+
order.strategy_id(),
1977+
order.instrument_id(),
1978+
order.client_order_id(),
1979+
order.init_event().clone(),
1980+
order.exec_algorithm_id(),
1981+
None,
1982+
None,
1983+
UUID4::new(),
1984+
0.into(),
1985+
None, // correlation_id
1986+
)
1987+
}
1988+
1989+
fn register_data_command_handler(id: &str) -> TypedIntoMessageSavingHandler<DataCommand> {
1990+
let (handler, saving_handler) =
1991+
get_typed_into_message_saving_handler::<DataCommand>(Some(Ustr::from(id)));
1992+
msgbus::register_data_command_endpoint(
1993+
MessagingSwitchboard::data_engine_queue_execute(),
1994+
handler,
1995+
);
1996+
saving_handler
1997+
}
1998+
19451999
#[rstest]
19462000
fn test_run_impl_event_store_replay_skips_trader_start() {
19472001
let mut engine = create_engine_with_replay_store(false);
@@ -2028,6 +2082,43 @@ mod tests {
20282082
);
20292083
}
20302084

2085+
#[rstest]
2086+
fn test_reset_resets_order_emulator_state(crypto_perpetual_ethusdt: CryptoPerpetual) {
2087+
let mut engine = create_engine();
2088+
let data_commands =
2089+
register_data_command_handler("DataEngine.queue_execute.backtest_reset");
2090+
let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
2091+
let instrument_id = instrument.id();
2092+
engine.add_instrument(&instrument).unwrap();
2093+
let order = create_stop_market_order(&crypto_perpetual_ethusdt);
2094+
let command = create_submit_order_command(&order);
2095+
engine
2096+
.kernel
2097+
.cache
2098+
.borrow_mut()
2099+
.add_order(order, None, None, false)
2100+
.unwrap();
2101+
let order_emulator = engine.kernel.order_emulator.emulator();
2102+
let mut order_emulator = order_emulator.borrow_mut();
2103+
order_emulator.cache_submit_order_command(command.clone());
2104+
order_emulator.handle_submit_order(command);
2105+
drop(order_emulator);
2106+
data_commands.clear();
2107+
2108+
engine.reset();
2109+
2110+
let commands = data_commands.get_messages();
2111+
let emulator = engine.kernel.order_emulator.get_emulator();
2112+
assert!(emulator.subscribed_quotes().is_empty());
2113+
assert!(emulator.subscribed_trades().is_empty());
2114+
assert!(emulator.get_matching_core(&instrument_id).is_none());
2115+
assert!(commands.iter().any(|command| matches!(
2116+
command,
2117+
DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(command))
2118+
if command.instrument_id == instrument_id
2119+
)));
2120+
}
2121+
20312122
#[rstest]
20322123
fn test_route_data_to_exchange_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
20332124
let mut engine = create_engine();

crates/execution/src/order_emulator/adapter.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,20 @@ impl OrderEmulatorAdapter {
4949
pub fn emulator(&self) -> Rc<RefCell<OrderEmulator>> {
5050
self.emulator.clone()
5151
}
52+
53+
pub fn start(&self) {
54+
self.emulator.borrow_mut().start();
55+
}
56+
57+
pub fn stop(&self) {
58+
self.emulator.borrow().stop();
59+
}
60+
61+
pub fn reset(&self) {
62+
self.emulator.borrow_mut().reset();
63+
}
64+
65+
pub fn dispose(&self) {
66+
self.emulator.borrow_mut().dispose();
67+
}
5268
}

crates/execution/src/order_emulator/emulator.rs

Lines changed: 87 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ impl OrderEmulator {
339339
None, // correlation_id
340340
);
341341

342+
self.manager.cache_submit_order_command(command.clone());
342343
self.handle_submit_order(command);
343344
}
344345

@@ -524,8 +525,8 @@ impl OrderEmulator {
524525
self.dispatch_manager_actions(actions);
525526
return;
526527
}
527-
528-
self.check_monitoring(command.strategy_id, command.position_id);
528+
let strategy_id = command.strategy_id;
529+
let position_id = command.position_id;
529530

530531
// Get or create matching core
531532
let trigger_instrument_id = order
@@ -672,6 +673,8 @@ impl OrderEmulator {
672673
);
673674
}
674675

676+
self.check_monitoring(strategy_id, position_id);
677+
675678
// Since we are cloning the matching core, we need to insert it back into the original hashmap
676679
self.matching_cores
677680
.insert(trigger_instrument_id, matching_core);
@@ -1575,11 +1578,11 @@ mod tests {
15751578
use nautilus_model::{
15761579
data::{QuoteTick, TradeTick},
15771580
enums::{AggressorSide, OrderSide, OrderType, TriggerType},
1578-
identifiers::{ClientOrderId, StrategyId, TradeId, TraderId},
1581+
identifiers::{ClientOrderId, OrderListId, StrategyId, TradeId, TraderId},
15791582
instruments::{
15801583
CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
15811584
},
1582-
orders::OrderTestBuilder,
1585+
orders::{OrderList, OrderTestBuilder},
15831586
types::{Price, Quantity},
15841587
};
15851588
use rstest::{fixture, rstest};
@@ -1631,6 +1634,22 @@ mod tests {
16311634
.build()
16321635
}
16331636

1637+
fn create_list_stop_market_order(
1638+
instrument: &CryptoPerpetual,
1639+
client_order_id: &str,
1640+
order_list_id: OrderListId,
1641+
) -> OrderAny {
1642+
OrderTestBuilder::new(OrderType::StopMarket)
1643+
.instrument_id(instrument.id())
1644+
.client_order_id(ClientOrderId::from(client_order_id))
1645+
.order_list_id(order_list_id)
1646+
.side(OrderSide::Buy)
1647+
.trigger_price(Price::from("5100.00"))
1648+
.quantity(Quantity::from(1))
1649+
.emulation_trigger(TriggerType::BidAsk)
1650+
.build()
1651+
}
1652+
16341653
fn create_submit_order(instrument: &CryptoPerpetual, order: &OrderAny) -> SubmitOrder {
16351654
SubmitOrder::new(
16361655
TraderId::from("TRADER-001"),
@@ -2163,6 +2182,70 @@ mod tests {
21632182
assert_eq!(core.last, None);
21642183
}
21652184

2185+
#[rstest]
2186+
fn test_submit_order_list_handles_reentrant_order_events(instrument: CryptoPerpetual) {
2187+
let (_clock, cache, emulator) = create_emulator();
2188+
let data_commands = register_data_command_handler("DataEngine.queue_execute.list");
2189+
add_instrument_to_cache(&cache, &instrument);
2190+
let order_list_id = OrderListId::from("OL-EMULATOR-001");
2191+
let first_order = create_list_stop_market_order(&instrument, "O-LIST-001", order_list_id);
2192+
let second_order = create_list_stop_market_order(&instrument, "O-LIST-002", order_list_id);
2193+
let orders = vec![first_order.clone(), second_order.clone()];
2194+
let order_list = OrderList::from_orders(&orders, 0.into());
2195+
let order_inits = orders
2196+
.iter()
2197+
.map(|order| order.init_event().clone())
2198+
.collect();
2199+
cache
2200+
.borrow_mut()
2201+
.add_order(first_order.clone(), None, None, false)
2202+
.unwrap();
2203+
cache
2204+
.borrow_mut()
2205+
.add_order(second_order.clone(), None, None, false)
2206+
.unwrap();
2207+
let command = SubmitOrderList::new(
2208+
TraderId::from("TRADER-001"),
2209+
None,
2210+
StrategyId::from("STRATEGY-001"),
2211+
order_list,
2212+
order_inits,
2213+
None,
2214+
None,
2215+
None,
2216+
UUID4::new(),
2217+
0.into(),
2218+
None,
2219+
);
2220+
2221+
emulator
2222+
.borrow_mut()
2223+
.execute(TradingCommand::SubmitOrderList(command));
2224+
2225+
let commands = data_commands.get_messages();
2226+
let cache = cache.borrow();
2227+
let first_status = cache
2228+
.order(&first_order.client_order_id())
2229+
.unwrap()
2230+
.status();
2231+
let second_status = cache
2232+
.order(&second_order.client_order_id())
2233+
.unwrap()
2234+
.status();
2235+
drop(cache);
2236+
let emulator = emulator.borrow();
2237+
2238+
assert_eq!(first_status, OrderStatus::Emulated);
2239+
assert_eq!(second_status, OrderStatus::Emulated);
2240+
assert!(emulator.get_matching_core(&instrument.id()).is_some());
2241+
assert_eq!(emulator.subscribed_quotes(), vec![instrument.id()]);
2242+
assert!(commands.iter().any(|command| matches!(
2243+
command,
2244+
DataCommand::Subscribe(SubscribeCommand::Quotes(command))
2245+
if command.instrument_id == instrument.id()
2246+
)));
2247+
}
2248+
21662249
#[rstest]
21672250
fn test_submit_order_caches_command(instrument: CryptoPerpetual) {
21682251
let (_clock, cache, emulator) = create_emulator();

crates/execution/src/order_emulator/handlers.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,18 @@ impl Handler<OrderEventAny> for OrderEmulatorOnEventHandler {
7373

7474
fn handle(&self, event: &OrderEventAny) {
7575
if let Some(emulator) = self.emulator.upgrade() {
76-
emulator.borrow_mut().on_event(event);
76+
match emulator.try_borrow_mut() {
77+
Ok(mut emulator) => emulator.on_event(event),
78+
Err(_) => {
79+
log::debug!(
80+
concat!(
81+
"Skipping reentrant order event while OrderEmulator is already handling ",
82+
"a command or event; expected for self-published emulator events: {}"
83+
),
84+
event
85+
);
86+
}
87+
}
7788
}
7889
}
7990
}

0 commit comments

Comments
 (0)