Skip to content

Commit 5ff29cb

Browse files
authored
Refine handling of skip_first_non_full_bar in TimeBarAggregator (#3395)
1 parent c5df9ce commit 5ff29cb

File tree

4 files changed

+66
-44
lines changed

4 files changed

+66
-44
lines changed

nautilus_trader/data/aggregation.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ cdef class TimeBarAggregator(BarAggregator):
149149
cdef readonly uint64_t interval_ns
150150
cdef readonly uint64_t next_close_ns
151151
cdef readonly uint64_t stored_open_ns
152+
cdef readonly uint64_t first_close_ns
152153

153154
cdef str _timer_name
154155
cdef bint _is_left_open

nautilus_trader/data/aggregation.pyx

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ cdef class BarBuilder:
189189
self._open = None
190190
self._high = None
191191
self._low = None
192+
self._close = None
192193

193194
self.volume = Quantity.zero_c(precision=self.size_precision)
194195
self.count = 0
@@ -1406,6 +1407,7 @@ cdef class TimeBarAggregator(BarAggregator):
14061407
self.interval_ns = self._get_interval_ns()
14071408
self.stored_open_ns = 0
14081409
self.next_close_ns = 0
1410+
self.first_close_ns = 0
14091411
self.historical_mode = False
14101412
self._historical_events = []
14111413

@@ -1436,46 +1438,55 @@ cdef class TimeBarAggregator(BarAggregator):
14361438
# Closing a partial bar at the transition from historical to backtest data
14371439
cdef bint fire_immediately = (start_time == now)
14381440

1439-
self._skip_first_non_full_bar = self._skip_first_non_full_bar and now > start_time
1440-
1441-
if self.bar_type.spec.aggregation not in (BarAggregation.MONTH, BarAggregation.YEAR):
1442-
self._clock.set_timer(
1443-
name=self._timer_name,
1444-
interval=self.interval,
1445-
start_time=start_time,
1446-
stop_time=None,
1447-
callback=self._build_bar,
1448-
allow_past=True,
1449-
fire_immediately=fire_immediately,
1450-
)
1441+
# Calculate the next close time based on aggregation type
1442+
cdef datetime close_time
1443+
if fire_immediately:
1444+
close_time = start_time
1445+
elif self.bar_type.spec.aggregation == BarAggregation.MONTH:
1446+
close_time = start_time + pd.DateOffset(months=self.bar_type.spec.step)
1447+
elif self.bar_type.spec.aggregation == BarAggregation.YEAR:
1448+
close_time = start_time + pd.DateOffset(years=self.bar_type.spec.step)
1449+
else:
1450+
close_time = start_time + self.interval
14511451

1452-
if fire_immediately:
1453-
self.next_close_ns = dt_to_unix_nanos(start_time)
1454-
else:
1455-
self.next_close_ns = dt_to_unix_nanos(start_time + self.interval)
1452+
self.next_close_ns = dt_to_unix_nanos(close_time)
14561453

1457-
self.stored_open_ns = self.next_close_ns - self.interval_ns
1454+
# The stored open time needs to be defined as a subtraction with respect to the first closing time
1455+
if self.bar_type.spec.aggregation == BarAggregation.MONTH:
1456+
self.stored_open_ns = dt_to_unix_nanos(close_time - pd.DateOffset(months=self.bar_type.spec.step))
1457+
elif self.bar_type.spec.aggregation == BarAggregation.YEAR:
1458+
self.stored_open_ns = dt_to_unix_nanos(close_time - pd.DateOffset(years=self.bar_type.spec.step))
14581459
else:
1459-
# The monthly/yearly alert time is defined iteratively at each alert time as there is no regular interval
1460-
if self.bar_type.spec.aggregation == BarAggregation.MONTH:
1461-
alert_time = start_time + (pd.DateOffset(months=self.bar_type.spec.step) if not fire_immediately else pd.Timedelta(0))
1462-
elif self.bar_type.spec.aggregation == BarAggregation.YEAR:
1463-
alert_time = start_time + (pd.DateOffset(years=self.bar_type.spec.step) if not fire_immediately else pd.Timedelta(0))
1464-
else:
1465-
alert_time = start_time
1460+
self.stored_open_ns = self.next_close_ns - self.interval_ns
1461+
1462+
if self._skip_first_non_full_bar:
1463+
self.first_close_ns = self.next_close_ns
14661464

1465+
if self.bar_type.spec.aggregation in (BarAggregation.MONTH, BarAggregation.YEAR):
1466+
# The monthly/yearly alert time is defined iteratively at each alert time as there is no regular interval
14671467
self._clock.set_time_alert(
14681468
name=self._timer_name,
1469-
alert_time=alert_time,
1469+
alert_time=close_time,
14701470
callback=self._build_bar,
14711471
override=True,
14721472
allow_past=True,
14731473
)
1474-
self.next_close_ns = alert_time.value
1475-
self.stored_open_ns = start_time.value
1474+
else:
1475+
self._clock.set_timer(
1476+
name=self._timer_name,
1477+
interval=self.interval,
1478+
start_time=start_time,
1479+
stop_time=None,
1480+
callback=self._build_bar,
1481+
allow_past=True,
1482+
fire_immediately=fire_immediately,
1483+
)
14761484

1477-
self._log.debug(f"Started timer {self._timer_name}, {start_time=}, {self.historical_mode=}, "
1478-
f"{fire_immediately=}, {start_time=}, {now=}, {self._bar_build_delay=}")
1485+
self._log.debug(f"[start_timer] fire_immediately={fire_immediately}, "
1486+
f"_skip_first_non_full_bar={self._skip_first_non_full_bar}, "
1487+
f"now={now}, start_time={start_time}, "
1488+
f"first_close_ns={unix_nanos_to_dt(self.first_close_ns)}, "
1489+
f"next_close_ns={unix_nanos_to_dt(self.next_close_ns)}")
14791490

14801491
cpdef void stop_timer(self):
14811492
cdef str timer_name = str(self.bar_type)
@@ -1631,10 +1642,11 @@ cdef class TimeBarAggregator(BarAggregator):
16311642
self.next_close_ns = self._clock.next_time_ns(self._timer_name)
16321643

16331644
cdef void _build_and_send(self, uint64_t ts_event, uint64_t ts_init):
1634-
if self._skip_first_non_full_bar:
1645+
if self._skip_first_non_full_bar and ts_init <= self.first_close_ns:
16351646
self._builder.reset()
1636-
self._skip_first_non_full_bar = False
16371647
else:
1648+
# We set set _skip_first_non_full_bar to False for the transition from historical to live data
1649+
self._skip_first_non_full_bar = False
16381650
BarAggregator._build_and_send(self, ts_event, ts_init)
16391651

16401652

nautilus_trader/data/engine.pyx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2689,14 +2689,20 @@ cdef class DataEngine(Component):
26892689

26902690
if bar_type.spec.is_time_aggregated():
26912691
time_bars_origin_offset = self._time_bars_origin_offset.get(bar_type.spec.aggregation) or params.get("time_bars_origin_offset")
2692+
2693+
if "skip_first_non_full_bar" in params:
2694+
time_bars_skip_first_non_full_bar = params.get("skip_first_non_full_bar", False)
2695+
else:
2696+
time_bars_skip_first_non_full_bar = self._time_bars_skip_first_non_full_bar
2697+
26922698
aggregator = TimeBarAggregator(
26932699
instrument=instrument,
26942700
bar_type=aggregated_bar_type,
26952701
handler=self.process,
26962702
clock=self._clock,
26972703
interval_type=self._time_bars_interval_type,
26982704
timestamp_on_close=self._time_bars_timestamp_on_close,
2699-
skip_first_non_full_bar=self._time_bars_skip_first_non_full_bar,
2705+
skip_first_non_full_bar=time_bars_skip_first_non_full_bar,
27002706
build_with_no_updates=self._time_bars_build_with_no_updates,
27012707
time_bars_origin_offset=time_bars_origin_offset,
27022708
bar_build_delay=self._time_bars_build_delay,

tests/unit_tests/data/test_aggregation.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3524,7 +3524,7 @@ def test_skip_first_non_full_bar_when_starting_on_bar_boundary(self):
35243524
bar_spec = BarSpecification(2, BarAggregation.SECOND, PriceType.LAST)
35253525
bar_type = BarType(instrument_id, bar_spec, AggregationSource.INTERNAL)
35263526

3527-
# Start exactly at a 2-second boundary (2024-12-01 00:00:00)
3527+
# Act - process ticks and advance time to trigger bars
35283528
start_time_ns = dt_to_unix_nanos(pd.Timestamp("2024-12-01 00:00:00", tz="UTC"))
35293529
clock.set_time(start_time_ns)
35303530

@@ -3538,6 +3538,10 @@ def test_skip_first_non_full_bar_when_starting_on_bar_boundary(self):
35383538
)
35393539
aggregator.start_timer()
35403540

3541+
events = clock.advance_time(start_time_ns)
3542+
for event in events:
3543+
event.handle()
3544+
35413545
# Create trade ticks at 0.5 second intervals
35423546
tick1 = TradeTick(
35433547
instrument_id=instrument_id,
@@ -3549,6 +3553,9 @@ def test_skip_first_non_full_bar_when_starting_on_bar_boundary(self):
35493553
ts_init=start_time_ns + 500_000_000,
35503554
)
35513555

3556+
aggregator.handle_trade_tick(tick1)
3557+
clock.set_time(tick1.ts_event)
3558+
35523559
tick2 = TradeTick(
35533560
instrument_id=instrument_id,
35543561
price=Price.from_str("1.00002"),
@@ -3559,6 +3566,9 @@ def test_skip_first_non_full_bar_when_starting_on_bar_boundary(self):
35593566
ts_init=start_time_ns + 1_000_000_000,
35603567
)
35613568

3569+
aggregator.handle_trade_tick(tick2)
3570+
clock.set_time(tick2.ts_event)
3571+
35623572
tick3 = TradeTick(
35633573
instrument_id=instrument_id,
35643574
price=Price.from_str("1.00003"),
@@ -3569,25 +3579,18 @@ def test_skip_first_non_full_bar_when_starting_on_bar_boundary(self):
35693579
ts_init=start_time_ns + 1_500_000_000,
35703580
)
35713581

3572-
# Act - process ticks and advance time to trigger first bar
3573-
aggregator.handle_trade_tick(tick1)
3574-
clock.set_time(tick1.ts_event)
3575-
3576-
aggregator.handle_trade_tick(tick2)
3577-
clock.set_time(tick2.ts_event)
3578-
35793582
aggregator.handle_trade_tick(tick3)
35803583
clock.set_time(tick3.ts_event)
35813584

35823585
# Advance to exactly 2 seconds to trigger the first bar
35833586
events = clock.advance_time(start_time_ns + 2_000_000_000)
3584-
if events:
3585-
events[0].handle()
3587+
for event in events:
3588+
event.handle()
35863589

35873590
# Assert - we should have received the first bar since we had data for the full period
35883591
assert len(handler) == 1, f"Expected 1 bar but got {len(handler)} bars"
3589-
# The bar closes at start_time (00:00:00) because fire_immediately=True
3590-
assert handler[0].ts_event == start_time_ns
3592+
3593+
assert handler[0].ts_event == start_time_ns + 2_000_000_000
35913594
assert handler[0].open == Price.from_str("1.00001")
35923595
assert handler[0].close == Price.from_str("1.00003")
35933596
assert handler[0].volume == Quantity.from_int(300000)

0 commit comments

Comments
 (0)