Skip to content

Commit 12d05ac

Browse files
authored
Move live reconciliation real-time gates to the monotonic clock (#4376)
The continuous reconciliation checks measured their real-time settling windows on self.clock, which a clock factory can drive fast, anchor to a foreign epoch, or stall, so an N-second window stops meaning N real seconds. #4366 fixed this for the position grace; this moves the rest of the family onto the monotonic dst::time clock: - open-order local-activity grace (order_local_activity) - inflight submit and query timeout (InflightCheck) - recent-fills dedup TTL (recent_fills_cache) - shared open/inflight query cadence (ts_last_query) - inner reconciliation sub-check cadence gate (run_reconciliation_checks) self.clock is kept for every domain timestamp: generated event and command ts_event/ts_init, and venue lookback and purge cutoffs. The order-recency gate in handle_missing_order stays on domain time but is hardened from a panicking UnixNanos subtraction to a checked one that logs a warning and defers when a venue timestamp runs ahead of local time. Also drops the now-unused ExecutionManager::generate_timestamp_ns accessor. Tests migrate to tokio paused virtual time (test-util dev-dependency), with differential cases that advance one clock axis at a time to prove the split. Coded by an LLM.
1 parent c9480b8 commit 12d05ac

4 files changed

Lines changed: 584 additions & 168 deletions

File tree

crates/live/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ criterion = { workspace = true }
123123
madsim = { workspace = true }
124124
rstest = { workspace = true }
125125
rust_decimal_macros = { workspace = true }
126+
tokio = { workspace = true, features = ["test-util", "macros", "rt"] }
126127
toml = { workspace = true }
127128
ustr = { workspace = true }
128129

crates/live/src/execution/manager.rs

Lines changed: 90 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ use nautilus_common::{
3939
};
4040
use nautilus_core::{
4141
UUID4, UnixNanos,
42-
datetime::{
43-
NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, mins_to_nanos, mins_to_secs,
44-
nanos_to_millis,
45-
},
42+
datetime::{mins_to_nanos, mins_to_secs},
4643
};
4744
use nautilus_execution::{
4845
engine::ExecutionEngine,
@@ -234,9 +231,11 @@ impl ExecutionManagerConfig {
234231
struct InflightCheck {
235232
#[allow(dead_code)]
236233
pub client_order_id: ClientOrderId,
237-
pub ts_submitted: UnixNanos,
234+
pub ts_submitted: dst::time::Instant,
238235
pub retry_count: u32,
239-
pub last_query_ts: Option<UnixNanos>,
236+
// `Instant` debug output is runtime-specific and intentionally only useful
237+
// as an opaque monotonic offset.
238+
pub last_query_ts: Option<dst::time::Instant>,
240239
}
241240

242241
/// Manager for execution state.
@@ -270,12 +269,12 @@ pub struct ExecutionManager {
270269
external_order_claims: IndexMap<InstrumentId, StrategyId>,
271270
processed_fills: IndexMap<TradeId, ClientOrderId>,
272271
recon_check_retries: IndexMap<ClientOrderId, u32>,
273-
ts_last_query: IndexMap<ClientOrderId, UnixNanos>,
274-
order_local_activity_ns: IndexMap<ClientOrderId, UnixNanos>,
272+
ts_last_query: IndexMap<ClientOrderId, dst::time::Instant>,
273+
order_local_activity: IndexMap<ClientOrderId, dst::time::Instant>,
275274
// Monotonic (`dst::time`) instants, not `self.clock`; see `record_position_activity`.
276275
position_local_activity: IndexMap<InstrumentAccountKey, dst::time::Instant>,
277276
position_recon_retries: IndexMap<InstrumentAccountKey, u32>,
278-
recent_fills_cache: IndexMap<TradeId, UnixNanos>,
277+
recent_fills_cache: IndexMap<TradeId, dst::time::Instant>,
279278
}
280279

281280
impl Debug for ExecutionManager {
@@ -306,19 +305,13 @@ impl ExecutionManager {
306305
processed_fills: IndexMap::new(),
307306
recon_check_retries: IndexMap::new(),
308307
ts_last_query: IndexMap::new(),
309-
order_local_activity_ns: IndexMap::new(),
308+
order_local_activity: IndexMap::new(),
310309
position_local_activity: IndexMap::new(),
311310
position_recon_retries: IndexMap::new(),
312311
recent_fills_cache: IndexMap::new(),
313312
}
314313
}
315314

316-
/// Returns the current clock timestamp in nanoseconds.
317-
#[must_use]
318-
pub fn generate_timestamp_ns(&self) -> UnixNanos {
319-
self.clock.borrow().timestamp_ns()
320-
}
321-
322315
/// Reconciles orders and fills from a mass status report.
323316
///
324317
/// Order events are collected, sorted globally by `ts_event`, then processed through
@@ -802,13 +795,16 @@ impl ExecutionManager {
802795
/// (rejection or cancellation) based on the order's status.
803796
pub fn check_inflight_orders(&mut self) -> InflightCheckResult {
804797
let mut result = InflightCheckResult::default();
805-
let current_time = self.clock.borrow().timestamp_ns();
806-
let threshold_ns = self.config.inflight_threshold_ms * NANOSECONDS_IN_MILLISECOND;
798+
let now = dst::time::Instant::now();
799+
let threshold = Duration::from_millis(self.config.inflight_threshold_ms);
807800

808801
let mut to_check = Vec::new();
809802

810803
for (client_order_id, check) in &self.inflight_checks {
811-
if current_time - check.ts_submitted > threshold_ns {
804+
if now
805+
.checked_duration_since(check.ts_submitted)
806+
.is_some_and(|elapsed| elapsed > threshold)
807+
{
812808
to_check.push(*client_order_id);
813809
}
814810
}
@@ -824,14 +820,16 @@ impl ExecutionManager {
824820

825821
if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
826822
if let Some(last_query_ts) = check.last_query_ts
827-
&& current_time - last_query_ts < threshold_ns
823+
&& now
824+
.checked_duration_since(last_query_ts)
825+
.is_none_or(|elapsed| elapsed < threshold)
828826
{
829827
continue;
830828
}
831829

832830
check.retry_count += 1;
833-
check.last_query_ts = Some(current_time);
834-
self.ts_last_query.insert(client_order_id, current_time);
831+
check.last_query_ts = Some(now);
832+
self.ts_last_query.insert(client_order_id, now);
835833
self.recon_check_retries
836834
.insert(client_order_id, check.retry_count);
837835

@@ -875,6 +873,7 @@ impl ExecutionManager {
875873
self.clear_recon_tracking(&client_order_id, true);
876874
} else if let Some(order) = self.get_order(client_order_id) {
877875
// Intermediate retry: query the venue for current order status
876+
let ts_now = self.clock.borrow().timestamp_ns();
878877
let client_id = self.cache.borrow().client_id(&client_order_id).copied();
879878
let query = TradingCommand::QueryOrder(QueryOrder::new(
880879
order.trader_id(),
@@ -884,7 +883,7 @@ impl ExecutionManager {
884883
order.client_order_id(),
885884
order.venue_order_id(),
886885
UUID4::new(),
887-
current_time,
886+
ts_now,
888887
None,
889888
None, // correlation_id
890889
));
@@ -988,9 +987,8 @@ impl ExecutionManager {
988987
&mut self,
989988
client_ids: Option<&IndexSet<ClientId>>,
990989
) -> Vec<TradingCommand> {
991-
let current_time = self.clock.borrow().timestamp_ns();
992-
let query_delay_ns =
993-
u64::from(self.config.single_order_query_delay_ms) * NANOSECONDS_IN_MILLISECOND;
990+
let now = dst::time::Instant::now();
991+
let query_delay = Duration::from_millis(u64::from(self.config.single_order_query_delay_ms));
994992
let query_limit = self.config.max_single_order_queries_per_cycle as usize;
995993

996994
if query_limit == 0 {
@@ -1030,12 +1028,13 @@ impl ExecutionManager {
10301028
continue;
10311029
}
10321030

1033-
if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id) {
1034-
let elapsed_ns = current_time.duration_since(&last_activity).unwrap_or(0);
1031+
if let Some(&last_activity) = self.order_local_activity.get(&client_order_id) {
1032+
let elapsed = last_activity.elapsed();
1033+
let threshold = Duration::from_nanos(self.config.open_check_threshold_ns);
10351034

1036-
if elapsed_ns < self.config.open_check_threshold_ns {
1037-
let elapsed_ms = nanos_to_millis(elapsed_ns);
1038-
let threshold_ms = nanos_to_millis(self.config.open_check_threshold_ns);
1035+
if elapsed < threshold {
1036+
let elapsed_ms = elapsed.as_millis();
1037+
let threshold_ms = threshold.as_millis();
10391038
log::debug!(
10401039
"Deferring open order query for {client_order_id}: recent local activity \
10411040
({elapsed_ms}ms < threshold={threshold_ms}ms)",
@@ -1045,14 +1044,15 @@ impl ExecutionManager {
10451044
}
10461045

10471046
if let Some(last_query_ts) = self.ts_last_query.get(&client_order_id)
1048-
&& current_time
1049-
.duration_since(last_query_ts)
1050-
.is_none_or(|elapsed_ns| elapsed_ns < query_delay_ns)
1047+
&& now
1048+
.checked_duration_since(*last_query_ts)
1049+
.is_none_or(|elapsed| elapsed < query_delay)
10511050
{
10521051
continue;
10531052
}
10541053

1055-
self.ts_last_query.insert(client_order_id, current_time);
1054+
self.ts_last_query.insert(client_order_id, now);
1055+
let ts_now = self.clock.borrow().timestamp_ns();
10561056

10571057
let cmd = TradingCommand::QueryOrder(QueryOrder::new(
10581058
order.trader_id(),
@@ -1062,7 +1062,7 @@ impl ExecutionManager {
10621062
client_order_id,
10631063
order.venue_order_id(),
10641064
UUID4::new(),
1065-
current_time,
1065+
ts_now,
10661066
None,
10671067
None,
10681068
));
@@ -1121,20 +1121,20 @@ impl ExecutionManager {
11211121
}
11221122
}
11231123

1124-
let ts_now = self.clock.borrow().timestamp_ns();
11251124
let mut events = Vec::new();
11261125

11271126
for report in all_reports {
11281127
if let Some(client_order_id) = &report.client_order_id
11291128
&& let Some(order) = self.get_order(*client_order_id)
11301129
{
11311130
// Check for recent local activity to avoid race conditions with in-flight fills
1132-
if let Some(&last_activity) = self.order_local_activity_ns.get(client_order_id) {
1133-
let elapsed_ns = ts_now.duration_since(&last_activity).unwrap_or(0);
1131+
if let Some(&last_activity) = self.order_local_activity.get(client_order_id) {
1132+
let elapsed = last_activity.elapsed();
1133+
let threshold = Duration::from_nanos(self.config.open_check_threshold_ns);
11341134

1135-
if elapsed_ns < self.config.open_check_threshold_ns {
1136-
let elapsed_ms = nanos_to_millis(elapsed_ns);
1137-
let threshold_ms = nanos_to_millis(self.config.open_check_threshold_ns);
1135+
if elapsed < threshold {
1136+
let elapsed_ms = elapsed.as_millis();
1137+
let threshold_ms = threshold.as_millis();
11381138
log::debug!(
11391139
"Deferring reconciliation for {client_order_id}: recent local activity ({elapsed_ms}ms < threshold={threshold_ms}ms)",
11401140
);
@@ -1379,7 +1379,7 @@ impl ExecutionManager {
13791379

13801380
/// Registers an order as inflight for tracking.
13811381
pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
1382-
let ts_submitted = self.clock.borrow().timestamp_ns();
1382+
let ts_submitted = dst::time::Instant::now();
13831383
self.inflight_checks.insert(
13841384
client_order_id,
13851385
InflightCheck {
@@ -1391,17 +1391,18 @@ impl ExecutionManager {
13911391
);
13921392
self.recon_check_retries.insert(client_order_id, 0);
13931393
self.ts_last_query.shift_remove(&client_order_id);
1394-
self.order_local_activity_ns.shift_remove(&client_order_id);
1394+
self.order_local_activity.shift_remove(&client_order_id);
13951395
}
13961396

13971397
/// Records local activity for the specified order.
13981398
///
1399-
/// Uses the current clock time (receipt time) instead of venue time to accurately
1400-
/// track when we last processed activity for this order. This avoids race conditions
1401-
/// where network/queue latency makes events appear "old" even though they just arrived.
1399+
/// Uses a monotonic receipt instant, not venue or domain time, to accurately
1400+
/// track when we last processed activity for this order. This avoids race
1401+
/// conditions where network/queue latency makes events appear "old" even
1402+
/// though they just arrived.
14021403
pub fn record_local_activity(&mut self, client_order_id: ClientOrderId) {
1403-
let ts_now = self.clock.borrow().timestamp_ns();
1404-
self.order_local_activity_ns.insert(client_order_id, ts_now);
1404+
self.order_local_activity
1405+
.insert(client_order_id, dst::time::Instant::now());
14051406
}
14061407

14071408
/// Clears reconciliation tracking state for an order.
@@ -1412,7 +1413,7 @@ impl ExecutionManager {
14121413
if drop_last_query {
14131414
self.ts_last_query.shift_remove(client_order_id);
14141415
}
1415-
self.order_local_activity_ns.shift_remove(client_order_id);
1416+
self.order_local_activity.shift_remove(client_order_id);
14161417
}
14171418

14181419
/// Returns any external order claim for the given instrument ID.
@@ -1548,27 +1549,31 @@ impl ExecutionManager {
15481549
self.recent_fills_cache.contains_key(trade_id)
15491550
}
15501551

1551-
/// Marks a fill as recently processed with current timestamp.
1552+
/// Marks a fill as recently processed with the current monotonic instant.
15521553
pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
1553-
let ts_now = self.clock.borrow().timestamp_ns();
1554-
self.recent_fills_cache.insert(trade_id, ts_now);
1554+
self.recent_fills_cache
1555+
.insert(trade_id, dst::time::Instant::now());
15551556
}
15561557

15571558
/// Prunes expired fills from the recent fills cache.
15581559
///
15591560
/// Default TTL is 60 seconds.
1560-
#[expect(
1561-
clippy::cast_precision_loss,
1562-
clippy::cast_possible_truncation,
1563-
clippy::cast_sign_loss,
1564-
reason = "TTL is a small positive seconds value; the nanosecond product fits u64"
1565-
)]
15661561
pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
1567-
let ts_now = self.clock.borrow().timestamp_ns();
1568-
let ttl_ns = (ttl_secs * NANOSECONDS_IN_SECOND as f64) as u64;
1562+
// Map the f64 TTL to a Duration, reproducing the old
1563+
// (ttl_secs * NANOSECONDS_IN_SECOND) as u64 cast at the boundaries
1564+
// rather than panicking on this pub fn. The as cast saturated:
1565+
// - negative / NaN -> 0 (prune everything)
1566+
// - positive overflow / +inf -> u64::MAX (keep everything)
1567+
// try_from_secs_f64 returns Err for all three, so branch on the sign
1568+
// to keep the two behaviors distinct.
1569+
let ttl = match Duration::try_from_secs_f64(ttl_secs) {
1570+
Ok(ttl) => ttl,
1571+
Err(_) if ttl_secs > 0.0 => Duration::MAX,
1572+
Err(_) => Duration::ZERO,
1573+
};
15691574

15701575
self.recent_fills_cache
1571-
.retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
1576+
.retain(|_, &mut cached_at| cached_at.elapsed() <= ttl);
15721577
}
15731578

15741579
/// Purges closed orders from the cache that are older than the configured buffer.
@@ -1675,14 +1680,31 @@ impl ExecutionManager {
16751680
let ts_now = self.clock.borrow().timestamp_ns();
16761681
let ts_last = order.ts_last();
16771682

1678-
// Check if order is too recent
1679-
if (ts_now - ts_last) < self.config.open_check_threshold_ns {
1680-
return events;
1683+
// Domain-time recency gate: ts_last and ts_now are both domain
1684+
// timestamps, so this stays on self.clock (it is not a real-time
1685+
// settling window).
1686+
match ts_now.duration_since(&ts_last) {
1687+
// Within the recency threshold: order is genuinely too recent, defer.
1688+
Some(elapsed_ns) if elapsed_ns < self.config.open_check_threshold_ns => {
1689+
return events;
1690+
}
1691+
// Old enough: fall through to the reconciliation checks below.
1692+
Some(_) => {}
1693+
// ts_last is ahead of ts_now - impossible under a sane clock.
1694+
// A corrupted far-future ts_last (for example a double-scaled
1695+
// timestamp) would otherwise stall this order's reconciliation
1696+
// forever with no signal, so warn before deferring.
1697+
None => {
1698+
log::warn!(
1699+
"Order {client_order_id} has venue ts_last {ts_last} ahead of local ts_now {ts_now}; deferring reconciliation"
1700+
);
1701+
return events;
1702+
}
16811703
}
16821704

16831705
// Check local activity threshold
1684-
if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
1685-
&& (ts_now - last_activity) < self.config.open_check_threshold_ns
1706+
if let Some(&last_activity) = self.order_local_activity.get(&client_order_id)
1707+
&& last_activity.elapsed() < Duration::from_nanos(self.config.open_check_threshold_ns)
16861708
{
16871709
return events;
16881710
}
@@ -2986,6 +3008,7 @@ impl ExecutionManager {
29863008
#[cfg(test)]
29873009
mod tests {
29883010
use nautilus_common::clock::TestClock;
3011+
use nautilus_core::datetime::NANOSECONDS_IN_SECOND;
29893012
use nautilus_model::{
29903013
enums::OmsType,
29913014
instruments::{

0 commit comments

Comments
 (0)