Skip to content

Commit aac06b2

Browse files
committed
Refine handling of timers in backtest engine
1 parent cbcfafc commit aac06b2

File tree

4 files changed

+288
-53
lines changed

4 files changed

+288
-53
lines changed

examples/backtest/notebooks/databento_test_order_book_deltas.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,19 @@ def __init__(self, config: TestOrderBookDeltasConfig):
9393

9494
def on_start(self):
9595
self.request_instrument(self.config.symbol_id)
96-
# Set time alert for 1 second after start
9796

98-
self.on_subscribe_timer(1)
99-
# alert_time = self.clock.utc_now() + pd.Timedelta(seconds=1)
100-
# self.clock.set_time_alert(
101-
# "subscribe_alert",
102-
# alert_time,
103-
# self.on_subscribe_timer
104-
# )
97+
# Set time alert for 1 second after start
98+
alert_time = self.clock.utc_now() + pd.Timedelta(seconds=1)
99+
self.clock.set_time_alert(
100+
"subscribe_alert",
101+
alert_time,
102+
self.on_subscribe_timer,
103+
)
105104

106105
def on_subscribe_timer(self, event):
107-
self.user_log("Subscribing to order book deltas after 1 second delay")
106+
self.user_log(
107+
f"Subscribing to order book deltas after 1 second delay, clock={self.clock.utc_now()}",
108+
)
108109
self.subscribe_order_book_deltas(self.config.symbol_id)
109110

110111
def on_order_book_deltas(self, deltas):
@@ -194,7 +195,7 @@ def user_log(self, msg, color=LogColor.GREEN):
194195
configs = [
195196
BacktestRunConfig(
196197
engine=engine_config,
197-
data=[],
198+
data=[], # data,
198199
venues=venues,
199200
chunk_size=None, # use None when loading custom data
200201
start=start_time,

nautilus_trader/backtest/engine.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ cdef class BacktestEngine:
124124
cdef list _response_data
125125

126126
cdef CVec _advance_time(self, uint64_t ts_now)
127-
cdef bint _process_remaining_timers(self, uint64_t end_ns)
127+
cdef bint _process_next_timer(self)
128128
cdef void _flush_accumulator_events(self, uint64_t ts_now)
129129
cdef void _process_raw_time_event_handlers(
130130
self,

nautilus_trader/backtest/engine.pyx

Lines changed: 35 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,6 @@ cdef class BacktestEngine:
823823

824824
if validate:
825825
first = data[0]
826-
827826
if hasattr(first, "instrument_id"):
828827
Condition.is_true(
829828
first.instrument_id in self._kernel.cache.instrument_ids(),
@@ -1532,14 +1531,17 @@ cdef class BacktestEngine:
15321531
break
15331532
15341533
# -- MAIN BACKTEST LOOP -----------------------------------------------#
1535-
self._last_ns = 0
1534+
self._last_ns = start_ns
15361535
cdef uint64_t raw_handlers_count = 0
15371536
cdef Data data = self._data_iterator.next()
15381537
cdef CVec raw_handlers
1538+
raw_handlers.ptr = NULL
1539+
raw_handlers.len = 0
1540+
raw_handlers.cap = 0
15391541
try:
15401542
while True:
15411543
if data is None:
1542-
if self._process_remaining_timers(end_ns):
1544+
if self._process_next_timer():
15431545
break
15441546
15451547
data = self._data_iterator.next()
@@ -1711,55 +1713,46 @@ cdef class BacktestEngine:
17111713
empty_vec.cap = 0
17121714
return empty_vec
17131715
1714-
cdef bint _process_remaining_timers(self, uint64_t end_ns):
1715-
# Process remaining timers when data is exhausted.
1716-
# Returns True if there are no more timers (should break), False otherwise (should continue).
1716+
cdef bint _process_next_timer(self):
1717+
# Process the next chronological timer when data is exhausted.
1718+
#
1719+
# This method is used when the data stream is empty, but timers (alerts) might
1720+
# still be active. Instead of jumping to the end of the backtest, it finds the
1721+
# absolute next timer time across all component clocks and advances to it.
1722+
#
1723+
# This allows for scenarios where a timer callback might load new data on-the-fly
1724+
# (e.g. via a subscription), which should then be processed in proper sequence.
1725+
#
1726+
# Returns True if there are no more timers within the backtest range (should break),
1727+
# False otherwise (should continue the backtest loop to check for new data).
17171728
17181729
cdef list[TestClock] clocks = get_component_clocks(self._instance_id)
17191730
cdef TestClock clock
1720-
cdef bint has_timers
1731+
cdef uint64_t min_next_time = 0
17211732
cdef uint64_t next_timer_time
1733+
cdef str name
17221734
1723-
# If we've already advanced to end_ns or beyond, and there are still timers,
1724-
# they must be beyond end_ns, so we should break to avoid infinite loop
1725-
if self._last_ns >= end_ns:
1726-
return True
1735+
# 1. Process all timers up to current time
1736+
self._flush_accumulator_events(self._last_ns)
17271737
1728-
# Advance accumulator to capture timers from clocks
1738+
# 2. Find the absolute next timer time across all clocks
17291739
for clock in clocks:
1730-
time_event_accumulator_advance_clock(
1731-
&self._accumulator,
1732-
&clock._mem,
1733-
end_ns,
1734-
False,
1735-
)
1736-
1737-
# Check if there are any timers that can be processed (next time <= end_ns)
1738-
next_timer_time = time_event_accumulator_peek_next_time(&self._accumulator)
1739-
if next_timer_time == 0:
1740-
# No timers in accumulator, break
1740+
for name in clock.timer_names:
1741+
next_timer_time = clock.next_time_ns(name)
1742+
if next_timer_time > self._last_ns:
1743+
if min_next_time == 0 or next_timer_time < min_next_time:
1744+
min_next_time = next_timer_time
1745+
1746+
if min_next_time == 0 or min_next_time > self._end_ns:
1747+
# No more timers in the backtest range
17411748
return True
17421749
1743-
if next_timer_time > end_ns:
1744-
# All timers are beyond end_ns, break to avoid infinite loop
1745-
return True
1746-
1747-
# Advance time and flush events up to end_ns to process all remaining timers
1748-
self._last_ns = end_ns
1749-
self._flush_accumulator_events(end_ns)
1750-
1751-
# Set clock time after flushing
1752-
for clock in clocks:
1753-
clock.set_time(end_ns)
1754-
1755-
set_logging_clock_static_time(end_ns)
1756-
1757-
if LOGGING_PYO3:
1758-
nautilus_pyo3.logging_clock_set_static_time(end_ns)
1750+
# 3. Process the next closest possible timer(s)
1751+
self._last_ns = min_next_time
1752+
self._flush_accumulator_events(min_next_time)
17591753
1760-
# After flushing up to end_ns, any remaining timers must be beyond end_ns
1761-
# Always break to avoid infinite loop (we've processed all timers up to end_ns)
1762-
return True
1754+
# Return False to indicate we should continue checking for data
1755+
return False
17631756
17641757
cdef void _flush_accumulator_events(self, uint64_t ts_now):
17651758
cdef list[TestClock] clocks = get_component_clocks(self._instance_id)

0 commit comments

Comments
 (0)