
Commit 4e379e4

Rebase missing-order reconciliation gate onto monotonic receipt time
`handle_missing_order` had a recency gate comparing the order's venue `ts_last` against `self.clock` now - a cross-axis compare that #4376 hardened against underflow but left on the trading clock. Under a custom live/sandbox clock factory the trading clock is not wall-paced (it can run accelerated or sit on a foreign epoch), so that window did not measure the real settling time it was meant to, and a corrupt far-future `ts_last` could stall the order's reconciliation.

Drop the venue-`ts_last` gate. The missing-order settling window is now solely the monotonic `order_local_activity` recency gate (the `RecencyMap` from the recency-map consolidation), which measures real receipt-time elapsed at any clock speed. This also removes the warn-and-defer arm #4376 added for a far-future `ts_last`: with the cross-axis gate gone there is no longer a failure mode to warn about - the order simply reconciles once the real grace expires.

Making local activity the sole gate exposed an ordering bug in the `LiveNode` dispatch path: acknowledgement events (`Accepted` et al) stamped local activity and then immediately wiped it via `clear_recon_tracking`, so a just-accepted order omitted by a lagging venue report could be falsely rejected as NOT_FOUND_AT_VENUE. The per-order-event tracking now lives in `ExecutionManager::observe_order_event`, which clears first and stamps after - matching the ordering `observe_execution_report` already used - and the node's batch accept/cancel arms are reordered the same way.

Audit of the remaining live timers for the same class: the cache-purge intervals are deliberately left on the ExecutionEngine clock-timer path so they stay controlled by the injected Clock for custom-clock callers; a comment and a conversion test now pin that. Data-engine, order emulator, and core timing stay domain/deterministic, and `snapshot_positions_interval_secs` is left as-is (no live monotonic replacement).

The missing-order test becomes a differential paused-time case: with a far-future venue `ts_last` present throughout, recent local activity defers, and after the monotonic grace expires reconciliation proceeds. It tracks the accepted event through `observe_order_event` - the exact `LiveNode` call - and a dedicated regression test covers the just-accepted-order deferral end to end.

Coded by an LLM.
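
The ordering bug described above can be illustrated with a small standalone sketch. It is not the project's code: `ActivityMap` is a hypothetical stand-in for the `RecencyMap` the message mentions, the two `observe_ack_*` helpers are invented for illustration, and time is passed in explicitly as `std::time::Instant` values so the behaviour can be checked without sleeping.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the monotonic recency map (an assumption,
/// not the real `RecencyMap` API).
#[derive(Default)]
struct ActivityMap {
    last_seen: HashMap<String, Instant>,
}

impl ActivityMap {
    /// Records local activity for `id` at the monotonic instant `now`.
    fn stamp_at(&mut self, id: &str, now: Instant) {
        self.last_seen.insert(id.to_string(), now);
    }

    /// Drops the mark, as ack cleanup does in the commit description.
    fn clear(&mut self, id: &str) {
        self.last_seen.remove(id);
    }

    /// True while `id` was stamped less than `window` before `now`.
    fn within_at(&self, id: &str, window: Duration, now: Instant) -> bool {
        self.last_seen
            .get(id)
            .map_or(false, |seen| now.saturating_duration_since(*seen) < window)
    }
}

/// Old ordering: stamp, then clear. The stamp never survives.
fn observe_ack_stamp_first(map: &mut ActivityMap, id: &str, now: Instant) {
    map.stamp_at(id, now);
    map.clear(id);
}

/// New ordering: clear, then stamp. The grace window starts at `now`.
fn observe_ack_clear_first(map: &mut ActivityMap, id: &str, now: Instant) {
    map.clear(id);
    map.stamp_at(id, now);
}
```

With the stamp-first ordering the order has no grace window at all, so a missing-order check that runs immediately after the acknowledgement would not defer. With the clear-first ordering the check defers until the window has elapsed on the monotonic axis, whatever the trading clock reports.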
1 parent f52ad86 commit 4e379e4

4 files changed

Lines changed: 146 additions & 62 deletions

crates/live/src/execution/manager.rs

Lines changed: 34 additions & 26 deletions
@@ -1461,6 +1461,37 @@ impl ExecutionManager {
         self.position_recon_retries.get(key).copied().unwrap_or(0)
     }

+    /// Observes a local order event and updates tracking state.
+    ///
+    /// This is the `LiveNode` dispatch path for order events: acknowledgement
+    /// events clear reconciliation tracking, fills record fill/position
+    /// activity, and every event stamps local activity. The stamp must come
+    /// AFTER any [`Self::clear_recon_tracking`] call - that call drops the
+    /// local-activity mark, which is the sole grace gate protecting a
+    /// just-acknowledged order from missing-order reconciliation while the
+    /// venue report lags.
+    pub fn observe_order_event(&mut self, event: &OrderEventAny) {
+        match event {
+            OrderEventAny::Filled(fill) => {
+                self.record_position_activity(fill.instrument_id, fill.account_id);
+                self.mark_fill_processed(fill.trade_id);
+            }
+            OrderEventAny::Accepted(_)
+            | OrderEventAny::Rejected(_)
+            | OrderEventAny::Canceled(_)
+            | OrderEventAny::Expired(_)
+            | OrderEventAny::Denied(_)
+            | OrderEventAny::Updated(_)
+            | OrderEventAny::ModifyRejected(_)
+            | OrderEventAny::CancelRejected(_) => {
+                self.clear_recon_tracking(&event.client_order_id(), true);
+            }
+            _ => {}
+        }
+
+        self.record_local_activity(event.client_order_id());
+    }
+
     /// Observes an incoming execution report and updates tracking state.
     ///
     /// This should be called **before** the report is dispatched to the execution
@@ -1670,32 +1701,9 @@ impl ExecutionManager {
             return events;
         };

-        let ts_now = self.clock.borrow().timestamp_ns();
-        let ts_last = order.ts_last();
-
-        // Domain-time recency gate: ts_last and ts_now are both domain
-        // timestamps, so this stays on self.clock (it is not a real-time
-        // settling window).
-        match ts_now.duration_since(&ts_last) {
-            // Within the recency threshold: order is genuinely too recent, defer.
-            Some(elapsed_ns) if elapsed_ns < self.config.open_check_threshold_ns => {
-                return events;
-            }
-            // Old enough: fall through to the reconciliation checks below.
-            Some(_) => {}
-            // ts_last is ahead of ts_now - impossible under a sane clock.
-            // A corrupted far-future ts_last (for example a double-scaled
-            // timestamp) would otherwise stall this order's reconciliation
-            // forever with no signal, so warn before deferring.
-            None => {
-                log::warn!(
-                    "Order {client_order_id} has venue ts_last {ts_last} ahead of local ts_now {ts_now}; deferring reconciliation"
-                );
-                return events;
-            }
-        }
-
-        // Check local activity threshold
+        // Recent local activity is the real-time settling window for missing
+        // orders. Venue/domain timestamps can be ahead of the trading clock and
+        // must not stall reconciliation.
         if self.order_local_activity.within(
             &client_order_id,
             Duration::from_nanos(self.config.open_check_threshold_ns),
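
For readers following the removed gate, here is a simplified model of its three match arms. It is a sketch under stated assumptions: timestamps are plain `u64` nanoseconds rather than `UnixNanos`, `checked_sub` stands in for `duration_since`, and `old_gate_defers` is a hypothetical name that does not exist in the crate.

```rust
/// Simplified model of the removed venue-`ts_last` gate (hypothetical
/// helper, not part of the crate). Returns true when reconciliation of
/// the missing order would have been deferred.
fn old_gate_defers(ts_now: u64, ts_last: u64, threshold_ns: u64) -> bool {
    match ts_now.checked_sub(ts_last) {
        // Within the recency threshold: defer.
        Some(elapsed) if elapsed < threshold_ns => true,
        // Old enough: fall through to the reconciliation checks.
        Some(_) => false,
        // `ts_last` ahead of `ts_now`: the old code warned and deferred,
        // and kept deferring until the trading clock passed `ts_last`.
        None => true,
    }
}
```

In this model, a `ts_last` ten seconds ahead of the trading clock defers on every check until the clock catches up, even with a zero threshold. That is the stall the commit removes by relying on the monotonic local-activity window instead.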

crates/live/src/node/config.rs

Lines changed: 16 additions & 0 deletions
@@ -501,6 +501,10 @@ impl From<LiveExecEngineConfig> for ExecutionEngineConfig {
             allow_overfills: config.allow_overfills,
             filter_unclaimed_external_orders: config.filter_unclaimed_external_orders,
             external_clients: config.external_clients,
+            // Keep purge intervals on the ExecutionEngine clock-timer path.
+            // LiveNode also dispatches purge checks from its maintenance loop,
+            // but engine timers must remain controlled by the injected Clock
+            // for callers using a custom live/sandbox clock factory.
             purge_closed_orders_interval_mins: config.purge_closed_orders_interval_mins,
             purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
             purge_closed_positions_interval_mins: config.purge_closed_positions_interval_mins,
@@ -1285,6 +1289,12 @@ mod tests {
             load_cache: false,
             snapshot_positions_interval_secs: Some(30.0),
             filter_unclaimed_external_orders: true,
+            purge_closed_orders_interval_mins: Some(5),
+            purge_closed_orders_buffer_mins: Some(1),
+            purge_closed_positions_interval_mins: Some(10),
+            purge_closed_positions_buffer_mins: Some(2),
+            purge_account_events_interval_mins: Some(15),
+            purge_account_events_lookback_mins: Some(3),
             ..Default::default()
         };

@@ -1293,6 +1303,12 @@ mod tests {
         assert!(!converted.load_cache);
         assert_eq!(converted.snapshot_positions_interval_secs, Some(30.0));
         assert!(converted.filter_unclaimed_external_orders);
+        assert_eq!(converted.purge_closed_orders_interval_mins, Some(5));
+        assert_eq!(converted.purge_closed_orders_buffer_mins, Some(1));
+        assert_eq!(converted.purge_closed_positions_interval_mins, Some(10));
+        assert_eq!(converted.purge_closed_positions_buffer_mins, Some(2));
+        assert_eq!(converted.purge_account_events_interval_mins, Some(15));
+        assert_eq!(converted.purge_account_events_lookback_mins, Some(3));
     }

     #[rstest]

crates/live/src/node/mod.rs

Lines changed: 5 additions & 25 deletions
@@ -1161,29 +1161,7 @@ impl LiveNode {

                     match &evt {
                         ExecutionEvent::Order(order_evt) => {
-                            self.exec_manager.record_local_activity(order_evt.client_order_id());
-                            match order_evt {
-                                OrderEventAny::Filled(fill) => {
-                                    self.exec_manager.record_position_activity(
-                                        fill.instrument_id,
-                                        fill.account_id,
-                                    );
-                                    self.exec_manager.mark_fill_processed(fill.trade_id);
-                                }
-                                OrderEventAny::Accepted(_)
-                                | OrderEventAny::Rejected(_)
-                                | OrderEventAny::Canceled(_)
-                                | OrderEventAny::Expired(_)
-                                | OrderEventAny::Denied(_)
-                                | OrderEventAny::Updated(_)
-                                | OrderEventAny::ModifyRejected(_)
-                                | OrderEventAny::CancelRejected(_) => {
-                                    self.exec_manager.clear_recon_tracking(
-                                        &order_evt.client_order_id(), true,
-                                    );
-                                }
-                                _ => {}
-                            }
+                            self.exec_manager.observe_order_event(order_evt);
                             close_ids.push(order_evt.client_order_id());
                         }
                         ExecutionEvent::OrderSubmittedBatch(batch) => {
@@ -1193,18 +1171,20 @@ impl LiveNode {
                         }
                         ExecutionEvent::OrderAcceptedBatch(batch) => {
                             for accepted in &batch.events {
-                                self.exec_manager.record_local_activity(accepted.client_order_id);
+                                // Stamp after clearing: `clear_recon_tracking` drops the
+                                // local-activity mark, the missing-order grace gate.
                                 self.exec_manager.clear_recon_tracking(
                                     &accepted.client_order_id, true,
                                 );
+                                self.exec_manager.record_local_activity(accepted.client_order_id);
                             }
                         }
                         ExecutionEvent::OrderCanceledBatch(batch) => {
                             for canceled in &batch.events {
-                                self.exec_manager.record_local_activity(canceled.client_order_id);
                                 self.exec_manager.clear_recon_tracking(
                                     &canceled.client_order_id, true,
                                 );
+                                self.exec_manager.record_local_activity(canceled.client_order_id);
                                 close_ids.push(canceled.client_order_id);
                             }
                         }

crates/live/tests/manager.rs

Lines changed: 91 additions & 11 deletions
@@ -7617,31 +7617,39 @@ async fn test_check_open_orders_submitted_missing_at_venue_generates_rejected()
     }
 }

-#[rstest]
-#[tokio::test]
-async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() {
-    // A corrupted far-future ts_last must defer reconciliation without
-    // rejecting or panicking.
+#[cfg_attr(
+    not(all(feature = "simulation", madsim)),
+    tokio::test(start_paused = true)
+)]
+#[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
+async fn test_check_open_orders_missing_gate_uses_local_activity_not_venue_ts_last() {
+    // A corrupted far-future ts_last must not stall missing-order reconciliation
+    // after the local activity grace expires.
     let config = ExecutionManagerConfig {
-        open_check_threshold_ns: 0,
+        open_check_threshold_ns: 200_000_000,
         open_check_missing_retries: 1,
         open_check_open_only: false,
         ..Default::default()
     };
     let mut ctx = TestContext::with_config(config);
     ctx.add_instrument(test_instrument());

-    let mut order = create_limit_order(
+    let order = create_limit_order(
         "O-AHEAD",
         test_instrument_id(),
         OrderSide::Buy,
         "10.0",
         "100.0",
     );
     let submitted = TestOrderEventStubs::submitted(&order, test_account_id());
-    order.apply(submitted).unwrap();
+    ctx.add_order(order);
+    let order = ctx.cache.borrow_mut().update_order(&submitted).unwrap();

-    let future_ts = UnixNanos::from(10_000_000_000);
+    let future_ts = ctx
+        .clock
+        .borrow()
+        .timestamp_ns()
+        .saturating_add_ns(10_000_000_000_u64);
     let accepted = OrderEventAny::Accepted(
         OrderAcceptedSpec::builder()
             .trader_id(order.trader_id())
@@ -7654,17 +7662,89 @@ async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() {
             .ts_init(future_ts)
             .build(),
     );
-    order.apply(accepted).unwrap();
+    let client_order_id = order.client_order_id();
+    ctx.cache.borrow_mut().update_order(&accepted).unwrap();
+    // Track the event exactly as the LiveNode dispatch path does: ack cleanup
+    // plus the local-activity stamp, in that order.
+    ctx.manager.observe_order_event(&accepted);
+
+    let mock_client = MockExecutionClient::new(vec![]);
+    let clients: Vec<&dyn ExecutionClient> = vec![&mock_client];
+
+    let events = ctx.manager.check_open_orders(&clients).await;
+
+    assert!(
+        events.is_empty(),
+        "recent local activity should defer reconciliation",
+    );
+
+    advance_clock(dst::time::Duration::from_millis(250)).await;
+
+    let events = ctx.manager.check_open_orders(&clients).await;
+
+    assert_eq!(events.len(), 1);
+    assert!(
+        matches!(&events[0], OrderEventAny::Rejected(rejected) if rejected.client_order_id == client_order_id),
+        "far-future venue ts_last must not keep deferring reconciliation",
+    );
+}
+
+#[cfg_attr(
+    not(all(feature = "simulation", madsim)),
+    tokio::test(start_paused = true)
+)]
+#[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
+async fn test_check_open_orders_defers_for_just_accepted_order() {
+    // Regression: the LiveNode dispatch path for Accepted performs ack cleanup
+    // (clear_recon_tracking) and stamps local activity via observe_order_event.
+    // The stamp must survive the cleanup so a just-accepted order that a
+    // lagging venue report omits defers until the grace expires, rather than
+    // being rejected as missing.
+    let config = ExecutionManagerConfig {
+        open_check_threshold_ns: 200_000_000,
+        open_check_missing_retries: 1,
+        open_check_open_only: false,
+        ..Default::default()
+    };
+    let mut ctx = TestContext::with_config(config);
+    ctx.add_instrument(test_instrument());
+
+    let order = create_limit_order(
+        "O-ACCEPTED",
+        test_instrument_id(),
+        OrderSide::Buy,
+        "10.0",
+        "100.0",
+    );
+    let submitted = TestOrderEventStubs::submitted(&order, test_account_id());
     ctx.add_order(order);
+    let order = ctx.cache.borrow_mut().update_order(&submitted).unwrap();
+    let client_order_id = order.client_order_id();
+
+    let accepted =
+        TestOrderEventStubs::accepted(&order, test_account_id(), VenueOrderId::from("V-ACCEPTED"));
+    ctx.cache.borrow_mut().update_order(&accepted).unwrap();
+    ctx.manager.observe_order_event(&accepted);

+    // Venue response lags and omits the just-accepted order
     let mock_client = MockExecutionClient::new(vec![]);
     let clients: Vec<&dyn ExecutionClient> = vec![&mock_client];

     let events = ctx.manager.check_open_orders(&clients).await;

     assert!(
         events.is_empty(),
-        "a far-future venue ts_last must defer reconciliation, not reject or panic",
+        "a just-accepted order must defer missing-order reconciliation",
+    );
+
+    advance_clock(dst::time::Duration::from_millis(250)).await;
+
+    let events = ctx.manager.check_open_orders(&clients).await;
+
+    assert_eq!(events.len(), 1);
+    assert!(
+        matches!(&events[0], OrderEventAny::Rejected(rejected) if rejected.client_order_id == client_order_id),
+        "reconciliation must proceed once the local-activity grace expires",
     );
 }
