Skip to content

Commit 310f71b

Browse files
committed
Fix Betfair duplicate cancel event race condition
- Add FifoCache to track terminal orders and prevent duplicate cancels - Guard both cancel and lapse paths against duplicate terminal events - Sync terminal orders cache on reconnect for closed orders - Add test for duplicate cancel event prevention
1 parent 14f5120 commit 310f71b

File tree

3 files changed

+53
-4
lines changed

3 files changed

+53
-4
lines changed

RELEASES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ Released on TBD (UTC).
5252
- Fixed `request_order_book_snapshot` and add Bybit support (#3416), thanks @dxwil
5353
- Fixed Redis cache buffer flushing during idle periods (#3426), thanks for reporting @santivazq
5454
- Fixed Betfair dropped fills from premature cache update
55+
- Fixed Betfair duplicate cancel event race condition
5556
- Fixed Binance Spot WebSocket subscription acknowledgment parsing (#3382), thanks @Johnkhk
5657
- Fixed Binance Futures instrument parsing for margin requirements (#3420), thanks @linimin
5758
- Fixed Binance algo order quantity `AttributeError` on _mem access

nautilus_trader/adapters/betfair/execution.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
from nautilus_trader.core.datetime import ensure_pydatetime_utc
7272
from nautilus_trader.core.datetime import millis_to_nanos
7373
from nautilus_trader.core.datetime import secs_to_nanos
74+
from nautilus_trader.core.nautilus_pyo3 import FifoCache
7475
from nautilus_trader.core.uuid import UUID4
7576
from nautilus_trader.execution.messages import CancelAllOrders
7677
from nautilus_trader.execution.messages import CancelOrder
@@ -188,6 +189,10 @@ def __init__(
188189
# Stores published executions per order to avoid duplicates and support reconciliation
189190
self._published_executions: dict[ClientOrderId, list[TradeId]] = defaultdict(list)
190191

192+
# Tracks orders for which a terminal event (cancel/expire) has been generated
193+
# to prevent duplicate events from race conditions with multiple event sources
194+
self._terminal_orders: FifoCache = FifoCache()
195+
191196
@property
192197
def instrument_provider(self) -> BetfairInstrumentProvider:
193198
"""
@@ -284,6 +289,9 @@ def _sync_fill_caches_from_orders(self) -> None:
284289
synced_count = 0
285290

286291
for order in orders:
292+
if order.is_closed:
293+
self._terminal_orders.add(order.client_order_id.value)
294+
287295
if order.filled_qty > 0:
288296
self._filled_qty_cache[order.client_order_id] = order.filled_qty
289297

@@ -300,6 +308,14 @@ def _sync_fill_caches_from_orders(self) -> None:
300308
LogColor.BLUE,
301309
)
302310

311+
def _try_mark_terminal_order(self, client_order_id: ClientOrderId) -> bool:
312+
key = client_order_id.value
313+
if key in self._terminal_orders:
314+
return False
315+
316+
self._terminal_orders.add(key)
317+
return True
318+
303319
# -- ACCOUNT HANDLERS -------------------------------------------------------------------------
304320

305321
async def _update_account_state(self) -> None:
@@ -1244,6 +1260,11 @@ def _handle_stream_execution_complete_order_update(
12441260
)
12451261
# If this is the result of a ModifyOrder, we don't want to emit a cancel
12461262
if key not in self._pending_update_order_client_ids:
1263+
# Guard against duplicate terminal events from race conditions
1264+
if not self._try_mark_terminal_order(client_order_id):
1265+
self._log.debug(f"Skipping duplicate cancel for {client_order_id!r}")
1266+
return
1267+
12471268
# The remainder of this order has been canceled
12481269
canceled_ts = self._get_canceled_timestamp(unmatched_order)
12491270
self.generate_order_canceled(
@@ -1273,11 +1294,13 @@ def _handle_stream_execution_complete_order_update(
12731294
self._log.error("Cannot handle cancel: {order.client_order_id!r} not found")
12741295
return
12751296

1276-
# Check if order is still open before generating a cancel.
1277-
# Note: A race condition exists where a closing event might still be en route
1278-
# to the execution engine. Running with this for now to avoid the complexity
1279-
# of another hot cache to deal with the lapsed bet sequencing.
1297+
# Check if order is still open before generating a cancel
12801298
if order.is_open:
1299+
# Guard against duplicate terminal events from race conditions
1300+
if not self._try_mark_terminal_order(client_order_id):
1301+
self._log.debug(f"Skipping duplicate lapse cancel for {client_order_id!r}")
1302+
return
1303+
12811304
canceled_ts = self._get_canceled_timestamp(unmatched_order)
12821305
self.generate_order_canceled(
12831306
strategy_id=order.strategy_id,

tests/integration_tests/adapters/betfair/test_betfair_execution.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,31 @@ async def test_duplicate_trade_id(exec_client, setup_order_state, fill_events, c
698698
assert fill3.trade_id.value == "75076f6b172799e168869d64df86b4d2717d"
699699

700700

701+
@pytest.mark.asyncio
702+
async def test_duplicate_cancel_events_prevented(exec_client, setup_order_state, cancel_events):
703+
"""
704+
Test that duplicate cancel events from the stream don't cause state transition
705+
errors.
706+
707+
This prevents InvalidStateTrigger: CANCELED -> CANCELED errors when the same
708+
cancel message arrives multiple times (e.g., from stream replay or reconnect).
709+
710+
"""
711+
# Arrange
712+
order_change_message = BetfairStreaming.ocm_CANCEL()
713+
await setup_order_state(order_change_message=order_change_message)
714+
715+
# Act - Send the same cancel message twice (simulating reconnect/replay)
716+
exec_client.handle_order_stream_update(order_change_message)
717+
await asyncio.sleep(0)
718+
exec_client.handle_order_stream_update(order_change_message)
719+
await asyncio.sleep(0)
720+
721+
# Assert - Only one cancel event should be generated
722+
assert len(cancel_events) == 1
723+
assert isinstance(cancel_events[0], OrderCanceled)
724+
725+
701726
@pytest.mark.parametrize(
702727
("side", "price", "quantity", "free"),
703728
[

0 commit comments

Comments
 (0)