Skip to content

Commit da76dd1

Browse files
committed
Add Live reconciliation coverage
- Cover live exec-event pre-dispatch fill and batch paths - Assert clock factory and exec-manager config mapping - Check open-only missing orders keep submitted state
1 parent 4d3b547 commit da76dd1

3 files changed

Lines changed: 283 additions & 52 deletions

File tree

crates/live/src/node/config.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,6 +1311,95 @@ mod tests {
13111311
assert_eq!(converted.purge_account_events_lookback_mins, Some(3));
13121312
}
13131313

1314+
#[rstest]
1315+
fn test_live_exec_engine_config_converts_to_execution_manager_config() {
1316+
let config = LiveExecEngineConfig {
1317+
reconciliation: false,
1318+
reconciliation_lookback_mins: Some(45),
1319+
reconciliation_instrument_ids: Some(vec![
1320+
"ETHUSDT.BINANCE".to_string(),
1321+
"BTCUSDT.BINANCE".to_string(),
1322+
]),
1323+
filter_unclaimed_external_orders: true,
1324+
filter_position_reports: true,
1325+
filtered_client_order_ids: Some(vec!["O-001".to_string(), "O-002".to_string()]),
1326+
generate_missing_orders: false,
1327+
inflight_check_interval_ms: 321,
1328+
inflight_check_threshold_ms: 654,
1329+
inflight_check_retries: 7,
1330+
open_check_interval_secs: Some(1.5),
1331+
open_check_lookback_mins: Some(9),
1332+
open_check_threshold_ms: 234,
1333+
open_check_missing_retries: 4,
1334+
open_check_open_only: false,
1335+
max_single_order_queries_per_cycle: 8,
1336+
single_order_query_delay_ms: 76,
1337+
position_check_interval_secs: Some(2.5),
1338+
position_check_lookback_mins: 11,
1339+
position_check_threshold_ms: 345,
1340+
position_check_retries: 6,
1341+
purge_closed_orders_buffer_mins: Some(12),
1342+
purge_closed_positions_buffer_mins: Some(13),
1343+
purge_account_events_lookback_mins: Some(14),
1344+
purge_from_database: true,
1345+
..Default::default()
1346+
};
1347+
1348+
let converted = ExecutionManagerConfig::from(&config);
1349+
1350+
assert!(!converted.reconciliation);
1351+
assert_eq!(converted.lookback_mins, Some(45));
1352+
assert_eq!(converted.reconciliation_instrument_ids.len(), 2);
1353+
assert!(
1354+
converted
1355+
.reconciliation_instrument_ids
1356+
.contains(&InstrumentId::from("ETHUSDT.BINANCE"))
1357+
);
1358+
assert!(
1359+
converted
1360+
.reconciliation_instrument_ids
1361+
.contains(&InstrumentId::from("BTCUSDT.BINANCE"))
1362+
);
1363+
assert!(converted.filter_unclaimed_external);
1364+
assert!(converted.filter_position_reports);
1365+
assert_eq!(converted.filtered_client_order_ids.len(), 2);
1366+
assert!(
1367+
converted
1368+
.filtered_client_order_ids
1369+
.contains(&ClientOrderId::from("O-001"))
1370+
);
1371+
assert!(
1372+
converted
1373+
.filtered_client_order_ids
1374+
.contains(&ClientOrderId::from("O-002"))
1375+
);
1376+
assert!(!converted.generate_missing_orders);
1377+
assert_eq!(converted.inflight_check_interval_ms, 321);
1378+
assert_eq!(converted.inflight_threshold_ms, 654);
1379+
assert_eq!(converted.inflight_max_retries, 7);
1380+
assert_eq!(converted.open_check_interval_secs, Some(1.5));
1381+
assert_eq!(converted.open_check_lookback_mins, Some(9));
1382+
assert_eq!(
1383+
converted.open_check_threshold_ns,
1384+
234 * NANOSECONDS_IN_MILLISECOND
1385+
);
1386+
assert_eq!(converted.open_check_missing_retries, 4);
1387+
assert!(!converted.open_check_open_only);
1388+
assert_eq!(converted.max_single_order_queries_per_cycle, 8);
1389+
assert_eq!(converted.single_order_query_delay_ms, 76);
1390+
assert_eq!(converted.position_check_interval_secs, Some(2.5));
1391+
assert_eq!(converted.position_check_lookback_mins, 11);
1392+
assert_eq!(
1393+
converted.position_check_threshold_ns,
1394+
345 * NANOSECONDS_IN_MILLISECOND
1395+
);
1396+
assert_eq!(converted.position_check_retries, 6);
1397+
assert_eq!(converted.purge_closed_orders_buffer_mins, Some(12));
1398+
assert_eq!(converted.purge_closed_positions_buffer_mins, Some(13));
1399+
assert_eq!(converted.purge_account_events_lookback_mins, Some(14));
1400+
assert!(converted.purge_from_database);
1401+
}
1402+
13141403
#[rstest]
13151404
fn test_live_risk_engine_config_converts_to_risk_engine_config() {
13161405
let config = LiveRiskEngineConfig {

crates/live/src/node/mod.rs

Lines changed: 161 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,56 +1157,15 @@ impl LiveNode {
11571157
residual_events += 1;
11581158
}
11591159

1160-
let mut close_ids: Vec<ClientOrderId> = Vec::new();
1161-
1162-
match &evt {
1163-
ExecutionEvent::Order(order_evt) => {
1164-
self.exec_manager.observe_order_event(order_evt);
1165-
close_ids.push(order_evt.client_order_id());
1166-
}
1167-
ExecutionEvent::OrderSubmittedBatch(batch) => {
1168-
for submitted in &batch.events {
1169-
self.exec_manager.record_local_activity(submitted.client_order_id);
1170-
}
1171-
}
1172-
ExecutionEvent::OrderAcceptedBatch(batch) => {
1173-
for accepted in &batch.events {
1174-
// Stamp after clearing: `clear_recon_tracking` drops the
1175-
// local-activity mark, the missing-order grace gate.
1176-
self.exec_manager.clear_recon_tracking(
1177-
&accepted.client_order_id, true,
1178-
);
1179-
self.exec_manager.record_local_activity(accepted.client_order_id);
1180-
}
1181-
}
1182-
ExecutionEvent::OrderCanceledBatch(batch) => {
1183-
for canceled in &batch.events {
1184-
self.exec_manager.clear_recon_tracking(
1185-
&canceled.client_order_id, true,
1186-
);
1187-
self.exec_manager.record_local_activity(canceled.client_order_id);
1188-
close_ids.push(canceled.client_order_id);
1189-
}
1190-
}
1191-
ExecutionEvent::Report(report) => {
1192-
if let ExecutionReport::Fill(fill_report) = report
1193-
&& self.exec_manager.is_fill_recently_processed(&fill_report.trade_id) {
1194-
log::debug!(
1195-
"Skipping recently processed fill report: {}",
1196-
fill_report.trade_id,
1197-
);
1198-
record_runner_dispatch(
1199-
&metrics,
1200-
RunnerMetricChannel::ExecEvents,
1201-
dispatch_start,
1202-
metrics_start,
1203-
);
1204-
continue;
1205-
}
1206-
self.exec_manager.observe_execution_report(report);
1207-
}
1208-
ExecutionEvent::Account(_) => {}
1209-
}
1160+
let Some(close_ids) = self.observe_exec_event_before_dispatch(&evt) else {
1161+
record_runner_dispatch(
1162+
&metrics,
1163+
RunnerMetricChannel::ExecEvents,
1164+
dispatch_start,
1165+
metrics_start,
1166+
);
1167+
continue;
1168+
};
12101169

12111170
AsyncRunner::handle_exec_event(evt);
12121171

@@ -1504,6 +1463,60 @@ impl LiveNode {
15041463
}
15051464
}
15061465

1466+
fn observe_exec_event_before_dispatch(
1467+
&mut self,
1468+
evt: &ExecutionEvent,
1469+
) -> Option<Vec<ClientOrderId>> {
1470+
let mut close_ids = Vec::new();
1471+
1472+
match evt {
1473+
ExecutionEvent::Order(order_evt) => {
1474+
self.exec_manager.observe_order_event(order_evt);
1475+
close_ids.push(order_evt.client_order_id());
1476+
}
1477+
ExecutionEvent::OrderSubmittedBatch(batch) => {
1478+
for submitted in &batch.events {
1479+
self.exec_manager
1480+
.record_local_activity(submitted.client_order_id);
1481+
}
1482+
}
1483+
ExecutionEvent::OrderAcceptedBatch(batch) => {
1484+
for accepted in &batch.events {
1485+
self.exec_manager
1486+
.clear_recon_tracking(&accepted.client_order_id, true);
1487+
self.exec_manager
1488+
.record_local_activity(accepted.client_order_id);
1489+
}
1490+
}
1491+
ExecutionEvent::OrderCanceledBatch(batch) => {
1492+
for canceled in &batch.events {
1493+
self.exec_manager
1494+
.clear_recon_tracking(&canceled.client_order_id, true);
1495+
self.exec_manager
1496+
.record_local_activity(canceled.client_order_id);
1497+
close_ids.push(canceled.client_order_id);
1498+
}
1499+
}
1500+
ExecutionEvent::Report(report) => {
1501+
if let ExecutionReport::Fill(fill_report) = report
1502+
&& self
1503+
.exec_manager
1504+
.is_fill_recently_processed(&fill_report.trade_id)
1505+
{
1506+
log::debug!(
1507+
"Skipping recently processed fill report: {}",
1508+
fill_report.trade_id,
1509+
);
1510+
return None;
1511+
}
1512+
self.exec_manager.observe_execution_report(report);
1513+
}
1514+
ExecutionEvent::Account(_) => {}
1515+
}
1516+
1517+
Some(close_ids)
1518+
}
1519+
15071520
/// Gets the node's environment.
15081521
#[must_use]
15091522
pub fn environment(&self) -> Environment {
@@ -2305,7 +2318,7 @@ mod tests {
23052318
use nautilus_common::{
23062319
actor::DataActor,
23072320
cache::Cache,
2308-
clock::Clock,
2321+
clock::{Clock, TestClock},
23092322
enums::SerializationEncoding,
23102323
messages::execution::{SubmitOrder, TradingCommand},
23112324
msgbus::{
@@ -2321,8 +2334,10 @@ mod tests {
23212334
use nautilus_model::{
23222335
data::QuoteTick,
23232336
enums::{OmsType, OrderStatus, OrderType},
2337+
events::{OrderAcceptedBatch, order::spec::OrderAcceptedSpec},
23242338
identifiers::{
2325-
AccountId, ClientId, InstrumentId, PositionId, StrategyId, TraderId, VenueOrderId,
2339+
AccountId, ClientId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
2340+
VenueOrderId,
23262341
},
23272342
instruments::{Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt},
23282343
orders::{OrderTestBuilder, stubs::TestOrderEventStubs},
@@ -2363,6 +2378,100 @@ mod tests {
23632378
assert_eq!(output, expected);
23642379
}
23652380

2381+
#[rstest]
2382+
fn test_observe_exec_event_before_dispatch_skips_recent_fill_report() {
2383+
let config = LiveNodeConfig {
2384+
exec_engine: crate::config::LiveExecEngineConfig {
2385+
reconciliation: false,
2386+
..Default::default()
2387+
},
2388+
..Default::default()
2389+
};
2390+
let mut node = LiveNode::build("FillSkipNode".to_string(), Some(config)).unwrap();
2391+
let event = stub_exec_event();
2392+
let trade_id = TradeId::from("T-001");
2393+
2394+
let close_ids = node.observe_exec_event_before_dispatch(&event);
2395+
assert_eq!(close_ids, Some(Vec::new()));
2396+
assert!(!node.exec_manager.is_fill_recently_processed(&trade_id));
2397+
2398+
node.exec_manager.mark_fill_processed(trade_id);
2399+
2400+
let close_ids = node.observe_exec_event_before_dispatch(&event);
2401+
assert_eq!(close_ids, None);
2402+
}
2403+
2404+
#[rstest]
2405+
fn test_observe_exec_event_before_dispatch_accepted_batch_stamps_local_activity() {
2406+
let config = LiveNodeConfig {
2407+
exec_engine: crate::config::LiveExecEngineConfig {
2408+
reconciliation: true,
2409+
open_check_threshold_ms: 5_000,
2410+
single_order_query_delay_ms: 0,
2411+
..Default::default()
2412+
},
2413+
..Default::default()
2414+
};
2415+
let mut node = LiveNode::build("AcceptedBatchNode".to_string(), Some(config)).unwrap();
2416+
let account_id = AccountId::from("TEST-ACCEPTED-BATCH-001");
2417+
let client_id = ClientId::from("TEST-ACCEPTED-BATCH");
2418+
let instrument = crypto_perpetual_ethusdt();
2419+
let instrument_id = instrument.id();
2420+
let client_order_id = ClientOrderId::from("O-ACCEPTED-BATCH");
2421+
let venue_order_id = VenueOrderId::from("V-ACCEPTED-BATCH");
2422+
2423+
node.kernel
2424+
.cache
2425+
.borrow_mut()
2426+
.add_instrument(InstrumentAny::CryptoPerpetual(instrument))
2427+
.unwrap();
2428+
insert_accepted_limit_order_in_node(
2429+
&node,
2430+
account_id,
2431+
client_id,
2432+
instrument_id,
2433+
client_order_id,
2434+
venue_order_id,
2435+
);
2436+
2437+
assert_eq!(node.exec_manager.check_open_order_queries().len(), 1);
2438+
2439+
let accepted = OrderAcceptedSpec::builder()
2440+
.instrument_id(instrument_id)
2441+
.client_order_id(client_order_id)
2442+
.venue_order_id(venue_order_id)
2443+
.account_id(account_id)
2444+
.build();
2445+
let event = ExecutionEvent::OrderAcceptedBatch(OrderAcceptedBatch::new(vec![accepted]));
2446+
2447+
let close_ids = node.observe_exec_event_before_dispatch(&event);
2448+
2449+
assert_eq!(close_ids, Some(Vec::new()));
2450+
assert!(node.exec_manager.check_open_order_queries().is_empty());
2451+
}
2452+
2453+
#[rstest]
2454+
fn test_live_node_builder_clock_factory_drives_kernel_clock() {
2455+
let calls = Rc::new(Cell::new(0usize));
2456+
let calls_in_factory = calls.clone();
2457+
let sentinel = UnixNanos::from(123_456_789_u64);
2458+
2459+
let node = LiveNode::builder(TraderId::from("TESTER-001"), Environment::Sandbox)
2460+
.unwrap()
2461+
.with_reconciliation(false)
2462+
.with_clock_factory(move || {
2463+
calls_in_factory.set(calls_in_factory.get() + 1);
2464+
let mut clock = TestClock::new();
2465+
clock.advance_time(sentinel, true);
2466+
Rc::new(RefCell::new(clock)) as Rc<RefCell<dyn Clock>>
2467+
})
2468+
.build()
2469+
.unwrap();
2470+
2471+
assert_eq!(node.kernel().clock().borrow().timestamp_ns(), sentinel);
2472+
assert_eq!(calls.get(), 1);
2473+
}
2474+
23662475
#[derive(Debug)]
23672476
struct ReplayKernelEventStore {
23682477
fail_restore: bool,

crates/live/tests/manager.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7617,6 +7617,39 @@ async fn test_check_open_orders_submitted_missing_at_venue_generates_rejected()
76177617
}
76187618
}
76197619

7620+
#[rstest]
7621+
#[tokio::test]
7622+
async fn test_check_open_orders_open_only_missing_venue_order_does_not_reject() {
7623+
let config = ExecutionManagerConfig {
7624+
open_check_threshold_ns: 0,
7625+
open_check_missing_retries: 1,
7626+
open_check_open_only: true,
7627+
..Default::default()
7628+
};
7629+
let mut ctx = TestContext::with_config(config);
7630+
ctx.add_instrument(test_instrument());
7631+
7632+
let order = create_limit_order(
7633+
"O-OPEN-ONLY",
7634+
test_instrument_id(),
7635+
OrderSide::Buy,
7636+
"10.0",
7637+
"100.0",
7638+
);
7639+
let submitted = TestOrderEventStubs::submitted(&order, test_account_id());
7640+
ctx.add_order(order);
7641+
ctx.cache.borrow_mut().update_order(&submitted).unwrap();
7642+
7643+
let mock_client = MockExecutionClient::new(vec![]);
7644+
let clients: Vec<&dyn ExecutionClient> = vec![&mock_client];
7645+
7646+
let events = ctx.manager.check_open_orders(&clients).await;
7647+
7648+
assert!(events.is_empty());
7649+
let cached_order = ctx.get_order(&ClientOrderId::from("O-OPEN-ONLY")).unwrap();
7650+
assert_eq!(cached_order.status(), OrderStatus::Submitted);
7651+
}
7652+
76207653
#[cfg_attr(
76217654
not(all(feature = "simulation", madsim)),
76227655
tokio::test(start_paused = true)

0 commit comments

Comments
 (0)