Skip to content

Commit 6fb1482

Browse files
committed
Rebase missing-order reconciliation gate onto monotonic receipt time
`handle_missing_order` had a recency gate comparing the order's venue `ts_last` against `self.clock` now - a cross-axis compare that #4376 hardened against underflow but left on the trading clock. Under a custom live/sandbox clock factory the trading clock is not wall-paced (it can run accelerated or sit on a foreign epoch), so that window did not measure the real settling time it was meant to, and a corrupt far-future `ts_last` could stall the order's reconciliation. Drop the venue-`ts_last` gate. The missing-order settling window is now solely the monotonic `order_local_activity` recency gate (the `RecencyMap` from the recency-map consolidation), which measures real receipt-time elapsed at any clock speed. This also removes the warn-and-defer arm #4376 added for a far-future `ts_last`: with the cross-axis gate gone there is no longer a failure mode to warn about - the order simply reconciles once the real grace expires. Audit of the remaining live timers for the same class: the cache-purge intervals are deliberately left on the ExecutionEngine clock-timer path so they stay controlled by the injected Clock for custom-clock callers; a comment and a conversion test now pin that. Data-engine, order emulator, and core timing stay domain/deterministic, and `snapshot_positions_interval_secs` is left as-is (no live monotonic replacement). The missing-order test becomes a differential paused-time case: with a far-future venue `ts_last` present throughout, recent local activity defers, and after the monotonic grace expires reconciliation proceeds. Coded by an LLM.
1 parent e1f9f36 commit 6fb1482

3 files changed

Lines changed: 50 additions & 38 deletions

File tree

crates/live/src/execution/manager.rs

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,32 +1670,9 @@ impl ExecutionManager {
16701670
return events;
16711671
};
16721672

1673-
let ts_now = self.clock.borrow().timestamp_ns();
1674-
let ts_last = order.ts_last();
1675-
1676-
// Domain-time recency gate: ts_last and ts_now are both domain
1677-
// timestamps, so this stays on self.clock (it is not a real-time
1678-
// settling window).
1679-
match ts_now.duration_since(&ts_last) {
1680-
// Within the recency threshold: order is genuinely too recent, defer.
1681-
Some(elapsed_ns) if elapsed_ns < self.config.open_check_threshold_ns => {
1682-
return events;
1683-
}
1684-
// Old enough: fall through to the reconciliation checks below.
1685-
Some(_) => {}
1686-
// ts_last is ahead of ts_now - impossible under a sane clock.
1687-
// A corrupted far-future ts_last (for example a double-scaled
1688-
// timestamp) would otherwise stall this order's reconciliation
1689-
// forever with no signal, so warn before deferring.
1690-
None => {
1691-
log::warn!(
1692-
"Order {client_order_id} has venue ts_last {ts_last} ahead of local ts_now {ts_now}; deferring reconciliation"
1693-
);
1694-
return events;
1695-
}
1696-
}
1697-
1698-
// Check local activity threshold
1673+
// Recent local activity is the real-time settling window for missing
1674+
// orders. Venue/domain timestamps can be ahead of the trading clock and
1675+
// must not stall reconciliation.
16991676
if self.order_local_activity.within(
17001677
&client_order_id,
17011678
Duration::from_nanos(self.config.open_check_threshold_ns),

crates/live/src/node/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,10 @@ impl From<LiveExecEngineConfig> for ExecutionEngineConfig {
501501
allow_overfills: config.allow_overfills,
502502
filter_unclaimed_external_orders: config.filter_unclaimed_external_orders,
503503
external_clients: config.external_clients,
504+
// Keep purge intervals on the ExecutionEngine clock-timer path.
505+
// LiveNode also dispatches purge checks from its maintenance loop,
506+
// but engine timers must remain controlled by the injected Clock
507+
// for callers using a custom live/sandbox clock factory.
504508
purge_closed_orders_interval_mins: config.purge_closed_orders_interval_mins,
505509
purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
506510
purge_closed_positions_interval_mins: config.purge_closed_positions_interval_mins,
@@ -1285,6 +1289,12 @@ mod tests {
12851289
load_cache: false,
12861290
snapshot_positions_interval_secs: Some(30.0),
12871291
filter_unclaimed_external_orders: true,
1292+
purge_closed_orders_interval_mins: Some(5),
1293+
purge_closed_orders_buffer_mins: Some(1),
1294+
purge_closed_positions_interval_mins: Some(10),
1295+
purge_closed_positions_buffer_mins: Some(2),
1296+
purge_account_events_interval_mins: Some(15),
1297+
purge_account_events_lookback_mins: Some(3),
12881298
..Default::default()
12891299
};
12901300

@@ -1293,6 +1303,12 @@ mod tests {
12931303
assert!(!converted.load_cache);
12941304
assert_eq!(converted.snapshot_positions_interval_secs, Some(30.0));
12951305
assert!(converted.filter_unclaimed_external_orders);
1306+
assert_eq!(converted.purge_closed_orders_interval_mins, Some(5));
1307+
assert_eq!(converted.purge_closed_orders_buffer_mins, Some(1));
1308+
assert_eq!(converted.purge_closed_positions_interval_mins, Some(10));
1309+
assert_eq!(converted.purge_closed_positions_buffer_mins, Some(2));
1310+
assert_eq!(converted.purge_account_events_interval_mins, Some(15));
1311+
assert_eq!(converted.purge_account_events_lookback_mins, Some(3));
12961312
}
12971313

12981314
#[rstest]

crates/live/tests/manager.rs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7617,31 +7617,39 @@ async fn test_check_open_orders_submitted_missing_at_venue_generates_rejected()
76177617
}
76187618
}
76197619

7620-
#[rstest]
7621-
#[tokio::test]
7622-
async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() {
7623-
// A corrupted far-future ts_last must defer reconciliation without
7624-
// rejecting or panicking.
7620+
#[cfg_attr(
7621+
not(all(feature = "simulation", madsim)),
7622+
tokio::test(start_paused = true)
7623+
)]
7624+
#[cfg_attr(all(feature = "simulation", madsim), madsim::test)]
7625+
async fn test_check_open_orders_missing_gate_uses_local_activity_not_venue_ts_last() {
7626+
// A corrupted far-future ts_last must not stall missing-order reconciliation
7627+
// after the local activity grace expires.
76257628
let config = ExecutionManagerConfig {
7626-
open_check_threshold_ns: 0,
7629+
open_check_threshold_ns: 200_000_000,
76277630
open_check_missing_retries: 1,
76287631
open_check_open_only: false,
76297632
..Default::default()
76307633
};
76317634
let mut ctx = TestContext::with_config(config);
76327635
ctx.add_instrument(test_instrument());
76337636

7634-
let mut order = create_limit_order(
7637+
let order = create_limit_order(
76357638
"O-AHEAD",
76367639
test_instrument_id(),
76377640
OrderSide::Buy,
76387641
"10.0",
76397642
"100.0",
76407643
);
76417644
let submitted = TestOrderEventStubs::submitted(&order, test_account_id());
7642-
order.apply(submitted).unwrap();
7645+
ctx.add_order(order);
7646+
let order = ctx.cache.borrow_mut().update_order(&submitted).unwrap();
76437647

7644-
let future_ts = UnixNanos::from(10_000_000_000);
7648+
let future_ts = ctx
7649+
.clock
7650+
.borrow()
7651+
.timestamp_ns()
7652+
.saturating_add_ns(10_000_000_000_u64);
76457653
let accepted = OrderEventAny::Accepted(
76467654
OrderAcceptedSpec::builder()
76477655
.trader_id(order.trader_id())
@@ -7654,8 +7662,9 @@ async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() {
76547662
.ts_init(future_ts)
76557663
.build(),
76567664
);
7657-
order.apply(accepted).unwrap();
7658-
ctx.add_order(order);
7665+
let client_order_id = order.client_order_id();
7666+
ctx.cache.borrow_mut().update_order(&accepted).unwrap();
7667+
ctx.manager.record_local_activity(client_order_id);
76597668

76607669
let mock_client = MockExecutionClient::new(vec![]);
76617670
let clients: Vec<&dyn ExecutionClient> = vec![&mock_client];
@@ -7664,7 +7673,17 @@ async fn test_check_open_orders_defers_when_venue_ts_last_is_ahead() {
76647673

76657674
assert!(
76667675
events.is_empty(),
7667-
"a far-future venue ts_last must defer reconciliation, not reject or panic",
7676+
"recent local activity should defer reconciliation",
7677+
);
7678+
7679+
advance_clock(dst::time::Duration::from_millis(250)).await;
7680+
7681+
let events = ctx.manager.check_open_orders(&clients).await;
7682+
7683+
assert_eq!(events.len(), 1);
7684+
assert!(
7685+
matches!(&events[0], OrderEventAny::Rejected(rejected) if rejected.client_order_id == client_order_id),
7686+
"far-future venue ts_last must not keep deferring reconciliation",
76687687
);
76697688
}
76707689

0 commit comments

Comments
 (0)