diff --git a/nautilus_trader/trading/config.py b/nautilus_trader/trading/config.py index 32a8e34bb9bd..dccc7a0d4c07 100644 --- a/nautilus_trader/trading/config.py +++ b/nautilus_trader/trading/config.py @@ -62,6 +62,13 @@ class StrategyConfig(NautilusConfig, kw_only=True, frozen=True): If commands should be logged by the strategy. log_rejected_due_post_only_as_warning : bool, default True If order rejected events where `due_post_only` is True should be logged as warnings. + inflight_check_interval_ms : int, default 100 + The interval in milliseconds to check for in-flight orders and open positions + during a market exit. + market_exit_max_attempts : int, default 100 + The maximum number of attempts to wait for orders and positions to close + during a market exit before forcing a stop. Defaults to 100 attempts + (10 seconds at 100ms intervals). """ @@ -76,6 +83,8 @@ class StrategyConfig(NautilusConfig, kw_only=True, frozen=True): log_events: bool = True log_commands: bool = True log_rejected_due_post_only_as_warning: bool = True + inflight_check_interval_ms: int = 100 + market_exit_max_attempts: int = 100 class ImportableStrategyConfig(NautilusConfig, frozen=True): diff --git a/nautilus_trader/trading/controller.py b/nautilus_trader/trading/controller.py index d39deb8d3328..639ad2144d6d 100644 --- a/nautilus_trader/trading/controller.py +++ b/nautilus_trader/trading/controller.py @@ -29,6 +29,7 @@ from nautilus_trader.trading.config import StrategyFactory from nautilus_trader.trading.messages import CreateActor from nautilus_trader.trading.messages import CreateStrategy +from nautilus_trader.trading.messages import MarketExitStrategy from nautilus_trader.trading.messages import RemoveActor from nautilus_trader.trading.messages import RemoveStrategy from nautilus_trader.trading.messages import StartActor @@ -93,6 +94,8 @@ def execute(self, command: Command) -> None: self.start_strategy_from_id(command.strategy_id) elif isinstance(command, StopStrategy): self.stop_strategy_from_id(command.strategy_id) + elif isinstance(command, MarketExitStrategy): + self.market_exit_strategy_from_id(command.strategy_id) elif isinstance(command, RemoveStrategy): self.remove_strategy_from_id(command.strategy_id) @@ -210,6 +213,25 @@ def stop_strategy(self, strategy: Strategy) -> None: """ self._trader.stop_strategy(strategy.id) + def market_exit_strategy(self, strategy: Strategy) -> None: + """ + Market exit the given `strategy`. + + Will log a warning if the strategy is not ``RUNNING``. + + Parameters + ---------- + strategy : Strategy + The strategy to market exit. + + Raises + ------ + ValueError + If `strategy` is not already registered with the trader. + + """ + self._trader.market_exit_strategy(strategy.id) + def remove_actor(self, actor: Actor) -> None: """ Remove the given `actor`. @@ -383,6 +405,25 @@ def stop_strategy_from_id(self, strategy_id: StrategyId) -> None: """ self._trader.stop_strategy(strategy_id) + def market_exit_strategy_from_id(self, strategy_id: StrategyId) -> None: + """ + Market exit the strategy corresponding to `strategy_id`. + + Will log a warning if the strategy is not ``RUNNING``. + + Parameters + ---------- + strategy_id : StrategyId + The ID of the strategy to market exit. + + Raises + ------ + ValueError + If `strategy` is not already registered with the trader. + + """ + self._trader.market_exit_strategy(strategy_id) + def remove_actor_from_id(self, actor_id: ComponentId) -> None: """ Remove the actor corresponding to `actor_id`. diff --git a/nautilus_trader/trading/messages.py b/nautilus_trader/trading/messages.py index 3cce7740d556..8e35a610d531 100644 --- a/nautilus_trader/trading/messages.py +++ b/nautilus_trader/trading/messages.py @@ -235,3 +235,29 @@ def __init__( super().__init__(command_id or UUID4(), ts_init) self.strategy_id = strategy_id + + +class MarketExitStrategy(Command): + """ + Represents a command to exit the market for a strategy. + + Parameters + ---------- + strategy_id : StrategyId + The ID of the strategy to exit the market for. + command_id : UUID4 + The command ID. + ts_init : int + UNIX timestamp (nanoseconds) when the object was initialized. + + """ + + def __init__( + self, + strategy_id: StrategyId, + command_id: UUID4 | None = None, + ts_init: int = 0, + ) -> None: + super().__init__(command_id or UUID4(), ts_init) + + self.strategy_id = strategy_id diff --git a/nautilus_trader/trading/strategy.pxd b/nautilus_trader/trading/strategy.pxd index 2f86097ceb66..69c9cea05337 100644 --- a/nautilus_trader/trading/strategy.pxd +++ b/nautilus_trader/trading/strategy.pxd @@ -67,6 +67,8 @@ cdef class Strategy(Actor): cdef bint _log_events cdef bint _log_commands cdef bint _log_rejected_due_post_only_as_warning + cdef bint _is_exiting + cdef int _market_exit_attempts cdef readonly OrderFactory order_factory """The order factory for the strategy.\n\n:returns: `OrderFactory`""" @@ -97,6 +99,9 @@ cdef class Strategy(Actor): ) cpdef void change_id(self, StrategyId strategy_id) cpdef void change_order_id_tag(self, str order_id_tag) + cpdef void on_market_exit(self) + cpdef void after_market_exit(self) + cpdef void market_exit(self) # -- ABSTRACT METHODS ----------------------------------------------------------------------------- @@ -170,6 +175,7 @@ cdef class Strategy(Actor): cdef str _get_gtd_expiry_timer_name(self, ClientOrderId client_order_id) cdef void _set_gtd_expiry(self, Order order) cpdef void _expire_gtd_order(self, TimeEvent event) + cpdef void _check_market_exit(self, TimeEvent event) # -- EVENTS --------------------------------------------------------------------------------------- diff --git a/nautilus_trader/trading/strategy.pyx b/nautilus_trader/trading/strategy.pyx index e94beb701692..7f3af75d574c 100644 --- a/nautilus_trader/trading/strategy.pyx +++ b/nautilus_trader/trading/strategy.pyx @@ -25,8 +25,11 @@ attempts to operate without a managing `Trader` instance. """ +import pandas as pd + from nautilus_trader.trading.config import ImportableStrategyConfig from nautilus_trader.trading.config import StrategyConfig +from nautilus_trader.trading.messages import MarketExitStrategy from libc.stdint cimport uint64_t @@ -163,6 +166,8 @@ cdef class Strategy(Actor): self.external_order_claims = self._parse_external_order_claims(config.external_order_claims) self.manage_contingent_orders = config.manage_contingent_orders self.manage_gtd_expiry = config.manage_gtd_expiry + self._is_exiting = False + self._market_exit_attempts = 0 # Public components self.clock = self._clock @@ -243,6 +248,29 @@ cdef class Strategy(Actor): "occur here, such as resetting indicators and other state" ) + cpdef void on_market_exit(self): + """ + Actions to be performed when a market exit has been initiated. + + Warnings + -------- + Override this method in a subclass to implement custom market exit logic. + + """ + # Optionally override in subclass + + cpdef void after_market_exit(self): + """ + Actions to be performed after a market exit has been completed. + + Warnings + -------- + Override this method in a subclass to implement custom logic after + market exit. + + """ + # Optionally override in subclass + # -- REGISTRATION --------------------------------------------------------------------------------- cpdef void register( @@ -404,6 +432,10 @@ cdef class Strategy(Actor): if self._manager: self._manager.reset() + # Reset market exit state + self._is_exiting = False + self._market_exit_attempts = 0 + self.on_reset() # -- ABSTRACT METHODS ----------------------------------------------------------------------------- @@ -1664,7 +1696,105 @@ cdef class Strategy(Actor): self._log.info(f"Expiring GTD order {order.client_order_id}", LogColor.BLUE) self.cancel_order(order) - # -- HANDLERS ------------------------------------------------------------------------------------- + cpdef void market_exit(self): + """ + Initiate an iterative market exit for the strategy. + + Will cancel all open orders and close all open positions, and wait for + all in-flight orders to resolve and positions to close before stopping + the strategy. + """ + if self._is_exiting: + return + + self._is_exiting = True + self._market_exit_attempts = 0 + + self._log.info("Initiating market exit...", LogColor.BLUE) + self.on_market_exit() + + # Get all instruments the strategy has open orders or positions for + cdef list open_orders = self.cache.orders_open(None, None, self.id) + cdef list open_positions = self.cache.positions_open(None, None, self.id) + + cdef set instruments = set() + cdef Order order + for order in open_orders: + instruments.add(order.instrument_id) + + cdef Position position + for position in open_positions: + instruments.add(position.instrument_id) + + cdef InstrumentId instrument_id + for instrument_id in instruments: + self.cancel_all_orders(instrument_id) + self.close_all_positions(instrument_id) + + # Start iterative check + self._log.info(f"Setting market exit timer for {self.id}") + self._clock.set_timer( + f"MARKET-EXIT-CHECK:{self.id}", + pd.Timedelta(milliseconds=self.config.inflight_check_interval_ms), + None, + None, + self._check_market_exit, + True, + False, + ) + + cpdef void _check_market_exit(self, TimeEvent event): + if self.state != ComponentState.RUNNING: + return + + self._market_exit_attempts += 1 + self._log.debug(f"Market exit check triggered: {event.name} (attempt {self._market_exit_attempts})") + + # Check if max attempts reached + if self._market_exit_attempts >= self.config.market_exit_max_attempts: + timer_name = f"MARKET-EXIT-CHECK:{self.id}" + if timer_name in self._clock.timer_names: + self._clock.cancel_timer(name=timer_name) + + self._log.warning( + f"Market exit max attempts ({self.config.market_exit_max_attempts}) reached. " + f"Forcing stop. Open orders: {len(self.cache.orders_open(None, None, self.id))}, " + f"inflight orders: {len(self.cache.orders_inflight(None, None, self.id))}, " + f"open positions: {len(self.cache.positions_open(None, None, self.id))}", + LogColor.YELLOW + ) + + # Reset before stopping + self._is_exiting = False + self._market_exit_attempts = 0 + self.after_market_exit() + self.stop() + return + + cdef list open_orders = self.cache.orders_open(None, None, self.id) + cdef list inflight_orders = self.cache.orders_inflight(None, None, self.id) + + if open_orders or inflight_orders: + return + + cdef list open_positions = self.cache.positions_open(None, None, self.id) + if open_positions: + # If there are open positions but no orders, we should re-send close orders + for position in open_positions: + self.close_position(position) + + return + + # All clear + timer_name = f"MARKET-EXIT-CHECK:{self.id}" + if timer_name in self._clock.timer_names: + self._clock.cancel_timer(name=timer_name) + + # Reset before stopping + self._is_exiting = False + self._market_exit_attempts = 0 + self.after_market_exit() + self.stop() cpdef void handle_event(self, Event event): """ diff --git a/nautilus_trader/trading/trader.py b/nautilus_trader/trading/trader.py index 3239ff2c63d6..cad2b33c22e8 100644 --- a/nautilus_trader/trading/trader.py +++ b/nautilus_trader/trading/trader.py @@ -635,6 +635,34 @@ def stop_strategy(self, strategy_id: StrategyId) -> None: strategy.stop() + def market_exit_strategy(self, strategy_id: StrategyId) -> None: + """ + Market exit the strategy with the given `strategy_id`. + + Parameters + ---------- + strategy_id : StrategyId + The strategy ID to market exit. + + Raises + ------ + ValueError + If a strategy with the given `strategy_id` is not found. + + """ + PyCondition.not_none(strategy_id, "strategy_id") + + strategy = self._strategies.get(strategy_id) + + if strategy is None: + raise ValueError(f"Cannot market exit strategy, {strategy_id} not found.") + + if not strategy.is_running: + self._log.warning(f"Strategy {strategy_id} not running") + return + + strategy.market_exit() + def remove_actor(self, actor_id: ComponentId) -> None: """ Remove the actor with the given `actor_id`. diff --git a/tests/unit_tests/trading/test_strategy.py b/tests/unit_tests/trading/test_strategy.py index b227b585709e..eaa8f50c3daf 100644 --- a/tests/unit_tests/trading/test_strategy.py +++ b/tests/unit_tests/trading/test_strategy.py @@ -201,6 +201,8 @@ def test_strategy_to_importable_config_with_no_specific_config(self) -> None: "log_events": True, "log_commands": True, "log_rejected_due_post_only_as_warning": True, + "inflight_check_interval_ms": 100, + "market_exit_max_attempts": 100, } def test_strategy_to_importable_config(self) -> None: @@ -235,6 +237,8 @@ def test_strategy_to_importable_config(self) -> None: "log_events": False, "log_commands": True, "log_rejected_due_post_only_as_warning": True, + "inflight_check_interval_ms": 100, + "market_exit_max_attempts": 100, } def test_strategy_equality(self) -> None: @@ -2059,3 +2063,82 @@ def test_managed_contingencies_when_filled_tp_then_cancels_contingent_order( assert entry_order.status == OrderStatus.FILLED assert tp_order.status == OrderStatus.FILLED assert sl_order.status == OrderStatus.CANCELED + + def test_market_exit(self) -> None: + # Arrange + config = StrategyConfig(inflight_check_interval_ms=10) + strategy = Strategy(config=config) + strategy.register( + trader_id=self.trader_id, + portfolio=self.portfolio, + msgbus=self.msgbus, + cache=self.cache, + clock=self.clock, + ) + strategy.start() + + # Submit an order that will remain open + order = strategy.order_factory.limit( + _USDJPY_SIM.id, + OrderSide.BUY, + Quantity.from_int(100_000), + price=Price.from_str("10.000"), # Far from market + ) + strategy.submit_order(order) + self.exchange.process(0) + assert order.status == OrderStatus.ACCEPTED + + # Act + strategy.market_exit() + + # 1st check: cancels orders immediately in market_exit() + self.exchange.process(0) # Process the cancel command + assert order.status == OrderStatus.CANCELED + + # Advance time to trigger check timer + for handler in self.clock.advance_time( + self.clock.timestamp_ns() + pd.Timedelta(milliseconds=20).value, + ): + handler.handle() + + # Assert + assert strategy.state == ComponentState.STOPPED + + def test_market_exit_with_position(self) -> None: + # Arrange + config = StrategyConfig(inflight_check_interval_ms=10) + strategy = Strategy(config=config) + strategy.register( + trader_id=self.trader_id, + portfolio=self.portfolio, + msgbus=self.msgbus, + cache=self.cache, + clock=self.clock, + ) + strategy.start() + + # Open a position + order = strategy.order_factory.market( + _USDJPY_SIM.id, + OrderSide.BUY, + Quantity.from_int(100_000), + ) + strategy.submit_order(order) + self.exchange.process(0) + assert not strategy.portfolio.is_completely_flat() + + # Act + strategy.market_exit() + + # 1st call to market_exit() calls close_all_positions immediately + self.exchange.process(0) # Process the closing market order + assert strategy.portfolio.is_completely_flat() + + # Advance time to trigger check timer + for handler in self.clock.advance_time( + self.clock.timestamp_ns() + pd.Timedelta(milliseconds=20).value, + ): + handler.handle() + + # Assert + assert strategy.state == ComponentState.STOPPED