Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 34 additions & 26 deletions crates/live/src/execution/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,37 @@ impl ExecutionManager {
self.position_recon_retries.get(key).copied().unwrap_or(0)
}

/// Observes a local order event and updates tracking state.
///
/// This is the `LiveNode` dispatch path for order events: acknowledgement
/// events clear reconciliation tracking, fills record fill/position
/// activity, and every event stamps local activity. The stamp must come
/// AFTER any [`Self::clear_recon_tracking`] call - that call drops the
/// local-activity mark, which is the sole grace gate protecting a
/// just-acknowledged order from missing-order reconciliation while the
/// venue report lags.
pub fn observe_order_event(&mut self, event: &OrderEventAny) {
match event {
OrderEventAny::Filled(fill) => {
self.record_position_activity(fill.instrument_id, fill.account_id);
self.mark_fill_processed(fill.trade_id);
}
OrderEventAny::Accepted(_)
| OrderEventAny::Rejected(_)
| OrderEventAny::Canceled(_)
| OrderEventAny::Expired(_)
| OrderEventAny::Denied(_)
| OrderEventAny::Updated(_)
| OrderEventAny::ModifyRejected(_)
| OrderEventAny::CancelRejected(_) => {
self.clear_recon_tracking(&event.client_order_id(), true);
}
_ => {}
}

self.record_local_activity(event.client_order_id());
}

/// Observes an incoming execution report and updates tracking state.
///
/// This should be called **before** the report is dispatched to the execution
Expand Down Expand Up @@ -1670,32 +1701,9 @@ impl ExecutionManager {
return events;
};

let ts_now = self.clock.borrow().timestamp_ns();
let ts_last = order.ts_last();

// Domain-time recency gate: ts_last and ts_now are both domain
// timestamps, so this stays on self.clock (it is not a real-time
// settling window).
match ts_now.duration_since(&ts_last) {
// Within the recency threshold: order is genuinely too recent, defer.
Some(elapsed_ns) if elapsed_ns < self.config.open_check_threshold_ns => {
return events;
}
// Old enough: fall through to the reconciliation checks below.
Some(_) => {}
// ts_last is ahead of ts_now - impossible under a sane clock.
// A corrupted far-future ts_last (for example a double-scaled
// timestamp) would otherwise stall this order's reconciliation
// forever with no signal, so warn before deferring.
None => {
log::warn!(
"Order {client_order_id} has venue ts_last {ts_last} ahead of local ts_now {ts_now}; deferring reconciliation"
);
return events;
}
}

// Check local activity threshold
// Recent local activity is the real-time settling window for missing
// orders. Venue/domain timestamps can be ahead of the trading clock and
// must not stall reconciliation.
if self.order_local_activity.within(
&client_order_id,
Duration::from_nanos(self.config.open_check_threshold_ns),
Expand Down
16 changes: 16 additions & 0 deletions crates/live/src/node/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,10 @@ impl From<LiveExecEngineConfig> for ExecutionEngineConfig {
allow_overfills: config.allow_overfills,
filter_unclaimed_external_orders: config.filter_unclaimed_external_orders,
external_clients: config.external_clients,
// Keep purge intervals on the ExecutionEngine clock-timer path.
// LiveNode also dispatches purge checks from its maintenance loop,
// but engine timers must remain controlled by the injected Clock
// for callers using a custom live/sandbox clock factory.
purge_closed_orders_interval_mins: config.purge_closed_orders_interval_mins,
purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
purge_closed_positions_interval_mins: config.purge_closed_positions_interval_mins,
Expand Down Expand Up @@ -1285,6 +1289,12 @@ mod tests {
load_cache: false,
snapshot_positions_interval_secs: Some(30.0),
filter_unclaimed_external_orders: true,
purge_closed_orders_interval_mins: Some(5),
purge_closed_orders_buffer_mins: Some(1),
purge_closed_positions_interval_mins: Some(10),
purge_closed_positions_buffer_mins: Some(2),
purge_account_events_interval_mins: Some(15),
purge_account_events_lookback_mins: Some(3),
..Default::default()
};

Expand All @@ -1293,6 +1303,12 @@ mod tests {
assert!(!converted.load_cache);
assert_eq!(converted.snapshot_positions_interval_secs, Some(30.0));
assert!(converted.filter_unclaimed_external_orders);
assert_eq!(converted.purge_closed_orders_interval_mins, Some(5));
assert_eq!(converted.purge_closed_orders_buffer_mins, Some(1));
assert_eq!(converted.purge_closed_positions_interval_mins, Some(10));
assert_eq!(converted.purge_closed_positions_buffer_mins, Some(2));
assert_eq!(converted.purge_account_events_interval_mins, Some(15));
assert_eq!(converted.purge_account_events_lookback_mins, Some(3));
}

#[rstest]
Expand Down
30 changes: 5 additions & 25 deletions crates/live/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1161,29 +1161,7 @@ impl LiveNode {

match &evt {
ExecutionEvent::Order(order_evt) => {
self.exec_manager.record_local_activity(order_evt.client_order_id());
match order_evt {
OrderEventAny::Filled(fill) => {
self.exec_manager.record_position_activity(
fill.instrument_id,
fill.account_id,
);
self.exec_manager.mark_fill_processed(fill.trade_id);
}
OrderEventAny::Accepted(_)
| OrderEventAny::Rejected(_)
| OrderEventAny::Canceled(_)
| OrderEventAny::Expired(_)
| OrderEventAny::Denied(_)
| OrderEventAny::Updated(_)
| OrderEventAny::ModifyRejected(_)
| OrderEventAny::CancelRejected(_) => {
self.exec_manager.clear_recon_tracking(
&order_evt.client_order_id(), true,
);
}
_ => {}
}
self.exec_manager.observe_order_event(order_evt);
close_ids.push(order_evt.client_order_id());
}
ExecutionEvent::OrderSubmittedBatch(batch) => {
Expand All @@ -1193,18 +1171,20 @@ impl LiveNode {
}
ExecutionEvent::OrderAcceptedBatch(batch) => {
for accepted in &batch.events {
self.exec_manager.record_local_activity(accepted.client_order_id);
// Stamp after clearing: `clear_recon_tracking` drops the
// local-activity mark, the missing-order grace gate.
self.exec_manager.clear_recon_tracking(
&accepted.client_order_id, true,
);
self.exec_manager.record_local_activity(accepted.client_order_id);
}
}
ExecutionEvent::OrderCanceledBatch(batch) => {
for canceled in &batch.events {
self.exec_manager.record_local_activity(canceled.client_order_id);
self.exec_manager.clear_recon_tracking(
&canceled.client_order_id, true,
);
self.exec_manager.record_local_activity(canceled.client_order_id);
close_ids.push(canceled.client_order_id);
}
}
Expand Down
102 changes: 91 additions & 11 deletions crates/live/tests/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7617,31 +7617,39 @@ async fn test_check_open_orders_submitted_missing_at_venue_generates_rejected()
}
}

#[rstest]
#[tokio::test]
async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() {
// A corrupted far-future ts_last must defer reconciliation without
// rejecting or panicking.
#[cfg_attr(
not(all(feature = "simulation", madsim)),
tokio::test(start_paused = true)
)]
#[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
async fn test_check_open_orders_missing_gate_uses_local_activity_not_venue_ts_last() {
// A corrupted far-future ts_last must not stall missing-order reconciliation
// after the local activity grace expires.
let config = ExecutionManagerConfig {
open_check_threshold_ns: 0,
open_check_threshold_ns: 200_000_000,
open_check_missing_retries: 1,
open_check_open_only: false,
..Default::default()
};
let mut ctx = TestContext::with_config(config);
ctx.add_instrument(test_instrument());

let mut order = create_limit_order(
let order = create_limit_order(
"O-AHEAD",
test_instrument_id(),
OrderSide::Buy,
"10.0",
"100.0",
);
let submitted = TestOrderEventStubs::submitted(&order, test_account_id());
order.apply(submitted).unwrap();
ctx.add_order(order);
let order = ctx.cache.borrow_mut().update_order(&submitted).unwrap();

let future_ts = UnixNanos::from(10_000_000_000);
let future_ts = ctx
.clock
.borrow()
.timestamp_ns()
.saturating_add_ns(10_000_000_000_u64);
let accepted = OrderEventAny::Accepted(
OrderAcceptedSpec::builder()
.trader_id(order.trader_id())
Expand All @@ -7654,17 +7662,89 @@ async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() {
.ts_init(future_ts)
.build(),
);
order.apply(accepted).unwrap();
let client_order_id = order.client_order_id();
ctx.cache.borrow_mut().update_order(&accepted).unwrap();
// Track the event exactly as the LiveNode dispatch path does: ack cleanup
// plus the local-activity stamp, in that order.
ctx.manager.observe_order_event(&accepted);

let mock_client = MockExecutionClient::new(vec![]);
let clients: Vec<&dyn ExecutionClient> = vec![&mock_client];

let events = ctx.manager.check_open_orders(&clients).await;

assert!(
events.is_empty(),
"recent local activity should defer reconciliation",
);

advance_clock(dst::time::Duration::from_millis(250)).await;

let events = ctx.manager.check_open_orders(&clients).await;

assert_eq!(events.len(), 1);
assert!(
matches!(&events[0], OrderEventAny::Rejected(rejected) if rejected.client_order_id == client_order_id),
"far-future venue ts_last must not keep deferring reconciliation",
);
}

#[cfg_attr(
not(all(feature = "simulation", madsim)),
tokio::test(start_paused = true)
)]
#[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
async fn test_check_open_orders_defers_for_just_accepted_order() {
// Regression: the LiveNode dispatch path for Accepted performs ack cleanup
// (clear_recon_tracking) and stamps local activity via observe_order_event.
// The stamp must survive the cleanup so a just-accepted order that a
// lagging venue report omits defers until the grace expires, rather than
// being rejected as missing.
let config = ExecutionManagerConfig {
open_check_threshold_ns: 200_000_000,
open_check_missing_retries: 1,
open_check_open_only: false,
..Default::default()
};
let mut ctx = TestContext::with_config(config);
ctx.add_instrument(test_instrument());

let order = create_limit_order(
"O-ACCEPTED",
test_instrument_id(),
OrderSide::Buy,
"10.0",
"100.0",
);
let submitted = TestOrderEventStubs::submitted(&order, test_account_id());
ctx.add_order(order);
let order = ctx.cache.borrow_mut().update_order(&submitted).unwrap();
let client_order_id = order.client_order_id();

let accepted =
TestOrderEventStubs::accepted(&order, test_account_id(), VenueOrderId::from("V-ACCEPTED"));
ctx.cache.borrow_mut().update_order(&accepted).unwrap();
ctx.manager.observe_order_event(&accepted);

// Venue response lags and omits the just-accepted order
let mock_client = MockExecutionClient::new(vec![]);
let clients: Vec<&dyn ExecutionClient> = vec![&mock_client];

let events = ctx.manager.check_open_orders(&clients).await;

assert!(
events.is_empty(),
"a far-future venue ts_last must defer reconciliation, not reject or panic",
"a just-accepted order must defer missing-order reconciliation",
);

advance_clock(dst::time::Duration::from_millis(250)).await;

let events = ctx.manager.check_open_orders(&clients).await;

assert_eq!(events.len(), 1);
assert!(
matches!(&events[0], OrderEventAny::Rejected(rejected) if rejected.client_order_id == client_order_id),
"reconciliation must proceed once the local-activity grace expires",
);
}

Expand Down
Loading