Skip to content

Commit 900cfcd

Browse files
authored
Measure position-reconciliation grace on the monotonic clock (#4366)
The continuous position-consistency check has a grace window: when the node recently observed local activity on a position, check_position_discrepancy suppresses a cache-vs-venue discrepancy for position_check_threshold_ns, giving the local execution pipeline a moment to settle before synthesizing an EXTERNAL reconciliation position. That window is inherently a real-time duration — you want N real seconds of cover. But it was measured on self.clock: last_activity was stamped from self.clock (and before that carried from the venue event's ts_event), and the threshold was compared against a self.clock delta. self.clock is not guaranteed to advance at wall rate — a live node can be given a non-wall clock through a clock factory (e.g. a simulated or replay venue driving the node clock). When that clock runs fast or is anchored to a different epoch, the delta is not real elapsed time and the threshold no longer means real cover: an accelerated clock shrinks the window by its speed and the grace can collapse before the first reconciliation poll. Measure the grace on the monotonic dst::time clock instead: stamp last_activity from dst::time::Instant and compare via elapsed() against the threshold as a Duration. The window is now real elapsed time at any clock speed or epoch. This also aligns the grace with the reconciliation loop itself, which already schedules its polls on dst::time (LiveNode::run) — the grace was the only part measuring durations on self.clock. The continuous position check is a live-only feature and never runs under backtest, so a monotonic grace clock has no effect on backtest determinism. record_position_activity no longer takes a ts_event parameter and no longer reads self.clock; position_local_activity now holds monotonic instants. Add a regression test that records activity, races self.clock ~954 days ahead in a single step, and asserts the grace still holds; it fails against a self.clock-based grace and passes with the monotonic grace. Coded by an LLM.
1 parent fe7c00b commit 900cfcd

3 files changed

Lines changed: 110 additions & 28 deletions

File tree

crates/live/src/execution/manager.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
//! This module provides the execution manager for reconciling execution state between
1919
//! the local cache and connected venues, as well as purging old state during live trading.
2020
21-
use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr, sync::LazyLock};
21+
use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr, sync::LazyLock, time::Duration};
2222

2323
use indexmap::{IndexMap, IndexSet};
2424
use nautilus_common::{
2525
cache::Cache,
2626
clients::ExecutionClient,
2727
clock::Clock,
2828
enums::{LogColor, LogLevel},
29+
live::dst,
2930
log_info,
3031
messages::{
3132
ExecutionReport,
@@ -271,7 +272,8 @@ pub struct ExecutionManager {
271272
recon_check_retries: IndexMap<ClientOrderId, u32>,
272273
ts_last_query: IndexMap<ClientOrderId, UnixNanos>,
273274
order_local_activity_ns: IndexMap<ClientOrderId, UnixNanos>,
274-
position_local_activity_ns: IndexMap<InstrumentAccountKey, UnixNanos>,
275+
// Monotonic (`dst::time`) instants, not `self.clock`; see `record_position_activity`.
276+
position_local_activity: IndexMap<InstrumentAccountKey, dst::time::Instant>,
275277
position_recon_retries: IndexMap<InstrumentAccountKey, u32>,
276278
recent_fills_cache: IndexMap<TradeId, UnixNanos>,
277279
}
@@ -305,7 +307,7 @@ impl ExecutionManager {
305307
recon_check_retries: IndexMap::new(),
306308
ts_last_query: IndexMap::new(),
307309
order_local_activity_ns: IndexMap::new(),
308-
position_local_activity_ns: IndexMap::new(),
310+
position_local_activity: IndexMap::new(),
309311
position_recon_retries: IndexMap::new(),
310312
recent_fills_cache: IndexMap::new(),
311313
}
@@ -1439,14 +1441,21 @@ impl ExecutionManager {
14391441
}
14401442

14411443
/// Records position activity for reconciliation tracking, scoped per (instrument, account).
1442-
pub fn record_position_activity(
1443-
&mut self,
1444-
instrument_id: InstrumentId,
1445-
account_id: AccountId,
1446-
ts_event: UnixNanos,
1447-
) {
1448-
self.position_local_activity_ns
1449-
.insert((instrument_id, account_id), ts_event);
1444+
///
1445+
/// The activity is stamped from the monotonic `dst::time` clock (real elapsed
1446+
/// time), **not** from `self.clock` and **not** from the venue event's
1447+
/// `ts_event`. The position-discrepancy grace is a real-time settling window:
1448+
/// give the local pipeline a moment to catch up before flagging a
1449+
/// cache-vs-venue gap. That is inherently wall/monotonic time; you want N
1450+
/// real seconds of cover regardless of the trading clock's epoch or speed.
1451+
/// `self.clock` can be driven off wall time (e.g. an accelerated simulated
1452+
/// venue), which would shrink the window by the clock's speed; the venue
1453+
/// `ts_event` lives on yet another axis. Measuring against the same monotonic
1454+
/// clock the reconciliation loop already schedules on keeps the grace honest.
1455+
/// See `check_position_discrepancy`.
1456+
pub fn record_position_activity(&mut self, instrument_id: InstrumentId, account_id: AccountId) {
1457+
self.position_local_activity
1458+
.insert((instrument_id, account_id), dst::time::Instant::now());
14501459
}
14511460

14521461
/// Returns the current position-reconciliation retry count for the given
@@ -1498,11 +1507,7 @@ impl ExecutionManager {
14981507
if let Some(coid) = client_order_id {
14991508
self.record_local_activity(coid);
15001509
}
1501-
self.record_position_activity(
1502-
fill_report.instrument_id,
1503-
fill_report.account_id,
1504-
fill_report.ts_event,
1505-
);
1510+
self.record_position_activity(fill_report.instrument_id, fill_report.account_id);
15061511
}
15071512
ExecutionReport::OrderWithFills(order_report, fills) => {
15081513
if let Some(client_order_id) = &order_report.client_order_id
@@ -1522,15 +1527,13 @@ impl ExecutionManager {
15221527
self.record_position_activity(
15231528
fill_report.instrument_id,
15241529
fill_report.account_id,
1525-
fill_report.ts_event,
15261530
);
15271531
}
15281532
}
15291533
ExecutionReport::Position(position_report) => {
15301534
self.record_position_activity(
15311535
position_report.instrument_id,
15321536
position_report.account_id,
1533-
position_report.ts_last,
15341537
);
15351538
}
15361539
ExecutionReport::MassStatus(_) => {
@@ -1737,8 +1740,10 @@ impl ExecutionManager {
17371740

17381741
let ts_now = self.clock.borrow().timestamp_ns();
17391742

1740-
if let Some(&last_activity) = self.position_local_activity_ns.get(&key)
1741-
&& (ts_now - last_activity) < self.config.position_check_threshold_ns
1743+
// Grace window measured on the monotonic `dst::time` clock; see `record_position_activity`.
1744+
if let Some(&last_activity) = self.position_local_activity.get(&key)
1745+
&& last_activity.elapsed()
1746+
< Duration::from_nanos(self.config.position_check_threshold_ns)
17421747
{
17431748
log::debug!(
17441749
"Skipping position reconciliation for {instrument_id}: recent activity within threshold"

crates/live/src/node/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,7 +1166,6 @@ impl LiveNode {
11661166
self.exec_manager.record_position_activity(
11671167
fill.instrument_id,
11681168
fill.account_id,
1169-
fill.ts_event,
11701169
);
11711170
self.exec_manager.mark_fill_processed(fill.trade_id);
11721171
}
@@ -1406,11 +1405,8 @@ impl LiveNode {
14061405
self.exec_manager
14071406
.record_local_activity(event.client_order_id());
14081407
if let OrderEventAny::Filled(fill) = event {
1409-
self.exec_manager.record_position_activity(
1410-
fill.instrument_id,
1411-
fill.account_id,
1412-
fill.ts_event,
1413-
);
1408+
self.exec_manager
1409+
.record_position_activity(fill.instrument_id, fill.account_id);
14141410
self.exec_manager.mark_fill_processed(fill.trade_id);
14151411
}
14161412
self.kernel.exec_engine.borrow_mut().process(event);

crates/live/tests/manager.rs

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8110,9 +8110,10 @@ async fn test_position_check_activity_throttle_independent_per_account() {
81108110
ctx.add_position(&pos_b);
81118111

81128112
// Simulate recent activity on account A only: B must remain unthrottled.
8113-
let ts_now = ctx.clock.borrow().get_time_ns();
8113+
// Activity is stamped from the monotonic `dst::time` clock inside
8114+
// `record_position_activity`, so no explicit timestamp is passed.
81148115
ctx.manager
8115-
.record_position_activity(instrument_id, account_a, ts_now);
8116+
.record_position_activity(instrument_id, account_a);
81168117

81178118
// No venue report for B: treated as flat, so a discrepancy.
81188119
let mock_client = MockPositionExecutionClient::new(vec![], vec![]);
@@ -8134,6 +8135,86 @@ async fn test_position_check_activity_throttle_independent_per_account() {
81348135
);
81358136
}
81368137

8138+
#[tokio::test]
8139+
async fn test_position_check_grace_survives_accelerated_trading_clock() {
8140+
// The position-reconciliation grace is measured on the monotonic clock, not
8141+
// `self.clock` (see `record_position_activity`). Here we record local activity,
8142+
// then jump `self.clock` ~954 days forward in a single step, standing in for a
8143+
// trading clock that has raced ahead while only milliseconds of real time have
8144+
// elapsed, and assert the grace still suppresses the discrepancy.
8145+
let config = ExecutionManagerConfig {
8146+
position_check_retries: 3,
8147+
position_check_threshold_ns: 60_000_000_000, // 60s of real cover
8148+
..Default::default()
8149+
};
8150+
let mut ctx = TestContext::with_config(config);
8151+
let instrument = test_instrument();
8152+
let instrument_id = instrument.id();
8153+
let account = AccountId::from("BINANCE-A");
8154+
8155+
ctx.add_instrument(instrument.clone());
8156+
ctx.add_margin_account(account);
8157+
8158+
// Observe a fill: records local activity on the monotonic clock. The venue
8159+
// event timestamps do not feed the grace and are left arbitrary.
8160+
let ts_event = UnixNanos::from(1_000_000_000);
8161+
let fill_report = FillReport::new(
8162+
account,
8163+
instrument_id,
8164+
VenueOrderId::from("V-1"),
8165+
TradeId::from("T-1"),
8166+
OrderSide::Buy,
8167+
Quantity::from("0.06"),
8168+
Price::from("3000.00"),
8169+
Money::new(0.0, Currency::USDT()),
8170+
LiquiditySide::Taker,
8171+
Some(ClientOrderId::from("O-1")),
8172+
None, // venue_position_id
8173+
ts_event, // ts_event
8174+
ts_event, // ts_init
8175+
None, // report_id
8176+
);
8177+
ctx.manager
8178+
.observe_execution_report(&ExecutionReport::Fill(Box::new(fill_report)));
8179+
8180+
// Race the trading clock ~954 days ahead. A `self.clock`-based grace would now
8181+
// read "activity was 954 days ago" and fire; the monotonic grace must not.
8182+
let accelerated_jump_ns: u64 = 954 * 86_400 * 1_000_000_000; // 954 days in ns
8183+
ctx.advance_time(accelerated_jump_ns);
8184+
8185+
// Cache is flat but the venue reports a 0.06 long: a genuine discrepancy. The
8186+
// only thing between it and a synthesized EXTERNAL position is the grace.
8187+
let venue_report = PositionStatusReport::new(
8188+
account,
8189+
instrument_id,
8190+
PositionSideSpecified::Long,
8191+
Quantity::from("0.06"),
8192+
UnixNanos::from(accelerated_jump_ns),
8193+
UnixNanos::from(accelerated_jump_ns),
8194+
None, // report_id
8195+
None, // venue_position_id
8196+
Some(dec!(3000.00)),
8197+
);
8198+
let mock_client = MockPositionExecutionClient::new(vec![], vec![venue_report]);
8199+
let clients: Vec<&dyn ExecutionClient> = vec![&mock_client];
8200+
8201+
let events = ctx.manager.check_positions_consistency(&clients).await;
8202+
8203+
assert!(
8204+
events.is_empty(),
8205+
"grace must survive an accelerated trading clock (it is measured on the \
8206+
monotonic clock); got {} reconciliation event(s), the grace regressed \
8207+
to self.clock",
8208+
events.len(),
8209+
);
8210+
assert_eq!(
8211+
ctx.manager
8212+
.position_recon_retry_count(&(instrument_id, account)),
8213+
0,
8214+
"grace path must return before the retry counter is touched",
8215+
);
8216+
}
8217+
81378218
#[tokio::test]
81388219
async fn test_check_positions_consistency_processes_only_discrepant_account() {
81398220
// With cache and venue agreeing on account A and disagreeing on account B,

0 commit comments

Comments
 (0)