Skip to content

Commit c55a064

Browse files
creatidyAdrian TkaczMartin-Molinero
authored
Fix combo order queue affinity to prevent duplicate brokerage submissions (#9293)
* Enhance order request processing by grouping requests and adding concurrency tests * Minor adjustment --------- Co-authored-by: Adrian Tkacz <adrian.tkacz@creatdy.com> Co-authored-by: Martin Molinero <martin.molinero1@gmail.com>
1 parent 192a093 commit c55a064

2 files changed

Lines changed: 63 additions & 7 deletions

File tree

Engine/TransactionHandlers/BrokerageTransactionHandler.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ public OrderTicket AddOrder(SubmitOrderRequest request)
333333
order.OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close);
334334
_openOrders[order.Id] = new OpenOrderState(order, ticket, security);
335335

336-
EnqueueOrderRequest(request);
336+
EnqueueOrderRequest(request, order);
337337

338338
WaitForOrderSubmission(ticket);
339339
}
@@ -449,7 +449,7 @@ public OrderTicket UpdateOrder(UpdateOrderRequest request)
449449
else
450450
{
451451
request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
452-
EnqueueOrderRequest(request);
452+
EnqueueOrderRequest(request, order);
453453
}
454454
}
455455
catch (Exception err)
@@ -521,7 +521,7 @@ public OrderTicket CancelOrder(CancelOrderRequest request)
521521

522522
// send the request to be processed
523523
request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
524-
EnqueueOrderRequest(request);
524+
EnqueueOrderRequest(request, order);
525525
}
526526
}
527527
catch (Exception err)
@@ -1931,9 +1931,14 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity)
19311931
return $"Order exceeds shortable quantity {shortableQuantity} for Symbol {symbol} requested {quantity})";
19321932
}
19331933

1934-
private void EnqueueOrderRequest(OrderRequest request)
1934+
private void EnqueueOrderRequest(OrderRequest request, Order order)
19351935
{
1936-
_orderRequestQueues[request.OrderId % _orderRequestQueues.Count].Add(request);
1936+
var queueKey = request.OrderId;
1937+
if (order.GroupOrderManager?.Id > 0)
1938+
{
1939+
queueKey = order.GroupOrderManager.Id;
1940+
}
1941+
_orderRequestQueues[queueKey % _orderRequestQueues.Count].Add(request);
19371942
}
19381943

19391944
/// <summary>
@@ -1956,4 +1961,3 @@ public OpenOrderState(Order order, OrderTicket ticket, Security security)
19561961
}
19571962
}
19581963
}
1959-

Tests/Engine/BrokerageTransactionHandlerTests/BrokerageTransactionHandlerTests.cs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2466,6 +2466,54 @@ public void ProcessesOrdersConcurrently()
24662466
}
24672467
}
24682468

2469+
[Test]
2470+
public void ProcessesComboRequestsOnSameThreadWhenConcurrencyIsEnabled()
2471+
{
2472+
var algorithm = new TestAlgorithm();
2473+
using var brokerage = new TestingConcurrentBrokerage();
2474+
2475+
const int expectedOrdersCount = 2;
2476+
using var finishedEvent = new ManualResetEventSlim(false);
2477+
var transactionHandler = new TestableConcurrentBrokerageTransactionHandler(expectedOrdersCount, finishedEvent);
2478+
transactionHandler.Initialize(algorithm, brokerage, new BacktestingResultHandler());
2479+
2480+
try
2481+
{
2482+
algorithm.Transactions.SetOrderProcessor(transactionHandler);
2483+
algorithm.SetCash(100000);
2484+
algorithm.SetFinishedWarmingUp();
2485+
2486+
var security1 = (Security)algorithm.AddEquity("SPY");
2487+
var security2 = (Security)algorithm.AddEquity("AAPL");
2488+
2489+
var reference = new DateTime(2025, 07, 03, 10, 0, 0);
2490+
security1.SetMarketPrice(new Tick(reference, security1.Symbol, 500, 500));
2491+
security2.SetMarketPrice(new Tick(reference, security2.Symbol, 200, 200));
2492+
2493+
var groupOrderManager = new GroupOrderManager(1, 2, -1, 1m);
2494+
var orderRequest1 = new SubmitOrderRequest(OrderType.ComboLimit, security1.Type, security1.Symbol, -1, 1m, 0, reference, "",
2495+
groupOrderManager: groupOrderManager);
2496+
var orderRequest2 = new SubmitOrderRequest(OrderType.ComboLimit, security2.Type, security2.Symbol, 1, 1m, 0, reference, "",
2497+
groupOrderManager: groupOrderManager);
2498+
2499+
orderRequest1.SetOrderId(1);
2500+
orderRequest2.SetOrderId(2);
2501+
2502+
transactionHandler.Process(orderRequest1);
2503+
transactionHandler.Process(orderRequest2);
2504+
2505+
Assert.IsTrue(finishedEvent.Wait(10000));
2506+
2507+
Assert.IsTrue(transactionHandler.RequestProcessingThreads.TryGetValue(orderRequest1.OrderId, out var order1Thread));
2508+
Assert.IsTrue(transactionHandler.RequestProcessingThreads.TryGetValue(orderRequest2.OrderId, out var order2Thread));
2509+
Assert.AreEqual(order1Thread, order2Thread);
2510+
}
2511+
finally
2512+
{
2513+
transactionHandler.Exit();
2514+
}
2515+
}
2516+
24692517
[TestCase("OnAccountChanged")]
24702518
[TestCase("OnOptionNotification")]
24712519
[TestCase("OnNewBrokerageOrderNotification")]
@@ -2825,6 +2873,8 @@ private class TestableConcurrentBrokerageTransactionHandler : BrokerageTransacti
28252873

28262874
public ConcurrentBag<OrderRequest> ProcessedRequests = new();
28272875

2876+
public ConcurrentDictionary<int, string> RequestProcessingThreads = new();
2877+
28282878
public TestableConcurrentBrokerageTransactionHandler(int expectedOrdersCount, ManualResetEventSlim finishedEvent)
28292879
{
28302880
_expectedOrdersCount = expectedOrdersCount;
@@ -2836,10 +2886,12 @@ public override void HandleOrderRequest(OrderRequest request)
28362886
base.HandleOrderRequest(request);
28372887

28382888
// Capture the thread name for debugging purposes
2889+
var threadName = Thread.CurrentThread.Name ?? Environment.CurrentManagedThreadId.ToString();
28392890
lock (ProcessingThreadNames)
28402891
{
2841-
ProcessingThreadNames.Add(Thread.CurrentThread.Name ?? Environment.CurrentManagedThreadId.ToString());
2892+
ProcessingThreadNames.Add(threadName);
28422893
}
2894+
RequestProcessingThreads[request.OrderId] = threadName;
28432895

28442896
ProcessedRequests.Add(request);
28452897

0 commit comments

Comments
 (0)