Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nautilus_trader/cache/cache.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ cdef class Cache(CacheFacade):
cpdef void add_index_price(self, IndexPriceUpdate index_price)
cpdef void add_funding_rate(self, FundingRateUpdate funding_rate)
cpdef void add_bar(self, Bar bar)
cpdef void replace_last_bar(self, Bar bar)
cpdef void add_quote_ticks(self, list ticks)
cpdef void add_trade_ticks(self, list ticks)
cpdef void add_bars(self, list bars)
Expand Down Expand Up @@ -213,6 +214,7 @@ cdef class Cache(CacheFacade):
cpdef void audit_own_order_books(self)

cdef timedelta _get_timedelta(self, BarType bar_type)
cdef void _update_latest_bid_ask_bar(self, Bar bar)

cpdef list bar_types(
self,
Expand Down
29 changes: 29 additions & 0 deletions nautilus_trader/cache/cache.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1730,12 +1730,41 @@ cdef class Cache(CacheFacade):
self._bars[bar.bar_type] = bars

bars.appendleft(bar)
self._update_latest_bid_ask_bar(bar)

cdef void _update_latest_bid_ask_bar(self, Bar bar):
cdef PriceType price_type = bar.bar_type.spec.price_type
if price_type == PriceType.BID:
self._bars_bid[bar.bar_type.instrument_id] = bar
elif price_type == PriceType.ASK:
self._bars_ask[bar.bar_type.instrument_id] = bar

cpdef void replace_last_bar(self, Bar bar):
"""
Replace the last cached bar for the bars type.

Parameters
----------
bar : Bar
The bar to cache as the last bar.

Notes
-----
This assumes the bar type is already present in the cache. If it is not,
then this method falls back to `add_bar`.

"""
Condition.not_none(bar, "bar")

bars = self._bars.get(bar.bar_type)

if not bars:
self.add_bar(bar)
return

bars[0] = bar
self._update_latest_bid_ask_bar(bar)

cpdef void add_quote_ticks(self, list ticks):
"""
Add the given quotes to the cache.
Expand Down
2 changes: 1 addition & 1 deletion nautilus_trader/common/actor.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4355,7 +4355,7 @@ cdef class Actor(Component):

# Update indicators
cdef list indicators = self._indicators_for_bars.get(bar.bar_type.id_spec_key())
if indicators:
if indicators and not bar.is_revision:
self._handle_indicators_for_bar(indicators, bar)

if historical:
Expand Down
7 changes: 7 additions & 0 deletions nautilus_trader/data/aggregation.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ cdef class BarBuilder:
cdef Price _close
cdef Quantity volume

cpdef Price open(self)
cpdef Price high(self)
cpdef Price low(self)
cpdef Price close(self)

cpdef void update(self, Price price, Quantity size, uint64_t ts_init)
cpdef void update_bar(self, Bar bar, Quantity volume, uint64_t ts_init)
cpdef void reset(self)
Expand Down Expand Up @@ -156,6 +161,7 @@ cdef class TimeBarAggregator(BarAggregator):
cdef bint _timestamp_on_close
cdef bint _skip_first_non_full_bar
cdef bint _build_with_no_updates
cdef bint _handle_revised_bars
cdef int _bar_build_delay
cdef object _time_bars_origin_offset
cdef list _historical_events
Expand All @@ -167,6 +173,7 @@ cdef class TimeBarAggregator(BarAggregator):
cpdef void stop_timer(self)
cdef void _pre_process_historical_events(self, uint64_t ts_init)
cdef void _post_process_historical_events(self)
cdef void _build_and_send_revision(self, uint64_t ts_init)
cpdef void _build_bar(self, TimeEvent event)


Expand Down
53 changes: 53 additions & 0 deletions nautilus_trader/data/aggregation.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ cdef class BarBuilder:
self.volume = Quantity.zero_c(precision=self.size_precision)
self.count = 0

cpdef Price open(self):
return self._open

cpdef Price high(self):
return self._high

cpdef Price low(self):
return self._low

cpdef Price close(self):
return self._close

cpdef Bar build_now(self):
"""
Return the aggregated bar and reset.
Expand Down Expand Up @@ -1390,6 +1402,7 @@ cdef class TimeBarAggregator(BarAggregator):
bint build_with_no_updates = True,
object time_bars_origin_offset: pd.Timedelta | pd.DateOffset = None,
int bar_build_delay = 0,
bint handle_revised_bars = False,
) -> None:
super().__init__(
instrument=instrument,
Expand All @@ -1400,6 +1413,7 @@ cdef class TimeBarAggregator(BarAggregator):
self._timestamp_on_close = timestamp_on_close
self._skip_first_non_full_bar = skip_first_non_full_bar
self._build_with_no_updates = build_with_no_updates
self._handle_revised_bars = handle_revised_bars
self._bar_build_delay = bar_build_delay
self._time_bars_origin_offset = time_bars_origin_offset or 0
self._timer_name = f"time_bar_{self.bar_type}"
Expand Down Expand Up @@ -1580,6 +1594,7 @@ cdef class TimeBarAggregator(BarAggregator):
self._pre_process_historical_events(ts_init)

self._builder.update(price, size, ts_init)
self._build_and_send_revision(ts_init)

if self.historical_mode:
self._post_process_historical_events()
Expand All @@ -1589,10 +1604,43 @@ cdef class TimeBarAggregator(BarAggregator):
self._pre_process_historical_events(ts_init)

self._builder.update_bar(bar, volume, ts_init)
self._build_and_send_revision(ts_init)

if self.historical_mode:
self._post_process_historical_events()

cdef void _build_and_send_revision(self, uint64_t ts_init):
if not self._handle_revised_bars:
return

if self._skip_first_non_full_bar and ts_init <= self.first_close_ns:
return

if self._builder.ts_last != ts_init:
return # This update did not advance the builder (stale/out-of-order timestamp)

if self._builder.count == 0:
return

cdef uint64_t ts_event
if self._is_left_open:
ts_event = self.next_close_ns if self._timestamp_on_close else self.stored_open_ns
else:
ts_event = self.stored_open_ns

cdef Bar revision = Bar(
bar_type=self.bar_type,
open=self._builder.open(),
high=self._builder.high(),
low=self._builder.low(),
close=self._builder.close(),
volume=Quantity.from_raw_c(self._builder.volume._mem.raw, self._builder.size_precision),
ts_event=ts_event,
ts_init=ts_init,
is_revision=True,
)
self._handler(revision)

cdef void _pre_process_historical_events(self, uint64_t ts_init):
if self._clock.timestamp_ns() == 0:
self._clock.set_time(ts_init)
Expand Down Expand Up @@ -1623,6 +1671,11 @@ cdef class TimeBarAggregator(BarAggregator):
return # Do not build bar when no update

cdef uint64_t ts_init = event.ts_event
# When revisions are enabled, `ts_init` should reflect the latest contributing update
# rather than the timer boundary (`event.ts_event`). This prevents the final bar from
# having a `ts_init` earlier than the last emitted revision (sequence validation would drop it).
if self._handle_revised_bars and self._builder.ts_last > ts_init:
ts_init = self._builder.ts_last
cdef uint64_t ts_event

if self._is_left_open:
Expand Down
75 changes: 43 additions & 32 deletions nautilus_trader/data/engine.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2199,40 +2199,50 @@ cdef class DataEngine(Component):
cpdef void _handle_bar(self, Bar bar, bint historical = False):
cdef:
BarType bar_type = bar.bar_type
Bar cached_bar
Bar last_bar
list bars
int i
if self._validate_data_sequence:
last_bar = self._cache.bar(bar_type)
if last_bar is not None:
if bar.ts_event < last_bar.ts_event:
self._log.warning(
f"Bar {bar} was prior to last bar `ts_event` {last_bar.ts_event}",
)
return # `bar` is out of sequence
last_bar = self._cache.bar(bar_type)
if self._validate_data_sequence and last_bar is not None:
if bar.ts_event < last_bar.ts_event:
self._log.warning(
f"Bar {bar} was prior to last bar `ts_event` {last_bar.ts_event}",
)
return # `bar` is out of sequence

if bar.ts_init < last_bar.ts_init:
self._log.warning(
f"Bar {bar} was prior to last bar `ts_init` {last_bar.ts_init}",
)
return # `bar` is out of sequence

if bar.is_revision:
if bar.ts_event == last_bar.ts_event:
# Replace `last_bar`, previously cached bar will fall out of scope
self._cache._bars.get(bar_type)[0] = bar # noqa
elif bar.ts_event > last_bar.ts_event:
# Bar is latest, consider as new bar
self._cache.add_bar(bar)
else:
self._log.warning(
f"Bar revision {bar} was not at last bar `ts_event` {last_bar.ts_event}",
)
return # Revision SHOULD be at `last_bar.ts_event`

if not bar.is_revision and not (historical and self._disable_historical_cache):
self._cache.add_bar(bar)
if bar.ts_init < last_bar.ts_init:
self._log.warning(
f"Bar {bar} was prior to last bar `ts_init` {last_bar.ts_init}",
)
return # `bar` is out of sequence

if not (historical and self._disable_historical_cache):
is_update_to_last_bar = last_bar is not None and bar.ts_event == last_bar.ts_event

if is_update_to_last_bar:
# Do not allow a stale in-progress revision from an internal time-bar aggregator
# to overwrite the already finalized bar for the same interval.
is_stale_internal_revision = (
bar.is_revision
and not last_bar.is_revision
and bar_type.is_internally_aggregated()
)
if is_stale_internal_revision:
return

# Preserve behavior: external revisions are cached only when sequence validation is enabled.
# A non-revision always replaces. For revisions, we replace if it's from an internal
# aggregator, or if it's an external revision and sequence validation is enabled.
if not bar.is_revision:
should_replace = True
else:
should_replace = bar_type.is_internally_aggregated() or self._validate_data_sequence
if should_replace:
# Replace `last_bar`, previously cached bar will fall out of scope
self._cache.replace_last_bar(bar)
else:
# Cache revisions only if they are for a newer interval, otherwise treat as stale.
should_add = not bar.is_revision or last_bar is None or bar.ts_event > last_bar.ts_event
if should_add:
self._cache.add_bar(bar)

self._msgbus.publish_c(topic=self._topic_cache.get_bars_topic(bar_type, historical), msg=bar)

Expand Down Expand Up @@ -2716,6 +2726,7 @@ cdef class DataEngine(Component):
build_with_no_updates=self._time_bars_build_with_no_updates,
time_bars_origin_offset=time_bars_origin_offset,
bar_build_delay=self._time_bars_build_delay,
handle_revised_bars=params.get("handle_revised_bars", False),
)
elif bar_type.spec.aggregation == BarAggregation.TICK:
aggregator = TickBarAggregator(
Expand Down
45 changes: 45 additions & 0 deletions tests/unit_tests/common/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from nautilus_trader.data.engine import DataEngine
from nautilus_trader.data.messages import DataResponse
from nautilus_trader.execution.engine import ExecutionEngine
from nautilus_trader.indicators.base import Indicator
from nautilus_trader.model.currencies import EUR
from nautilus_trader.model.currencies import USD
from nautilus_trader.model.data import Bar
Expand Down Expand Up @@ -1208,6 +1209,50 @@ def test_handle_bar_when_running_sends_to_on_bar(self) -> None:
assert actor.calls == ["on_start", "on_bar"]
assert actor.store[0] == bar

def test_handle_bar_does_not_update_indicators_for_revision_bars(self) -> None:
class CountingIndicator(Indicator):
def __init__(self):
super().__init__([])
self.calls = 0

def handle_bar(self, bar):
self.calls += 1

def _reset(self):
self.calls = 0

# Arrange
actor = Actor(config=ActorConfig(component_id=self.component_id))
actor.register_base(
portfolio=self.portfolio,
msgbus=self.msgbus,
cache=self.cache,
clock=self.clock,
)

bar = TestDataStubs.bar_5decimal()
actor.register_indicator_for_bars(bar.bar_type, CountingIndicator())
indicator = actor.registered_indicators[0]

revision_bar = Bar(
bar_type=bar.bar_type,
open=bar.open,
high=bar.high,
low=bar.low,
close=bar.close,
volume=bar.volume,
ts_event=bar.ts_event,
ts_init=bar.ts_init,
is_revision=True,
)

# Act
actor.handle_bar(revision_bar)
actor.handle_bar(bar)

# Assert
assert indicator.calls == 1

def test_handle_bars(self) -> None:
# Arrange
actor = MockActor()
Expand Down
Loading
Loading