Skip to content

Commit 65ae969

Browse files
committed
Refine handling of timers in backtest engine
1 parent 6a58245 commit 65ae969

File tree

4 files changed

+315
-11
lines changed

4 files changed

+315
-11
lines changed

examples/backtest/notebooks/databento_test_order_book_deltas.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
# %%
2323

24+
import pandas as pd
25+
2426
from nautilus_trader.adapters.databento.data_utils import databento_data
2527
from nautilus_trader.adapters.databento.data_utils import load_catalog
2628
from nautilus_trader.backtest.node import BacktestNode
@@ -91,6 +93,19 @@ def __init__(self, config: TestOrderBookDeltasConfig):
9193

9294
def on_start(self):
9395
self.request_instrument(self.config.symbol_id)
96+
97+
# Set time alert for 1 second after start
98+
alert_time = self.clock.utc_now() + pd.Timedelta(seconds=1)
99+
self.clock.set_time_alert(
100+
"subscribe_alert",
101+
alert_time,
102+
self.on_subscribe_timer,
103+
)
104+
105+
def on_subscribe_timer(self, event):
106+
self.user_log(
107+
f"Subscribing to order book deltas after 1 second delay, clock={self.clock.utc_now()}",
108+
)
94109
self.subscribe_order_book_deltas(self.config.symbol_id)
95110

96111
def on_order_book_deltas(self, deltas):
@@ -100,12 +115,12 @@ def on_order_book_deltas(self, deltas):
100115

101116
self._deltas_count += 1
102117

103-
def user_log(self, msg, color=LogColor.GREEN):
104-
self.log.warning(f"{msg}", color=color)
105-
106118
def on_stop(self):
107119
order_book = self.cache.order_book(self.config.symbol_id)
108-
self.user_log(f"{order_book}")
120+
self.user_log(f"Final OrderBook: {order_book}")
121+
122+
def user_log(self, msg, color=LogColor.GREEN):
123+
self.log.warning(f"{msg}", color=color)
109124

110125

111126
# %% [markdown]

nautilus_trader/backtest/engine.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ cdef class BacktestEngine:
124124
cdef list _response_data
125125

126126
cdef CVec _advance_time(self, uint64_t ts_now)
127+
cdef bint _process_next_timer(self)
127128
cdef void _flush_accumulator_events(self, uint64_t ts_now)
128129
cdef void _process_raw_time_event_handlers(
129130
self,

nautilus_trader/backtest/engine.pyx

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,6 @@ cdef class BacktestEngine:
823823

824824
if validate:
825825
first = data[0]
826-
827826
if hasattr(first, "instrument_id"):
828827
Condition.is_true(
829828
first.instrument_id in self._kernel.cache.instrument_ids(),
@@ -1532,12 +1531,22 @@ cdef class BacktestEngine:
15321531
break
15331532
15341533
# -- MAIN BACKTEST LOOP -----------------------------------------------#
1535-
self._last_ns = 0
1534+
self._last_ns = start_ns
15361535
cdef uint64_t raw_handlers_count = 0
15371536
cdef Data data = self._data_iterator.next()
15381537
cdef CVec raw_handlers
1538+
raw_handlers.ptr = NULL
1539+
raw_handlers.len = 0
1540+
raw_handlers.cap = 0
15391541
try:
1540-
while data is not None:
1542+
while True:
1543+
if data is None:
1544+
if self._process_next_timer():
1545+
break
1546+
1547+
data = self._data_iterator.next()
1548+
continue
1549+
15411550
if data.ts_init > end_ns:
15421551
# End of backtest
15431552
break
@@ -1593,6 +1602,7 @@ cdef class BacktestEngine:
15931602
)
15941603
if raw_handlers.ptr != NULL:
15951604
vec_time_event_handlers_drop(raw_handlers)
1605+
15961606
raw_handlers_count = 0
15971607
15981608
self._iteration += 1
@@ -1612,10 +1622,6 @@ cdef class BacktestEngine:
16121622
for exchange in self._venues.values():
16131623
exchange.process(self._kernel.clock.timestamp_ns())
16141624
1615-
# Flush remaining events at the last data timestamp
1616-
if self._last_ns > 0:
1617-
self._flush_accumulator_events(self._last_ns)
1618-
16191625
cdef CVec _advance_time(self, uint64_t ts_now):
16201626
# Advance clocks and process all events before ts_now in timestamp order.
16211627
#
@@ -1697,6 +1703,47 @@ cdef class BacktestEngine:
16971703
empty_vec.cap = 0
16981704
return empty_vec
16991705
1706+
cdef bint _process_next_timer(self):
1707+
# Process the next chronological timer when data is exhausted.
1708+
#
1709+
# This method is used when the data stream is empty, but timers (alerts) might
1710+
# still be active. Instead of jumping to the end of the backtest, it finds the
1711+
# absolute next timer time across all component clocks and advances to it.
1712+
#
1713+
# This allows for scenarios where a timer callback might load new data on-the-fly
1714+
# (e.g. via a subscription), which should then be processed in proper sequence.
1715+
#
1716+
# Returns True if there are no more timers within the backtest range (should break),
1717+
# False otherwise (should continue the backtest loop to check for new data).
1718+
1719+
cdef list[TestClock] clocks = get_component_clocks(self._instance_id)
1720+
cdef TestClock clock
1721+
cdef uint64_t min_next_time = 0
1722+
cdef uint64_t next_timer_time
1723+
cdef str name
1724+
1725+
# 1. Process all timers up to current time
1726+
self._flush_accumulator_events(self._last_ns)
1727+
1728+
# 2. Find the absolute next timer time across all clocks
1729+
for clock in clocks:
1730+
for name in clock.timer_names:
1731+
next_timer_time = clock.next_time_ns(name)
1732+
if next_timer_time > self._last_ns:
1733+
if min_next_time == 0 or next_timer_time < min_next_time:
1734+
min_next_time = next_timer_time
1735+
1736+
if min_next_time == 0 or min_next_time > self._end_ns:
1737+
# No more timers in the backtest range
1738+
return True
1739+
1740+
# 3. Process the next closest possible timer(s)
1741+
self._last_ns = min_next_time
1742+
self._flush_accumulator_events(min_next_time)
1743+
1744+
# Return False to indicate we should continue checking for data
1745+
return False
1746+
17001747
cdef void _flush_accumulator_events(self, uint64_t ts_now):
17011748
cdef list[TestClock] clocks = get_component_clocks(self._instance_id)
17021749
cdef TestClock clock
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
# -------------------------------------------------------------------------------------------------
2+
# Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3+
# https://nautechsystems.io
4+
#
5+
# Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6+
# You may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
# -------------------------------------------------------------------------------------------------
15+
16+
import pandas as pd
17+
18+
from nautilus_trader.backtest.engine import BacktestEngine
19+
from nautilus_trader.backtest.engine import BacktestEngineConfig
20+
from nautilus_trader.common.actor import Actor
21+
from nautilus_trader.config import LoggingConfig
22+
from nautilus_trader.model.currencies import USD
23+
from nautilus_trader.model.enums import AccountType
24+
from nautilus_trader.model.enums import OmsType
25+
from nautilus_trader.model.identifiers import Venue
26+
from nautilus_trader.model.objects import Money
27+
from nautilus_trader.test_kit.providers import TestInstrumentProvider
28+
from nautilus_trader.test_kit.stubs.data import TestDataStubs
29+
30+
31+
class TimerActor(Actor):
32+
def __init__(self, instrument_id):
33+
super().__init__()
34+
self.instrument_id = instrument_id
35+
self.timer_fired_count = 0
36+
self.last_timer_time = 0
37+
self.received_data = []
38+
39+
def on_start(self):
40+
self.subscribe_quote_ticks(self.instrument_id)
41+
42+
def timer_callback(self, event):
43+
self.timer_fired_count += 1
44+
self.last_timer_time = self.clock.timestamp_ns()
45+
46+
def on_quote_tick(self, tick):
47+
self.received_data.append(tick)
48+
49+
50+
class TestBacktestEngineTimers:
51+
def setup_method(self):
52+
self.engine = BacktestEngine(
53+
BacktestEngineConfig(logging=LoggingConfig(bypass_logging=True)),
54+
)
55+
self.engine.add_venue(
56+
venue=Venue("SIM"),
57+
oms_type=OmsType.HEDGING,
58+
account_type=AccountType.MARGIN,
59+
base_currency=USD,
60+
starting_balances=[Money(1000000, USD)],
61+
)
62+
self.instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")
63+
self.engine.add_instrument(self.instrument)
64+
65+
def test_timer_execution_no_data(self):
66+
# Test that timers fire correctly even when no data is provided
67+
actor = TimerActor(self.instrument.id)
68+
self.engine.add_actor(actor)
69+
70+
start_time = pd.Timestamp("2024-01-01", tz="UTC")
71+
end_time = start_time + pd.Timedelta(seconds=10)
72+
timer_time = start_time + pd.Timedelta(seconds=5)
73+
74+
actor.clock.set_time_alert("test_timer", timer_time, actor.timer_callback)
75+
76+
self.engine.run(start=start_time, end=end_time)
77+
78+
assert actor.timer_fired_count == 1
79+
assert actor.last_timer_time == timer_time.value
80+
81+
def test_on_the_fly_data_loading_from_timer(self):
82+
# Test that data added via add_data_iterator in a timer callback is processed correctly.
83+
# This mirrors the real-world pattern where subscriptions use generators for on-the-fly loading.
84+
actor = TimerActor(self.instrument.id)
85+
self.engine.add_actor(actor)
86+
87+
start_time = pd.Timestamp("2024-01-01", tz="UTC")
88+
end_time = start_time + pd.Timedelta(seconds=10)
89+
timer_time = start_time + pd.Timedelta(seconds=5)
90+
data_time = start_time + pd.Timedelta(seconds=6)
91+
92+
def data_generator():
93+
# Generator that yields data when called from timer callback
94+
# This simulates how _handle_subscribe uses generators for on-the-fly loading
95+
yield [TestDataStubs.quote_tick(self.instrument, ts_init=data_time.value)]
96+
97+
def timer_callback_with_data(event):
98+
actor.timer_callback(event)
99+
# Load data on the fly using add_data_iterator (like _handle_subscribe does)
100+
self.engine.add_data_iterator("on_the_fly_data", data_generator())
101+
102+
actor.clock.set_time_alert("test_timer", timer_time, timer_callback_with_data)
103+
104+
self.engine.run(start=start_time, end=end_time)
105+
106+
assert actor.timer_fired_count == 1
107+
assert len(actor.received_data) == 1
108+
assert actor.received_data[0].ts_init == data_time.value
109+
110+
def test_multiple_timers_same_timestamp(self):
111+
# Test that multiple timers at the exact same timestamp all fire
112+
actor = TimerActor(self.instrument.id)
113+
self.engine.add_actor(actor)
114+
115+
start_time = pd.Timestamp("2024-01-01", tz="UTC")
116+
end_time = start_time + pd.Timedelta(seconds=10)
117+
timer_time = start_time + pd.Timedelta(seconds=5)
118+
119+
actor.clock.set_time_alert("timer1", timer_time, actor.timer_callback)
120+
actor.clock.set_time_alert("timer2", timer_time, actor.timer_callback)
121+
actor.clock.set_time_alert("timer3", timer_time, actor.timer_callback)
122+
123+
self.engine.run(start=start_time, end=end_time)
124+
125+
assert actor.timer_fired_count == 3
126+
assert actor.last_timer_time == timer_time.value
127+
128+
def test_chained_timers(self):
129+
# Test that a timer scheduling another timer works
130+
actor = TimerActor(self.instrument.id)
131+
self.engine.add_actor(actor)
132+
133+
start_time = pd.Timestamp("2024-01-01", tz="UTC")
134+
end_time = start_time + pd.Timedelta(seconds=10)
135+
timer1_time = start_time + pd.Timedelta(seconds=5)
136+
timer2_time = start_time + pd.Timedelta(seconds=6)
137+
138+
def timer1_callback(event):
139+
actor.timer_callback(event)
140+
actor.clock.set_time_alert("timer2", timer2_time, actor.timer_callback)
141+
142+
actor.clock.set_time_alert("timer1", timer1_time, timer1_callback)
143+
144+
self.engine.run(start=start_time, end=end_time)
145+
146+
assert actor.timer_fired_count == 2
147+
assert actor.last_timer_time == timer2_time.value
148+
149+
def test_chained_timers_same_timestamp(self):
150+
# Test that a timer scheduling another timer for the SAME timestamp works
151+
actor = TimerActor(self.instrument.id)
152+
self.engine.add_actor(actor)
153+
154+
start_time = pd.Timestamp("2024-01-01", tz="UTC")
155+
end_time = start_time + pd.Timedelta(seconds=10)
156+
timer_time = start_time + pd.Timedelta(seconds=5)
157+
158+
def timer1_callback(event):
159+
actor.timer_callback(event)
160+
# Schedule another one for the same time
161+
actor.clock.set_time_alert("timer2", timer_time, actor.timer_callback)
162+
163+
actor.clock.set_time_alert("timer1", timer_time, timer1_callback)
164+
165+
self.engine.run(start=start_time, end=end_time)
166+
167+
assert actor.timer_fired_count == 2
168+
assert actor.last_timer_time == timer_time.value
169+
170+
def test_timers_alphabetical_order_same_timestamp(self):
171+
# Test that multiple timers at the same timestamp fire regardless of name order
172+
actor = TimerActor(self.instrument.id)
173+
self.engine.add_actor(actor)
174+
175+
start_time = pd.Timestamp("2024-01-01", tz="UTC")
176+
end_time = start_time + pd.Timedelta(seconds=10)
177+
timer_time = start_time + pd.Timedelta(seconds=5)
178+
179+
fired_order = []
180+
181+
def callback_z(event):
182+
fired_order.append("z")
183+
184+
def callback_a(event):
185+
fired_order.append("a")
186+
187+
def callback_m(event):
188+
fired_order.append("m")
189+
190+
actor.clock.set_time_alert("z_timer", timer_time, callback_z)
191+
actor.clock.set_time_alert("a_timer", timer_time, callback_a)
192+
actor.clock.set_time_alert("m_timer", timer_time, callback_m)
193+
194+
self.engine.run(start=start_time, end=end_time)
195+
196+
# They should all fire. In current implementation, order might be alphabetical
197+
# due to Rust's BTreeMap/ordered keys, but the key thing is they ALL fire.
198+
assert len(fired_order) == 3
199+
assert fired_order == ["a", "m", "z"]
200+
201+
def test_timers_and_data_interwoven(self):
202+
# Test that timers and data are processed in the correct interleaved order
203+
start_time = pd.Timestamp("2024-01-01", tz="UTC")
204+
end_time = start_time + pd.Timedelta(seconds=10)
205+
206+
# Timeline:
207+
# T+2: Data1
208+
# T+4: Timer1
209+
# T+6: Data2
210+
# T+8: Timer2
211+
212+
events = []
213+
214+
def timer_callback(event):
215+
events.append(f"timer_{event.ts_event}")
216+
217+
class InterwovenActor(TimerActor):
218+
def on_quote_tick(self, tick):
219+
events.append(f"data_{tick.ts_init}")
220+
221+
actor = InterwovenActor(self.instrument.id)
222+
self.engine.add_actor(actor)
223+
224+
t2 = (start_time + pd.Timedelta(seconds=2)).value
225+
t4 = (start_time + pd.Timedelta(seconds=4)).value
226+
t6 = (start_time + pd.Timedelta(seconds=6)).value
227+
t8 = (start_time + pd.Timedelta(seconds=8)).value
228+
229+
self.engine.add_data(
230+
[
231+
TestDataStubs.quote_tick(self.instrument, ts_init=t2),
232+
TestDataStubs.quote_tick(self.instrument, ts_init=t6),
233+
],
234+
)
235+
actor.clock.set_time_alert("timer1", pd.Timestamp(t4, unit="ns", tz="UTC"), timer_callback)
236+
actor.clock.set_time_alert("timer2", pd.Timestamp(t8, unit="ns", tz="UTC"), timer_callback)
237+
238+
self.engine.run(start=start_time, end=end_time)
239+
240+
expected = [f"data_{t2}", f"timer_{t4}", f"data_{t6}", f"timer_{t8}"]
241+
assert events == expected

0 commit comments

Comments
 (0)