diff --git a/crates/live/src/execution/manager.rs b/crates/live/src/execution/manager.rs index a7b8aece913c..1d2f01609025 100644 --- a/crates/live/src/execution/manager.rs +++ b/crates/live/src/execution/manager.rs @@ -1461,6 +1461,37 @@ impl ExecutionManager { self.position_recon_retries.get(key).copied().unwrap_or(0) } + /// Observes a local order event and updates tracking state. + /// + /// This is the `LiveNode` dispatch path for order events: acknowledgement + /// events clear reconciliation tracking, fills record fill/position + /// activity, and every event stamps local activity. The stamp must come + /// AFTER any [`Self::clear_recon_tracking`] call - that call drops the + /// local-activity mark, which is the sole grace gate protecting a + /// just-acknowledged order from missing-order reconciliation while the + /// venue report lags. + pub fn observe_order_event(&mut self, event: &OrderEventAny) { + match event { + OrderEventAny::Filled(fill) => { + self.record_position_activity(fill.instrument_id, fill.account_id); + self.mark_fill_processed(fill.trade_id); + } + OrderEventAny::Accepted(_) + | OrderEventAny::Rejected(_) + | OrderEventAny::Canceled(_) + | OrderEventAny::Expired(_) + | OrderEventAny::Denied(_) + | OrderEventAny::Updated(_) + | OrderEventAny::ModifyRejected(_) + | OrderEventAny::CancelRejected(_) => { + self.clear_recon_tracking(&event.client_order_id(), true); + } + _ => {} + } + + self.record_local_activity(event.client_order_id()); + } + /// Observes an incoming execution report and updates tracking state. /// /// This should be called **before** the report is dispatched to the execution @@ -1670,32 +1701,9 @@ impl ExecutionManager { return events; }; - let ts_now = self.clock.borrow().timestamp_ns(); - let ts_last = order.ts_last(); - - // Domain-time recency gate: ts_last and ts_now are both domain - // timestamps, so this stays on self.clock (it is not a real-time - // settling window). - match ts_now.duration_since(&ts_last) { - // Within the recency threshold: order is genuinely too recent, defer. - Some(elapsed_ns) if elapsed_ns < self.config.open_check_threshold_ns => { - return events; - } - // Old enough: fall through to the reconciliation checks below. - Some(_) => {} - // ts_last is ahead of ts_now - impossible under a sane clock. - // A corrupted far-future ts_last (for example a double-scaled - // timestamp) would otherwise stall this order's reconciliation - // forever with no signal, so warn before deferring. - None => { - log::warn!( - "Order {client_order_id} has venue ts_last {ts_last} ahead of local ts_now {ts_now}; deferring reconciliation" - ); - return events; - } - } - - // Check local activity threshold + // Recent local activity is the real-time settling window for missing + // orders. Venue/domain timestamps can be ahead of the trading clock and + // must not stall reconciliation. if self.order_local_activity.within( &client_order_id, Duration::from_nanos(self.config.open_check_threshold_ns), diff --git a/crates/live/src/node/config.rs b/crates/live/src/node/config.rs index 374ea8987bc7..9a9ccf91c60d 100644 --- a/crates/live/src/node/config.rs +++ b/crates/live/src/node/config.rs @@ -501,6 +501,10 @@ impl From for ExecutionEngineConfig { allow_overfills: config.allow_overfills, filter_unclaimed_external_orders: config.filter_unclaimed_external_orders, external_clients: config.external_clients, + // Keep purge intervals on the ExecutionEngine clock-timer path. + // LiveNode also dispatches purge checks from its maintenance loop, + // but engine timers must remain controlled by the injected Clock + // for callers using a custom live/sandbox clock factory. purge_closed_orders_interval_mins: config.purge_closed_orders_interval_mins, purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins, purge_closed_positions_interval_mins: config.purge_closed_positions_interval_mins, @@ -1285,6 +1289,12 @@ mod tests { load_cache: false, snapshot_positions_interval_secs: Some(30.0), filter_unclaimed_external_orders: true, + purge_closed_orders_interval_mins: Some(5), + purge_closed_orders_buffer_mins: Some(1), + purge_closed_positions_interval_mins: Some(10), + purge_closed_positions_buffer_mins: Some(2), + purge_account_events_interval_mins: Some(15), + purge_account_events_lookback_mins: Some(3), ..Default::default() }; @@ -1293,6 +1303,12 @@ mod tests { assert!(!converted.load_cache); assert_eq!(converted.snapshot_positions_interval_secs, Some(30.0)); assert!(converted.filter_unclaimed_external_orders); + assert_eq!(converted.purge_closed_orders_interval_mins, Some(5)); + assert_eq!(converted.purge_closed_orders_buffer_mins, Some(1)); + assert_eq!(converted.purge_closed_positions_interval_mins, Some(10)); + assert_eq!(converted.purge_closed_positions_buffer_mins, Some(2)); + assert_eq!(converted.purge_account_events_interval_mins, Some(15)); + assert_eq!(converted.purge_account_events_lookback_mins, Some(3)); } #[rstest] diff --git a/crates/live/src/node/mod.rs b/crates/live/src/node/mod.rs index af2921ab0b75..e41e42fd5733 100644 --- a/crates/live/src/node/mod.rs +++ b/crates/live/src/node/mod.rs @@ -1161,29 +1161,7 @@ impl LiveNode { match &evt { ExecutionEvent::Order(order_evt) => { - self.exec_manager.record_local_activity(order_evt.client_order_id()); - match order_evt { - OrderEventAny::Filled(fill) => { - self.exec_manager.record_position_activity( - fill.instrument_id, - fill.account_id, - ); - self.exec_manager.mark_fill_processed(fill.trade_id); - } - OrderEventAny::Accepted(_) - | OrderEventAny::Rejected(_) - | OrderEventAny::Canceled(_) - | OrderEventAny::Expired(_) - | OrderEventAny::Denied(_) - | OrderEventAny::Updated(_) - | OrderEventAny::ModifyRejected(_) - | OrderEventAny::CancelRejected(_) => { - self.exec_manager.clear_recon_tracking( - &order_evt.client_order_id(), true, - ); - } - _ => {} - } + self.exec_manager.observe_order_event(order_evt); close_ids.push(order_evt.client_order_id()); } ExecutionEvent::OrderSubmittedBatch(batch) => { @@ -1193,18 +1171,20 @@ impl LiveNode { } ExecutionEvent::OrderAcceptedBatch(batch) => { for accepted in &batch.events { - self.exec_manager.record_local_activity(accepted.client_order_id); + // Stamp after clearing: `clear_recon_tracking` drops the + // local-activity mark, the missing-order grace gate. self.exec_manager.clear_recon_tracking( &accepted.client_order_id, true, ); + self.exec_manager.record_local_activity(accepted.client_order_id); } } ExecutionEvent::OrderCanceledBatch(batch) => { for canceled in &batch.events { - self.exec_manager.record_local_activity(canceled.client_order_id); self.exec_manager.clear_recon_tracking( &canceled.client_order_id, true, ); + self.exec_manager.record_local_activity(canceled.client_order_id); close_ids.push(canceled.client_order_id); } } diff --git a/crates/live/tests/manager.rs b/crates/live/tests/manager.rs index 8804398e1e50..ed9429d46d83 100644 --- a/crates/live/tests/manager.rs +++ b/crates/live/tests/manager.rs @@ -7617,13 +7617,16 @@ async fn test_check_open_orders_submitted_missing_at_venue_generates_rejected() } } -#[rstest] -#[tokio::test] -async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() { - // A corrupted far-future ts_last must defer reconciliation without - // rejecting or panicking. +#[cfg_attr( + not(all(feature = "simulation", madsim)), + tokio::test(start_paused = true) +)] +#[cfg_attr(all(feature = "simulation", madsim), madsim::test)] +async fn test_check_open_orders_missing_gate_uses_local_activity_not_venue_ts_last() { + // A corrupted far-future ts_last must not stall missing-order reconciliation + // after the local activity grace expires. let config = ExecutionManagerConfig { - open_check_threshold_ns: 0, + open_check_threshold_ns: 200_000_000, open_check_missing_retries: 1, open_check_open_only: false, ..Default::default() @@ -7631,7 +7634,7 @@ async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() { let mut ctx = TestContext::with_config(config); ctx.add_instrument(test_instrument()); - let mut order = create_limit_order( + let order = create_limit_order( "O-AHEAD", test_instrument_id(), OrderSide::Buy, @@ -7639,9 +7642,14 @@ async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() { "100.0", ); let submitted = TestOrderEventStubs::submitted(&order, test_account_id()); - order.apply(submitted).unwrap(); + ctx.add_order(order); + let order = ctx.cache.borrow_mut().update_order(&submitted).unwrap(); - let future_ts = UnixNanos::from(10_000_000_000); + let future_ts = ctx + .clock + .borrow() + .timestamp_ns() + .saturating_add_ns(10_000_000_000_u64); let accepted = OrderEventAny::Accepted( OrderAcceptedSpec::builder() .trader_id(order.trader_id()) @@ -7654,9 +7662,71 @@ async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() { .ts_init(future_ts) .build(), ); - order.apply(accepted).unwrap(); + let client_order_id = order.client_order_id(); + ctx.cache.borrow_mut().update_order(&accepted).unwrap(); + // Track the event exactly as the LiveNode dispatch path does: ack cleanup + // plus the local-activity stamp, in that order. + ctx.manager.observe_order_event(&accepted); + + let mock_client = MockExecutionClient::new(vec![]); + let clients: Vec<&dyn ExecutionClient> = vec![&mock_client]; + + let events = ctx.manager.check_open_orders(&clients).await; + + assert!( + events.is_empty(), + "recent local activity should defer reconciliation", + ); + + advance_clock(dst::time::Duration::from_millis(250)).await; + + let events = ctx.manager.check_open_orders(&clients).await; + + assert_eq!(events.len(), 1); + assert!( + matches!(&events[0], OrderEventAny::Rejected(rejected) if rejected.client_order_id == client_order_id), + "far-future venue ts_last must not keep deferring reconciliation", + ); +} + +#[cfg_attr( + not(all(feature = "simulation", madsim)), + tokio::test(start_paused = true) +)] +#[cfg_attr(all(feature = "simulation", madsim), madsim::test)] +async fn test_check_open_orders_defers_for_just_accepted_order() { + // Regression: the LiveNode dispatch path for Accepted performs ack cleanup + // (clear_recon_tracking) and stamps local activity via observe_order_event. + // The stamp must survive the cleanup so a just-accepted order that a + // lagging venue report omits defers until the grace expires, rather than + // being rejected as missing. + let config = ExecutionManagerConfig { + open_check_threshold_ns: 200_000_000, + open_check_missing_retries: 1, + open_check_open_only: false, + ..Default::default() + }; + let mut ctx = TestContext::with_config(config); + ctx.add_instrument(test_instrument()); + + let order = create_limit_order( + "O-ACCEPTED", + test_instrument_id(), + OrderSide::Buy, + "10.0", + "100.0", + ); + let submitted = TestOrderEventStubs::submitted(&order, test_account_id()); ctx.add_order(order); + let order = ctx.cache.borrow_mut().update_order(&submitted).unwrap(); + let client_order_id = order.client_order_id(); + + let accepted = + TestOrderEventStubs::accepted(&order, test_account_id(), VenueOrderId::from("V-ACCEPTED")); + ctx.cache.borrow_mut().update_order(&accepted).unwrap(); + ctx.manager.observe_order_event(&accepted); + // Venue response lags and omits the just-accepted order let mock_client = MockExecutionClient::new(vec![]); let clients: Vec<&dyn ExecutionClient> = vec![&mock_client]; @@ -7664,7 +7734,17 @@ async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() { assert!( events.is_empty(), - "a far-future venue ts_last must defer reconciliation, not reject or panic", + "a just-accepted order must defer missing-order reconciliation", + ); + + advance_clock(dst::time::Duration::from_millis(250)).await; + + let events = ctx.manager.check_open_orders(&clients).await; + + assert_eq!(events.len(), 1); + assert!( + matches!(&events[0], OrderEventAny::Rejected(rejected) if rejected.client_order_id == client_order_id), + "reconciliation must proceed once the local-activity grace expires", ); }