Skip to content

Commit d6be483

Browse files
committed
Add reconciliation threshold check to Rust ExecutionManager
- Port reconciliation race condition fix from Python PR #3493 to Rust - Defer reconciliation when recent local activity within threshold - Use receipt time instead of venue time for local activity tracking - Add MockExecutionClient and tests for check_open_orders threshold - Update Python _record_local_activity to use clock time for consistency
1 parent 0aa7f7f commit d6be483

File tree

6 files changed

+323
-17
lines changed

6 files changed

+323
-17
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/live/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ ustr = { workspace = true }
7474
pyo3 = { workspace = true, optional = true }
7575

7676
[dev-dependencies]
77+
async-trait = { workspace = true }
7778
criterion = { workspace = true }
7879
rstest = { workspace = true }
7980
rust_decimal_macros = { workspace = true }

crates/live/src/manager.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use nautilus_common::{
3232
};
3333
use nautilus_core::{
3434
UUID4, UnixNanos,
35-
datetime::{NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
35+
datetime::{NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, nanos_to_millis},
3636
};
3737
use nautilus_execution::{
3838
engine::ExecutionEngine,
@@ -370,8 +370,8 @@ impl ExecutionManager {
370370
let mut fills_applied = 0usize;
371371

372372
let fill_reports = &adjusted_fill_reports;
373-
374373
let mut seen_trade_ids: AHashSet<TradeId> = AHashSet::new();
374+
375375
for fills in fill_reports.values() {
376376
for fill in fills {
377377
if !seen_trade_ids.insert(fill.trade_id) {
@@ -623,6 +623,7 @@ impl ExecutionManager {
623623
// Process orphan fills (fills without matching order reports)
624624
let processed_venue_order_ids: AHashSet<VenueOrderId> =
625625
order_reports.keys().copied().collect();
626+
626627
for (venue_order_id, fills) in fill_reports {
627628
if processed_venue_order_ids.contains(venue_order_id) {
628629
continue;
@@ -837,6 +838,7 @@ impl ExecutionManager {
837838
let threshold_ns = self.config.inflight_threshold_ms * NANOSECONDS_IN_MILLISECOND;
838839

839840
let mut to_check = Vec::new();
841+
840842
for (client_order_id, check) in &self.inflight_checks {
841843
if current_time - check.ts_submitted > threshold_ns {
842844
to_check.push(*client_order_id);
@@ -957,12 +959,27 @@ impl ExecutionManager {
957959
}
958960

959961
// Reconcile reports against cached orders
962+
let ts_now = self.clock.borrow().timestamp_ns();
960963
let mut events = Vec::new();
964+
961965
for report in all_reports {
962966
if let Some(client_order_id) = &report.client_order_id
963967
&& let Some(order) = self.get_order(client_order_id)
964968
{
969+
// Check for recent local activity to avoid race conditions with in-flight fills
970+
if let Some(&last_activity) = self.order_local_activity_ns.get(client_order_id)
971+
&& (ts_now - last_activity) < self.config.open_check_threshold_ns
972+
{
973+
let elapsed_ms = nanos_to_millis((ts_now - last_activity).as_u64());
974+
let threshold_ms = nanos_to_millis(self.config.open_check_threshold_ns);
975+
log::info!(
976+
"Deferring reconciliation for {client_order_id}: recent local activity ({elapsed_ms}ms < threshold={threshold_ms}ms)",
977+
);
978+
continue;
979+
}
980+
965981
let instrument = self.get_instrument(&report.instrument_id);
982+
966983
if let Some(event) =
967984
self.reconcile_order_report(&order, &report, instrument.as_ref())
968985
{
@@ -1103,9 +1120,13 @@ impl ExecutionManager {
11031120
}
11041121

11051122
/// Records local activity for the specified order.
1106-
pub fn record_local_activity(&mut self, client_order_id: ClientOrderId, ts_event: UnixNanos) {
1107-
self.order_local_activity_ns
1108-
.insert(client_order_id, ts_event);
1123+
///
1124+
/// Uses the current clock time (receipt time) instead of venue time to accurately
1125+
/// track when we last processed activity for this order. This avoids race conditions
1126+
/// where network/queue latency makes events appear "old" even though they just arrived.
1127+
pub fn record_local_activity(&mut self, client_order_id: ClientOrderId) {
1128+
let ts_now = self.clock.borrow().timestamp_ns();
1129+
self.order_local_activity_ns.insert(client_order_id, ts_now);
11091130
}
11101131

11111132
/// Clears reconciliation tracking state for an order.

crates/live/tests/manager.rs

Lines changed: 283 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,16 @@
2121
use std::{cell::RefCell, rc::Rc};
2222

2323
use ahash::AHashSet;
24-
use nautilus_common::{cache::Cache, clock::TestClock};
24+
use async_trait::async_trait;
25+
use nautilus_common::{
26+
cache::Cache,
27+
clients::ExecutionClient,
28+
clock::TestClock,
29+
messages::execution::{
30+
BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateOrderStatusReports, ModifyOrder,
31+
QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
32+
},
33+
};
2534
use nautilus_core::{UUID4, UnixNanos};
2635
use nautilus_execution::{
2736
engine::ExecutionEngine, reconciliation::process_mass_status_for_reconciliation,
@@ -45,7 +54,7 @@ use nautilus_model::{
4554
orders::{Order, OrderAny, OrderTestBuilder, stubs::TestOrderEventStubs},
4655
position::Position,
4756
reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48-
types::{AccountBalance, Currency, Money, Price, Quantity},
57+
types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
4958
};
5059
use rstest::rstest;
5160
use rust_decimal_macros::dec;
@@ -5846,3 +5855,275 @@ async fn test_reconciliation_instrument_ids_filters_position_reports() {
58465855
"No position should be created for excluded instrument"
58475856
);
58485857
}
5858+
5859+
struct MockExecutionClient {
5860+
client_id: ClientId,
5861+
account_id: AccountId,
5862+
venue: Venue,
5863+
order_reports: RefCell<Vec<OrderStatusReport>>,
5864+
}
5865+
5866+
impl MockExecutionClient {
5867+
fn new(order_reports: Vec<OrderStatusReport>) -> Self {
5868+
Self {
5869+
client_id: test_client_id(),
5870+
account_id: test_account_id(),
5871+
venue: test_venue(),
5872+
order_reports: RefCell::new(order_reports),
5873+
}
5874+
}
5875+
}
5876+
5877+
#[async_trait(?Send)]
5878+
impl ExecutionClient for MockExecutionClient {
5879+
fn is_connected(&self) -> bool {
5880+
true
5881+
}
5882+
5883+
fn client_id(&self) -> ClientId {
5884+
self.client_id
5885+
}
5886+
5887+
fn account_id(&self) -> AccountId {
5888+
self.account_id
5889+
}
5890+
5891+
fn venue(&self) -> Venue {
5892+
self.venue
5893+
}
5894+
5895+
fn oms_type(&self) -> OmsType {
5896+
OmsType::Hedging
5897+
}
5898+
5899+
fn get_account(&self) -> Option<AccountAny> {
5900+
None
5901+
}
5902+
5903+
fn generate_account_state(
5904+
&self,
5905+
_balances: Vec<AccountBalance>,
5906+
_margins: Vec<MarginBalance>,
5907+
_reported: bool,
5908+
_ts_event: UnixNanos,
5909+
) -> anyhow::Result<()> {
5910+
Ok(())
5911+
}
5912+
5913+
fn start(&mut self) -> anyhow::Result<()> {
5914+
Ok(())
5915+
}
5916+
5917+
fn stop(&mut self) -> anyhow::Result<()> {
5918+
Ok(())
5919+
}
5920+
5921+
fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
5922+
Ok(())
5923+
}
5924+
5925+
fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
5926+
Ok(())
5927+
}
5928+
5929+
fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
5930+
Ok(())
5931+
}
5932+
5933+
fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
5934+
Ok(())
5935+
}
5936+
5937+
fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
5938+
Ok(())
5939+
}
5940+
5941+
fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
5942+
Ok(())
5943+
}
5944+
5945+
fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
5946+
Ok(())
5947+
}
5948+
5949+
fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
5950+
Ok(())
5951+
}
5952+
5953+
async fn generate_order_status_reports(
5954+
&self,
5955+
_cmd: &GenerateOrderStatusReports,
5956+
) -> anyhow::Result<Vec<OrderStatusReport>> {
5957+
Ok(self.order_reports.borrow().clone())
5958+
}
5959+
}
5960+
5961+
#[rstest]
5962+
#[tokio::test]
5963+
async fn test_check_open_orders_defers_with_recent_local_activity() {
5964+
// Test that reconciliation is deferred when there's recent local activity
5965+
// within the threshold, to avoid race conditions with in-flight fills.
5966+
let config = ExecutionManagerConfig {
5967+
open_check_threshold_ns: 200_000_000, // 200ms threshold
5968+
..Default::default()
5969+
};
5970+
let mut ctx = TestContext::with_config(config);
5971+
ctx.add_instrument(test_instrument());
5972+
5973+
let client_order_id = ClientOrderId::from("O-001");
5974+
let venue_order_id = VenueOrderId::from("V-001");
5975+
let instrument_id = test_instrument_id();
5976+
5977+
let mut order = OrderTestBuilder::new(OrderType::Limit)
5978+
.client_order_id(client_order_id)
5979+
.instrument_id(instrument_id)
5980+
.quantity(Quantity::from("10.0"))
5981+
.price(Price::from("100.0"))
5982+
.build();
5983+
let submitted = TestOrderEventStubs::submitted(&order, test_account_id());
5984+
order.apply(submitted).unwrap();
5985+
let accepted = TestOrderEventStubs::accepted(&order, test_account_id(), venue_order_id);
5986+
order.apply(accepted).unwrap();
5987+
ctx.add_order(order.clone());
5988+
5989+
ctx.manager.record_local_activity(client_order_id);
5990+
5991+
let report = create_order_status_report(
5992+
Some(client_order_id),
5993+
venue_order_id,
5994+
instrument_id,
5995+
OrderStatus::PartiallyFilled,
5996+
Quantity::from("10.0"),
5997+
Quantity::from("5.0"),
5998+
)
5999+
.with_avg_px(100.0)
6000+
.unwrap();
6001+
6002+
let mock_client = Rc::new(MockExecutionClient::new(vec![report]));
6003+
let clients: Vec<Rc<dyn ExecutionClient>> = vec![mock_client];
6004+
6005+
let events = ctx.manager.check_open_orders(&clients).await;
6006+
6007+
assert!(
6008+
events.is_empty(),
6009+
"Reconciliation should be deferred with recent local activity"
6010+
);
6011+
let cached_order = ctx.get_order(&client_order_id).unwrap();
6012+
assert_eq!(cached_order.status(), OrderStatus::Accepted);
6013+
assert_eq!(cached_order.filled_qty(), Quantity::from("0.0"));
6014+
}
6015+
6016+
#[rstest]
6017+
#[tokio::test]
6018+
async fn test_check_open_orders_proceeds_after_threshold_exceeded() {
6019+
// Test that reconciliation proceeds when the local activity is older than
6020+
// the configured threshold.
6021+
let config = ExecutionManagerConfig {
6022+
open_check_threshold_ns: 200_000_000, // 200ms threshold
6023+
..Default::default()
6024+
};
6025+
let mut ctx = TestContext::with_config(config);
6026+
ctx.add_instrument(test_instrument());
6027+
6028+
let client_order_id = ClientOrderId::from("O-001");
6029+
let venue_order_id = VenueOrderId::from("V-001");
6030+
let instrument_id = test_instrument_id();
6031+
6032+
let mut order = OrderTestBuilder::new(OrderType::Limit)
6033+
.client_order_id(client_order_id)
6034+
.instrument_id(instrument_id)
6035+
.quantity(Quantity::from("10.0"))
6036+
.price(Price::from("100.0"))
6037+
.build();
6038+
let submitted = TestOrderEventStubs::submitted(&order, test_account_id());
6039+
order.apply(submitted).unwrap();
6040+
let accepted = TestOrderEventStubs::accepted(&order, test_account_id(), venue_order_id);
6041+
order.apply(accepted).unwrap();
6042+
ctx.add_order(order.clone());
6043+
6044+
ctx.manager.record_local_activity(client_order_id);
6045+
ctx.advance_time(500_000_000);
6046+
6047+
let report = create_order_status_report(
6048+
Some(client_order_id),
6049+
venue_order_id,
6050+
instrument_id,
6051+
OrderStatus::PartiallyFilled,
6052+
Quantity::from("10.0"),
6053+
Quantity::from("5.0"),
6054+
)
6055+
.with_avg_px(100.0)
6056+
.unwrap();
6057+
6058+
let mock_client = Rc::new(MockExecutionClient::new(vec![report]));
6059+
let clients: Vec<Rc<dyn ExecutionClient>> = vec![mock_client];
6060+
6061+
let events = ctx.manager.check_open_orders(&clients).await;
6062+
6063+
assert_eq!(
6064+
events.len(),
6065+
1,
6066+
"Reconciliation should proceed when threshold exceeded"
6067+
);
6068+
if let OrderEventAny::Filled(filled) = &events[0] {
6069+
assert_eq!(filled.last_qty, Quantity::from("5.0"));
6070+
} else {
6071+
panic!("Expected OrderFilled event, was {:?}", events[0]);
6072+
}
6073+
}
6074+
6075+
#[rstest]
6076+
#[tokio::test]
6077+
async fn test_check_open_orders_proceeds_without_local_activity() {
6078+
// Test that reconciliation proceeds normally when there's no recorded
6079+
// local activity for the order.
6080+
let config = ExecutionManagerConfig {
6081+
open_check_threshold_ns: 200_000_000, // 200ms threshold
6082+
..Default::default()
6083+
};
6084+
let mut ctx = TestContext::with_config(config);
6085+
ctx.add_instrument(test_instrument());
6086+
6087+
let client_order_id = ClientOrderId::from("O-001");
6088+
let venue_order_id = VenueOrderId::from("V-001");
6089+
let instrument_id = test_instrument_id();
6090+
6091+
let mut order = OrderTestBuilder::new(OrderType::Limit)
6092+
.client_order_id(client_order_id)
6093+
.instrument_id(instrument_id)
6094+
.quantity(Quantity::from("10.0"))
6095+
.price(Price::from("100.0"))
6096+
.build();
6097+
let submitted = TestOrderEventStubs::submitted(&order, test_account_id());
6098+
order.apply(submitted).unwrap();
6099+
let accepted = TestOrderEventStubs::accepted(&order, test_account_id(), venue_order_id);
6100+
order.apply(accepted).unwrap();
6101+
ctx.add_order(order.clone());
6102+
6103+
let report = create_order_status_report(
6104+
Some(client_order_id),
6105+
venue_order_id,
6106+
instrument_id,
6107+
OrderStatus::PartiallyFilled,
6108+
Quantity::from("10.0"),
6109+
Quantity::from("5.0"),
6110+
)
6111+
.with_avg_px(100.0)
6112+
.unwrap();
6113+
6114+
let mock_client = Rc::new(MockExecutionClient::new(vec![report]));
6115+
let clients: Vec<Rc<dyn ExecutionClient>> = vec![mock_client];
6116+
6117+
let events = ctx.manager.check_open_orders(&clients).await;
6118+
6119+
assert_eq!(
6120+
events.len(),
6121+
1,
6122+
"Reconciliation should proceed without local activity"
6123+
);
6124+
if let OrderEventAny::Filled(filled) = &events[0] {
6125+
assert_eq!(filled.last_qty, Quantity::from("5.0"));
6126+
} else {
6127+
panic!("Expected OrderFilled event, was {:?}", events[0]);
6128+
}
6129+
}

0 commit comments

Comments
 (0)