Skip to content

Commit ad948a6

Browse files
authored
Fix: Prevent duplicate message processing in WebSocket responses (#44)
* fix: skip duplication message in ws responses test:feat: check that duplication msg skip and remove from cache * feat: missed license in file * feat: handle missed Update order event by cache skip duplication
1 parent 324bf92 commit ad948a6

File tree

3 files changed

+310
-5
lines changed

3 files changed

+310
-5
lines changed

QuantConnect.AlpacaBrokerage.Tests/AlpacaBrokerageTests.cs

Lines changed: 215 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@
2525
using System.Collections.Generic;
2626
using System.Linq;
2727
using System.Threading;
28+
using TradeEvent = Alpaca.Markets.TradeEvent;
29+
using QuantConnect.Brokerages.Alpaca.Tests.Models;
2830
using static QuantConnect.Brokerages.Alpaca.Tests.AlpacaBrokerageAdditionalTests;
2931

3032
namespace QuantConnect.Brokerages.Alpaca.Tests
3133
{
3234
[TestFixture]
3335
public partial class AlpacaBrokerageTests : BrokerageTests
3436
{
37+
private TestAlpacaBrokerage AlpacaBrokerage => Brokerage as TestAlpacaBrokerage;
38+
3539
protected override Symbol Symbol { get; } = Symbols.AAPL;
3640
protected override SecurityType SecurityType { get; }
3741

@@ -46,7 +50,7 @@ protected override IBrokerage CreateBrokerage(IOrderProvider orderProvider, ISec
4650
protected override bool IsAsync() => false;
4751
protected override decimal GetAskPrice(Symbol symbol)
4852
{
49-
return (Brokerage as TestAlpacaBrokerage).GetLatestQuotePublic(symbol).AskPrice;
53+
return AlpacaBrokerage.GetLatestQuotePublic(symbol).AskPrice;
5054
}
5155

5256
/// <summary>
@@ -418,5 +422,215 @@ public void PlaceOutsideRegularHoursLimitOrders()
418422
Assert.IsTrue(Brokerage.PlaceOrder(limitOrder));
419423
Assert.IsTrue(submittedOrderEvent.WaitOne(TimeSpan.FromSeconds(5)));
420424
}
425+
426+
[Test]
427+
public void HandleTradeUpdateNormalCase()
428+
{
429+
// pending_new -> new -> partial_fill -> fill
430+
// the same to all trade updates
431+
var orderId = Guid.NewGuid();
432+
433+
var order = new LimitOrder(Symbols.AAPL, 3m, 1m, default);
434+
order.BrokerId.Add(orderId.ToString());
435+
OrderProvider.Add(order);
436+
437+
var tradeUpdates = new List<TestTradeUpdate>(4)
438+
{
439+
new(TradeEvent.PendingNew, null, new TestOrder(orderId)),
440+
new(TradeEvent.New, Guid.NewGuid(), new TestOrder(orderId)),
441+
new(TradeEvent.PartialFill, Guid.NewGuid(), new TestOrder(orderId, 1)),
442+
new(TradeEvent.Fill, Guid.NewGuid(), new TestOrder(orderId, 3))
443+
};
444+
445+
foreach (var tradeUpdate in tradeUpdates)
446+
{
447+
AlpacaBrokerage.HandleTradeUpdate(tradeUpdate);
448+
449+
switch (tradeUpdate.Event)
450+
{
451+
case TradeEvent.PendingNew:
452+
case TradeEvent.New:
453+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
454+
break;
455+
case TradeEvent.PartialFill:
456+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
457+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId[orderId].Count);
458+
break;
459+
case TradeEvent.Fill:
460+
Assert.AreEqual(0, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
461+
break;
462+
}
463+
}
464+
}
465+
466+
[Test]
467+
public void HandleTradeUpdateShouldSkipDuplication()
468+
{
469+
// pending_new -> new -> partial_fill -> fill
470+
// the same to all trade updates
471+
var orderId = Guid.NewGuid();
472+
473+
var order = new LimitOrder(Symbols.AAPL, 3m, 1m, default);
474+
order.BrokerId.Add(orderId.ToString());
475+
476+
OrderProvider.Add(order);
477+
478+
var tradeUpdates = new List<TestTradeUpdate>()
479+
{
480+
new(TradeEvent.PendingNew, null, new TestOrder(orderId)),
481+
new(TradeEvent.New, Guid.NewGuid(), new TestOrder(orderId))
482+
};
483+
484+
var partialFillExecutionId_1 = Guid.NewGuid();
485+
var partialFill_1 = new TestTradeUpdate(TradeEvent.PartialFill, partialFillExecutionId_1, new TestOrder(orderId, 1));
486+
487+
tradeUpdates.Add(partialFill_1);
488+
tradeUpdates.Add(partialFill_1);
489+
490+
var partialFillExecutionId_2 = Guid.NewGuid();
491+
var partialFill_2 = new TestTradeUpdate(TradeEvent.PartialFill, partialFillExecutionId_2, new TestOrder(orderId, 2));
492+
493+
tradeUpdates.Add(partialFill_2);
494+
tradeUpdates.Add(partialFill_2);
495+
496+
var fillExecutionId = Guid.NewGuid();
497+
var fill = new TestTradeUpdate(TradeEvent.Fill, fillExecutionId, new TestOrder(orderId, 3));
498+
499+
tradeUpdates.Add(fill);
500+
tradeUpdates.Add(fill);
501+
502+
foreach (var tradeUpdate in tradeUpdates)
503+
{
504+
AlpacaBrokerage.HandleTradeUpdate(tradeUpdate);
505+
506+
switch (tradeUpdate.Event)
507+
{
508+
case TradeEvent.PendingNew:
509+
case TradeEvent.New:
510+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
511+
break;
512+
case TradeEvent.PartialFill when tradeUpdate.ExecutionId.Equals(partialFillExecutionId_1):
513+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
514+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId[orderId].Count);
515+
break;
516+
case TradeEvent.PartialFill when tradeUpdate.ExecutionId.Equals(partialFillExecutionId_2):
517+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
518+
Assert.AreEqual(2, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId[orderId].Count);
519+
break;
520+
case TradeEvent.Fill when tradeUpdate.ExecutionId.Equals(fillExecutionId):
521+
Assert.AreEqual(0, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
522+
break;
523+
}
524+
}
525+
}
526+
527+
[Test]
528+
public void HandleTradeUpdateShouldSkipCanceledDuplication()
529+
{
530+
// pending_new -> new -> partial_fill -> fill
531+
// the same to all trade updates
532+
var orderId = Guid.NewGuid();
533+
534+
var order = new LimitOrder(Symbols.AAPL, 3m, 1m, default);
535+
order.BrokerId.Add(orderId.ToString());
536+
OrderProvider.Add(order);
537+
538+
var tradeUpdates = new List<TestTradeUpdate>()
539+
{
540+
new(TradeEvent.PendingNew, null, new TestOrder(orderId)),
541+
new(TradeEvent.New, Guid.NewGuid(), new TestOrder(orderId))
542+
};
543+
544+
var pendingCancelExecutionId = Guid.NewGuid();
545+
var pendingCancel = new TestTradeUpdate(TradeEvent.PendingCancel, pendingCancelExecutionId, new TestOrder(orderId, 1));
546+
547+
tradeUpdates.Add(pendingCancel);
548+
tradeUpdates.Add(pendingCancel);
549+
550+
var cancelExecutionId = Guid.NewGuid();
551+
var cancel = new TestTradeUpdate(TradeEvent.Canceled, cancelExecutionId, new TestOrder(orderId, 1));
552+
553+
tradeUpdates.Add(cancel);
554+
tradeUpdates.Add(cancel);
555+
556+
foreach (var tradeUpdate in tradeUpdates)
557+
{
558+
AlpacaBrokerage.HandleTradeUpdate(tradeUpdate);
559+
560+
switch (tradeUpdate.Event)
561+
{
562+
case TradeEvent.PendingNew:
563+
case TradeEvent.New:
564+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
565+
Assert.AreEqual(0, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId[orderId].Count);
566+
break;
567+
case TradeEvent.PendingCancel:
568+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
569+
Assert.AreEqual(0, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId[orderId].Count);
570+
break;
571+
case TradeEvent.Canceled:
572+
Assert.AreEqual(0, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
573+
break;
574+
}
575+
}
576+
}
577+
578+
[Test]
579+
public void HandleTradeUpdateShouldSkipUpdateDuplication()
580+
{
581+
// pending_new -> new -> replaced -> canceled
582+
// the same to all trade updates
583+
var orderId = Guid.NewGuid();
584+
585+
var order = new LimitOrder(Symbols.AAPL, 3m, 1m, default);
586+
order.BrokerId.Add(orderId.ToString());
587+
OrderProvider.Add(order);
588+
589+
// Call: GetOpenOrders() has added already order
590+
AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId[orderId] = [];
591+
592+
var tradeUpdates = new List<TestTradeUpdate>()
593+
{
594+
};
595+
596+
// Call: UpdateOrder
597+
var oldOrderId = orderId;
598+
orderId = Guid.NewGuid();
599+
order.BrokerId.Add(orderId.ToString());
600+
601+
var replacedExecutionId = Guid.NewGuid();
602+
var replaced = new TestTradeUpdate(TradeEvent.Replaced, replacedExecutionId, new TestOrder(oldOrderId, 0) { ReplacedByOrderId = orderId });
603+
604+
tradeUpdates.Add(replaced);
605+
tradeUpdates.Add(replaced);
606+
607+
var pendingCancelExecutionId = Guid.NewGuid();
608+
var pendingCancel = new TestTradeUpdate(TradeEvent.PendingCancel, pendingCancelExecutionId, new TestOrder(orderId, 1));
609+
610+
tradeUpdates.Add(pendingCancel);
611+
tradeUpdates.Add(pendingCancel);
612+
613+
var cancelExecutionId = Guid.NewGuid();
614+
var cancel = new TestTradeUpdate(TradeEvent.Canceled, cancelExecutionId, new TestOrder(orderId, 1));
615+
616+
tradeUpdates.Add(cancel);
617+
tradeUpdates.Add(cancel);
618+
619+
foreach (var tradeUpdate in tradeUpdates)
620+
{
621+
AlpacaBrokerage.HandleTradeUpdate(tradeUpdate);
622+
623+
switch (tradeUpdate.Event)
624+
{
625+
case TradeEvent.Replaced:
626+
Assert.AreEqual(1, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
627+
Assert.AreEqual(0, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId[orderId].Count);
628+
break;
629+
case TradeEvent.Canceled:
630+
Assert.AreEqual(0, AlpacaBrokerage._duplicationExecutionOrderIdByBrokerageOrderId.Count);
631+
break;
632+
}
633+
}
634+
}
421635
}
422636
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3+
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using Alpaca.Markets;
18+
using System.Collections.Generic;
19+
20+
namespace QuantConnect.Brokerages.Alpaca.Tests.Models;
21+
22+
public record TestTradeUpdate(TradeEvent Event, Guid? ExecutionId, IOrder Order) : ITradeUpdate
23+
{
24+
public decimal? Price { get; init; } = 1;
25+
public DateTime? TimestampUtc { get; init; }
26+
public decimal? PositionQuantity { get; init; }
27+
public long? PositionIntegerQuantity { get; init; }
28+
public decimal? TradeQuantity { get; init; }
29+
public long? TradeIntegerQuantity { get; init; }
30+
}
31+
32+
public record TestOrder(Guid OrderId, decimal FilledQuantity = 0) : IOrder
33+
{
34+
public string Symbol { get; init; } = "AAPL";
35+
public OrderSide OrderSide { get; init; }
36+
public AssetClass AssetClass { get; init; }
37+
public string ClientOrderId { get; init; }
38+
public DateTime? CreatedAtUtc { get; init; }
39+
public DateTime? UpdatedAtUtc { get; init; }
40+
public DateTime? SubmittedAtUtc { get; init; }
41+
public DateTime? FilledAtUtc { get; init; }
42+
public DateTime? ExpiredAtUtc { get; init; }
43+
public DateTime? CancelledAtUtc { get; init; }
44+
public DateTime? FailedAtUtc { get; init; }
45+
public DateTime? ReplacedAtUtc { get; init; }
46+
public Guid AssetId { get; init; }
47+
public decimal? Notional { get; init; }
48+
public decimal? Quantity { get; init; }
49+
public long IntegerQuantity { get; init; }
50+
public long IntegerFilledQuantity { get; init; }
51+
public OrderType OrderType { get; init; }
52+
public OrderClass OrderClass { get; init; }
53+
public TimeInForce TimeInForce { get; init; }
54+
public decimal? LimitPrice { get; init; }
55+
public decimal? StopPrice { get; init; }
56+
public decimal? TrailOffsetInDollars { get; init; }
57+
public decimal? TrailOffsetInPercent { get; init; }
58+
public decimal? HighWaterMark { get; init; }
59+
public decimal? AverageFillPrice { get; init; }
60+
public OrderStatus OrderStatus { get; init; }
61+
public Guid? ReplacedByOrderId { get; init; }
62+
public Guid? ReplacesOrderId { get; init; }
63+
public IReadOnlyList<IOrder> Legs { get; init; }
64+
}

QuantConnect.AlpacaBrokerage/AlpacaBrokerage.cs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
using QuantConnect.Brokerages.CrossZero;
4141
using System.Collections.Concurrent;
4242
using System.Threading;
43+
using System.Runtime.CompilerServices;
44+
45+
[assembly: InternalsVisibleTo("QuantConnect.Brokerages.Alpaca.Tests")]
4346

4447
namespace QuantConnect.Brokerages.Alpaca
4548
{
@@ -74,6 +77,11 @@ public partial class AlpacaBrokerage : Brokerage
7477
private readonly ManualResetEvent _reconnectionResetEvent = new(false);
7578
private readonly CancellationTokenSource _cancellationTokenSource = new();
7679

80+
/// <summary>
81+
/// Maps each brokerage order ID to a set of execution IDs, used to detect and skip duplicate trade updates.
82+
/// </summary>
83+
internal readonly Dictionary<Guid, HashSet<Guid>> _duplicationExecutionOrderIdByBrokerageOrderId = [];
84+
7785
/// <summary>
7886
/// Returns true if we're currently connected to the broker
7987
/// </summary>
@@ -319,7 +327,9 @@ public override List<Order> GetOpenOrders()
319327
leanOrder.Status = Orders.OrderStatus.PartiallyFilled;
320328
}
321329

322-
leanOrder.BrokerId.Add(brokerageOrder.OrderId.ToString());
330+
var brokerageOrderId = brokerageOrder.OrderId;
331+
_duplicationExecutionOrderIdByBrokerageOrderId[brokerageOrderId] = [];
332+
leanOrder.BrokerId.Add(brokerageOrderId.ToString());
323333
leanOrders.Add(leanOrder);
324334
}
325335

@@ -403,7 +413,7 @@ public override bool PlaceOrder(Order order)
403413
return true;
404414
}
405415

406-
private void HandleTradeUpdate(ITradeUpdate obj)
416+
internal void HandleTradeUpdate(ITradeUpdate obj)
407417
{
408418
try
409419
{
@@ -427,15 +437,32 @@ private void HandleTradeUpdate(ITradeUpdate obj)
427437
case TradeEvent.New:
428438
case TradeEvent.PendingNew:
429439
// we don't send anything for this event
440+
_duplicationExecutionOrderIdByBrokerageOrderId.TryAdd(obj.Order.OrderId, []);
430441
return;
431442
case TradeEvent.Rejected:
432443
case TradeEvent.Canceled:
433444
case TradeEvent.Replaced:
434-
OnOrderEvent(new OrderEvent(leanOrder, DateTime.UtcNow, OrderFee.Zero, $"{nameof(AlpacaBrokerage)} Order Event") { Status = newLeanOrderStatus });
445+
if (_duplicationExecutionOrderIdByBrokerageOrderId.Remove(obj.Order.OrderId))
446+
{
447+
if (newLeanOrderStatus == Orders.OrderStatus.UpdateSubmitted)
448+
{
449+
_duplicationExecutionOrderIdByBrokerageOrderId[obj.Order.ReplacedByOrderId.Value] = [];
450+
}
451+
OnOrderEvent(new OrderEvent(leanOrder, DateTime.UtcNow, OrderFee.Zero, $"{nameof(AlpacaBrokerage)} Order Event") { Status = newLeanOrderStatus });
452+
}
435453
return;
436454
case TradeEvent.Fill:
455+
if (_duplicationExecutionOrderIdByBrokerageOrderId.Remove(obj.Order.OrderId))
456+
{
457+
break;
458+
}
459+
return;
437460
case TradeEvent.PartialFill:
438-
break;
461+
if (_duplicationExecutionOrderIdByBrokerageOrderId[obj.Order.OrderId].Add(obj.ExecutionId.Value))
462+
{
463+
break;
464+
}
465+
return;
439466
case TradeEvent.Accepted:
440467
case TradeEvent.PendingReplace:
441468
case TradeEvent.PendingCancel:

0 commit comments

Comments
 (0)