Skip to content

Commit ecfeb24

Browse files
committed
Refine handling of timers in backtest engine
1 parent dc165a4 commit ecfeb24

File tree

4 files changed

+324
-10
lines changed

4 files changed

+324
-10
lines changed

examples/backtest/notebooks/databento_test_order_book_deltas.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
# %%
2323

24+
import pandas as pd
25+
2426
from nautilus_trader.adapters.databento.data_utils import databento_data
2527
from nautilus_trader.adapters.databento.data_utils import load_catalog
2628
from nautilus_trader.backtest.node import BacktestNode
@@ -91,6 +93,19 @@ def __init__(self, config: TestOrderBookDeltasConfig):
9193

9294
def on_start(self):
9395
self.request_instrument(self.config.symbol_id)
96+
97+
# Set time alert for 1 second after start
98+
alert_time = self.clock.utc_now() + pd.Timedelta(seconds=1)
99+
self.clock.set_time_alert(
100+
"subscribe_alert",
101+
alert_time,
102+
self.on_subscribe_timer,
103+
)
104+
105+
def on_subscribe_timer(self, event):
106+
self.user_log(
107+
f"Subscribing to order book deltas after 1 second delay, clock={self.clock.utc_now()}",
108+
)
94109
self.subscribe_order_book_deltas(self.config.symbol_id)
95110

96111
def on_order_book_deltas(self, deltas):
@@ -100,12 +115,12 @@ def on_order_book_deltas(self, deltas):
100115

101116
self._deltas_count += 1
102117

103-
def user_log(self, msg, color=LogColor.GREEN):
104-
self.log.warning(f"{msg}", color=color)
105-
106118
def on_stop(self):
107119
order_book = self.cache.order_book(self.config.symbol_id)
108-
self.user_log(f"{order_book}")
120+
self.user_log(f"Final OrderBook: {order_book}")
121+
122+
def user_log(self, msg, color=LogColor.GREEN):
123+
self.log.warning(f"{msg}", color=color)
109124

110125

111126
# %% [markdown]

nautilus_trader/backtest/engine.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ cdef class BacktestEngine:
124124
cdef list _response_data
125125

126126
cdef CVec _advance_time(self, uint64_t ts_now)
127+
cdef bint _process_next_timer(self)
127128
cdef void _flush_accumulator_events(self, uint64_t ts_now)
128129
cdef void _process_raw_time_event_handlers(
129130
self,

nautilus_trader/backtest/engine.pyx

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,6 @@ cdef class BacktestEngine:
823823

824824
if validate:
825825
first = data[0]
826-
827826
if hasattr(first, "instrument_id"):
828827
Condition.is_true(
829828
first.instrument_id in self._kernel.cache.instrument_ids(),
@@ -1532,12 +1531,22 @@ cdef class BacktestEngine:
15321531
break
15331532
15341533
# -- MAIN BACKTEST LOOP -----------------------------------------------#
1535-
self._last_ns = 0
1534+
self._last_ns = start_ns
15361535
cdef uint64_t raw_handlers_count = 0
15371536
cdef Data data = self._data_iterator.next()
15381537
cdef CVec raw_handlers
1538+
raw_handlers.ptr = NULL
1539+
raw_handlers.len = 0
1540+
raw_handlers.cap = 0
15391541
try:
1540-
while data is not None:
1542+
while True:
1543+
if data is None:
1544+
if self._process_next_timer():
1545+
break
1546+
1547+
data = self._data_iterator.next()
1548+
continue
1549+
15411550
if data.ts_init > end_ns:
15421551
# End of backtest
15431552
break
@@ -1593,6 +1602,7 @@ cdef class BacktestEngine:
15931602
)
15941603
if raw_handlers.ptr != NULL:
15951604
vec_time_event_handlers_drop(raw_handlers)
1605+
15961606
raw_handlers_count = 0
15971607
15981608
self._iteration += 1
@@ -1612,9 +1622,15 @@ cdef class BacktestEngine:
16121622
for exchange in self._venues.values():
16131623
exchange.process(self._kernel.clock.timestamp_ns())
16141624
1615-
# Flush remaining events at the last data timestamp
1616-
if self._last_ns > 0:
1617-
self._flush_accumulator_events(self._last_ns)
1625+
# Flush remaining events up to end_ns to ensure all timers are processed
1626+
# This handles the case where the loop ended because data.ts_init > end_ns
1627+
# (when data is None, _process_remaining_timers already flushed to end_ns, but
1628+
# flushing again is safe since _flush_accumulator_events only processes events <= ts_now)
1629+
if end_ns > 0:
1630+
if self._last_ns < end_ns:
1631+
self._last_ns = end_ns
1632+
1633+
self._flush_accumulator_events(end_ns)
16181634
16191635
cdef CVec _advance_time(self, uint64_t ts_now):
16201636
# Advance clocks and process all events before ts_now in timestamp order.
@@ -1697,6 +1713,47 @@ cdef class BacktestEngine:
16971713
empty_vec.cap = 0
16981714
return empty_vec
16991715
1716+
cdef bint _process_next_timer(self):
1717+
# Process the next chronological timer when data is exhausted.
1718+
#
1719+
# This method is used when the data stream is empty, but timers (alerts) might
1720+
# still be active. Instead of jumping to the end of the backtest, it finds the
1721+
# absolute next timer time across all component clocks and advances to it.
1722+
#
1723+
# This allows for scenarios where a timer callback might load new data on-the-fly
1724+
# (e.g. via a subscription), which should then be processed in proper sequence.
1725+
#
1726+
# Returns True if there are no more timers within the backtest range (should break),
1727+
# False otherwise (should continue the backtest loop to check for new data).
1728+
1729+
cdef list[TestClock] clocks = get_component_clocks(self._instance_id)
1730+
cdef TestClock clock
1731+
cdef uint64_t min_next_time = 0
1732+
cdef uint64_t next_timer_time
1733+
cdef str name
1734+
1735+
# 1. Process all timers up to current time
1736+
self._flush_accumulator_events(self._last_ns)
1737+
1738+
# 2. Find the absolute next timer time across all clocks
1739+
for clock in clocks:
1740+
for name in clock.timer_names:
1741+
next_timer_time = clock.next_time_ns(name)
1742+
if next_timer_time > self._last_ns:
1743+
if min_next_time == 0 or next_timer_time < min_next_time:
1744+
min_next_time = next_timer_time
1745+
1746+
if min_next_time == 0 or min_next_time > self._end_ns:
1747+
# No more timers in the backtest range
1748+
return True
1749+
1750+
# 3. Process the next closest possible timer(s)
1751+
self._last_ns = min_next_time
1752+
self._flush_accumulator_events(min_next_time)
1753+
1754+
# Return False to indicate we should continue checking for data
1755+
return False
1756+
17001757
cdef void _flush_accumulator_events(self, uint64_t ts_now):
17011758
cdef list[TestClock] clocks = get_component_clocks(self._instance_id)
17021759
cdef TestClock clock

0 commit comments

Comments
 (0)