Skip to content

Commit d329960

Browse files
authored
Enable trades results streaming (#9234)
* Add a temporary benchmark stats sample in live Allow trades to be written to result file on first day of the deployment before the first daily sample is done * Enable trades results streaming * Minor changes * Make Trade.Id a Guid * Cleanup * Sample temporary charts once per hour * Minor change * Minor fix * Minor fix * Minor fix * Add fix for potential race condition
1 parent 2d64537 commit d329960

6 files changed

Lines changed: 169 additions & 30 deletions

File tree

Common/Statistics/AlgorithmPerformance.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,15 @@ public AlgorithmPerformance()
8383
ClosedTrades = new List<Trade>();
8484
}
8585

86+
/// <summary>
87+
/// Initializes a new instance of the <see cref="AlgorithmPerformance"/> class
88+
/// </summary>
89+
/// <param name="other">The performance instance to use as a base</param>
90+
public AlgorithmPerformance(AlgorithmPerformance other)
91+
{
92+
TradeStatistics = other.TradeStatistics;
93+
PortfolioStatistics = other.PortfolioStatistics;
94+
ClosedTrades = other.ClosedTrades;
95+
}
8696
}
8797
}

Common/Statistics/Trade.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ public class Trade
2626
{
2727
private List<Symbol> _symbols;
2828

29+
/// <summary>
30+
/// A unique identifier for the trade
31+
/// </summary>
32+
public string Id { get; set; }
33+
2934
/// <summary>
3035
/// The symbol of the traded instrument
3136
/// </summary>

Common/Statistics/TradeBuilder.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,7 @@ private void AddNewTrade(Trade trade, OrderEvent fill)
615615
? fill.IsWin(security, trade.ProfitLoss)
616616
: trade.ProfitLoss > 0;
617617

618+
trade.Id = Guid.NewGuid().ToString();
618619
_closedTrades.Add(trade);
619620

620621
// Due to memory constraints in live mode, we cap the number of trades

Engine/Results/BacktestingResultHandler.cs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ private void Update()
197197
}
198198

199199
//Get the runtime statistics from the user algorithm:
200-
var summary = GenerateStatisticsResults(performanceCharts, estimatedStrategyCapacity: _capacityEstimate).Summary;
201-
var runtimeStatistics = GetAlgorithmRuntimeStatistics(summary, _capacityEstimate);
200+
var statisticsResult = GenerateStatisticsResults(performanceCharts, estimatedStrategyCapacity: _capacityEstimate);
201+
var runtimeStatistics = GetAlgorithmRuntimeStatistics(statisticsResult.Summary, _capacityEstimate);
202202

203203
var progress = _progressMonitor.Progress;
204204

@@ -225,8 +225,16 @@ private void Update()
225225
_nextS3Update = DateTime.UtcNow.AddSeconds(30);
226226
}
227227

228+
var deltaTrades = GetDeltaTrades(statisticsResult.TotalPerformance.ClosedTrades, LastTradeId, shouldStop: tradeCount => tradeCount >= 50);
229+
// Deliberately skip to the end of trade collection to prevent overloading backtesting UX
230+
if (statisticsResult.TotalPerformance.ClosedTrades.Count > 0)
231+
{
232+
LastTradeId = statisticsResult.TotalPerformance.ClosedTrades[^1].Id;
233+
}
234+
var algorithmPerformance = new AlgorithmPerformance(statisticsResult.TotalPerformance) { ClosedTrades = deltaTrades };
235+
228236
//2. Backtest Update -> Send the truncated packet to the backtester:
229-
var splitPackets = SplitPackets(deltaCharts, deltaOrders, runtimeStatistics, progress, serverStatistics);
237+
var splitPackets = SplitPackets(deltaCharts, deltaOrders, runtimeStatistics, progress, serverStatistics, algorithmPerformance);
230238

231239
foreach (var backtestingPacket in splitPackets)
232240
{
@@ -245,7 +253,9 @@ private void Update()
245253
/// <summary>
246254
/// Run over all the data and break it into smaller packets to ensure they all arrive at the terminal
247255
/// </summary>
248-
public virtual IEnumerable<BacktestResultPacket> SplitPackets(Dictionary<string, Chart> deltaCharts, Dictionary<int, Order> deltaOrders, SortedDictionary<string, string> runtimeStatistics, decimal progress, Dictionary<string, string> serverStatistics)
256+
public virtual IEnumerable<BacktestResultPacket> SplitPackets(Dictionary<string, Chart> deltaCharts, Dictionary<int, Order> deltaOrders,
257+
SortedDictionary<string, string> runtimeStatistics, decimal progress, Dictionary<string, string> serverStatistics,
258+
AlgorithmPerformance algorithmPerformance)
249259
{
250260
// break the charts into groups
251261
var splitPackets = new List<BacktestResultPacket>();
@@ -267,6 +277,13 @@ public virtual IEnumerable<BacktestResultPacket> SplitPackets(Dictionary<string,
267277
splitPackets.Add(new BacktestResultPacket(_job, new BacktestResult { Orders = deltaOrders }, Algorithm.EndDate, Algorithm.StartDate, progress));
268278
}
269279

280+
// only send trades if there is actually any update
281+
if (algorithmPerformance.ClosedTrades != null && algorithmPerformance.ClosedTrades.Count > 0)
282+
{
283+
// Add the trades into the charting packet:
284+
splitPackets.Add(new BacktestResultPacket(_job, new BacktestResult { TotalPerformance = algorithmPerformance }, Algorithm.EndDate, Algorithm.StartDate, progress));
285+
}
286+
270287
//Add any user runtime statistics into the backtest.
271288
splitPackets.Add(new BacktestResultPacket(_job, new BacktestResult { ServerStatistics = serverStatistics, RuntimeStatistics = runtimeStatistics }, Algorithm.EndDate, Algorithm.StartDate, progress));
272289

Engine/Results/BaseResultsHandler.cs

Lines changed: 116 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,6 @@
1414
*
1515
*/
1616

17-
using System;
18-
using System.Collections.Concurrent;
19-
using System.Collections.Generic;
20-
using System.IO;
21-
using System.Linq;
22-
using System.Threading;
2317
using Newtonsoft.Json;
2418
using Newtonsoft.Json.Serialization;
2519
using QuantConnect.Data.Market;
@@ -34,6 +28,12 @@
3428
using QuantConnect.Securities.Positions;
3529
using QuantConnect.Statistics;
3630
using QuantConnect.Util;
31+
using System;
32+
using System.Collections.Concurrent;
33+
using System.Collections.Generic;
34+
using System.IO;
35+
using System.Linq;
36+
using System.Threading;
3737

3838
namespace QuantConnect.Lean.Engine.Results
3939
{
@@ -59,6 +59,11 @@ public abstract class BaseResultsHandler
5959

6060
private Bar _currentAlgorithmEquity;
6161

62+
private List<ISeriesPoint> _temporaryPerformanceValues;
63+
private List<ISeriesPoint> _temporaryBenchmarkValues;
64+
private DateTime _temporaryChartsLastSampleTime;
65+
private object _temporaryChartsLock = new();
66+
6267
/// <summary>
6368
/// String message saying: Strategy Equity
6469
/// </summary>
@@ -114,6 +119,11 @@ public abstract class BaseResultsHandler
114119
/// </summary>
115120
protected int LastDeltaOrderPosition { get; set; }
116121

122+
/// <summary>
123+
/// The last position consumed from the <see cref="TradeBuilder.ClosedTrades"/> by <see cref="GetDeltaTrades"/>
124+
/// </summary>
125+
protected string LastTradeId { get; set; }
126+
117127
/// <summary>
118128
/// The last position consumed from the <see cref="ITransactionHandler.OrderEvents"/> while determining delta order events
119129
/// </summary>
@@ -122,7 +132,7 @@ public abstract class BaseResultsHandler
122132
/// <summary>
123133
/// Serializer settings to use
124134
/// </summary>
125-
protected JsonSerializerSettings SerializerSettings { get; set; } = new ()
135+
protected JsonSerializerSettings SerializerSettings { get; set; } = new()
126136
{
127137
ContractResolver = new DefaultContractResolver
128138
{
@@ -446,6 +456,29 @@ protected virtual Dictionary<int, Order> GetDeltaOrders(int orderEventsStartPosi
446456
return deltaOrders;
447457
}
448458

459+
/// <summary>
460+
/// Gets the trades generated starting from the provided <see cref="TradeBuilder.ClosedTrades"/> position,
461+
/// which is determined by the <see cref="LastTradeId"/> and the <see cref="Trade.Id"/>
462+
/// </summary>
463+
/// <returns>The delta trades</returns>
464+
protected virtual List<Trade> GetDeltaTrades(List<Trade> trades, string lastTradeId, Func<int, bool> shouldStop)
465+
{
466+
var lastTradeIndex = trades.FindIndex(x => x.Id == lastTradeId);
467+
List<Trade> deltaTrades = null;
468+
foreach (var trade in trades.Skip(lastTradeIndex + 1))
469+
{
470+
LastTradeId = trade.Id;
471+
deltaTrades ??= new List<Trade>();
472+
deltaTrades.Add(trade);
473+
if (shouldStop(deltaTrades.Count))
474+
{
475+
break;
476+
}
477+
}
478+
479+
return deltaTrades;
480+
}
481+
449482
/// <summary>
450483
/// Initialize the result handler with this result packet.
451484
/// </summary>
@@ -466,7 +499,7 @@ public virtual void Initialize(ResultHandlerInitializeParameters parameters)
466499

467500
SerializerSettings = new()
468501
{
469-
Converters = new [] { new OrderEventJsonConverter(AlgorithmId) },
502+
Converters = new[] { new OrderEventJsonConverter(AlgorithmId) },
470503
ContractResolver = new DefaultContractResolver
471504
{
472505
NamingStrategy = new CamelCaseNamingStrategy
@@ -636,9 +669,8 @@ public virtual void Sample(DateTime time)
636669
// Force an update for our values before doing our daily sample
637670
UpdatePortfolioValues(time);
638671
UpdateBenchmarkValue(time);
639-
640672
var currentPortfolioValue = GetPortfolioValue();
641-
var portfolioPerformance = DailyPortfolioValue == 0 ? 0 : Math.Round((currentPortfolioValue - DailyPortfolioValue) * 100 / DailyPortfolioValue, 10);
673+
var portfolioPerformance = GetPortfolioPerformance(currentPortfolioValue);
642674

643675
// Update our max portfolio value
644676
CumulativeMaxPortfolioValue = Math.Max(currentPortfolioValue, CumulativeMaxPortfolioValue);
@@ -659,6 +691,11 @@ public virtual void Sample(DateTime time)
659691
DailyPortfolioValue = currentPortfolioValue;
660692
}
661693

694+
private decimal GetPortfolioPerformance(decimal currentPortfolioValue)
695+
{
696+
return DailyPortfolioValue == 0 ? 0 : Math.Round((currentPortfolioValue - DailyPortfolioValue) * 100 / DailyPortfolioValue, 10);
697+
}
698+
662699
private void SamplePortfolioMargin(DateTime algorithmUtcTime, decimal currentPortfolioValue)
663700
{
664701
var state = PortfolioState.Create(Algorithm.Portfolio, algorithmUtcTime, currentPortfolioValue);
@@ -959,27 +996,82 @@ protected StatisticsResults GenerateStatisticsResults(Dictionary<string, Chart>
959996
// make sure we've taken samples for these series before just blindly requesting them
960997
if (charts.TryGetValue(StrategyEquityKey, out var strategyEquity) &&
961998
strategyEquity.Series.TryGetValue(EquityKey, out var equity) &&
962-
strategyEquity.Series.TryGetValue(ReturnKey, out var performance) &&
963-
charts.TryGetValue(BenchmarkKey, out var benchmarkChart) &&
964-
benchmarkChart.Series.TryGetValue(BenchmarkKey, out var benchmark))
999+
equity.Values.Count > 0)
9651000
{
966-
var trades = Algorithm.TradeBuilder.ClosedTrades;
967-
968-
BaseSeries portfolioTurnover;
969-
if (charts.TryGetValue(PortfolioTurnoverKey, out var portfolioTurnoverChart))
1001+
List<ISeriesPoint> performanceValues = null;
1002+
List<ISeriesPoint> benchmarkValues = null;
1003+
if (strategyEquity.Series.TryGetValue(ReturnKey, out var performance) &&
1004+
charts.TryGetValue(BenchmarkKey, out var benchmarkChart) &&
1005+
benchmarkChart.Series.TryGetValue(BenchmarkKey, out var benchmark))
9701006
{
971-
portfolioTurnoverChart.Series.TryGetValue(PortfolioTurnoverKey, out portfolioTurnover);
1007+
performanceValues = performance.Values;
1008+
benchmarkValues = benchmark.Values;
1009+
1010+
// Clear temporary values, free memory. We don't need them anymore
1011+
if (_temporaryPerformanceValues != null && _temporaryBenchmarkValues != null)
1012+
{
1013+
lock (_temporaryChartsLock)
1014+
{
1015+
_temporaryPerformanceValues = null;
1016+
_temporaryBenchmarkValues = null;
1017+
}
1018+
}
9721019
}
9731020
else
9741021
{
975-
portfolioTurnover = new Series();
1022+
lock (_temporaryChartsLock)
1023+
{
1024+
if (Algorithm.UtcTime - _temporaryChartsLastSampleTime >= TimeSpan.FromHours(1))
1025+
{
1026+
// We don't have performance and/or benchmark values sampled, likely because we are on the first day of the algo
1027+
// and we only sample at the end of the day. In this case we will create temporary values for performance and benchmark
1028+
// so that we can generate statistics and write trades to the result files
1029+
1030+
// Let's force update and sample both performance and benchmark at the current time since they need to be aligned
1031+
var currentPortfolioValue = GetPortfolioValue();
1032+
var portfolioPerformance = GetPortfolioPerformance(currentPortfolioValue);
1033+
1034+
if (portfolioPerformance != 0)
1035+
{
1036+
performanceValues = _temporaryPerformanceValues ??= new List<ISeriesPoint>();
1037+
performanceValues.Add(new ChartPoint(Algorithm.UtcTime, portfolioPerformance));
1038+
benchmarkValues = _temporaryBenchmarkValues ??= new List<ISeriesPoint>();
1039+
benchmarkValues.Add(new ChartPoint(Algorithm.UtcTime, GetBenchmarkValue()));
1040+
_temporaryChartsLastSampleTime = Algorithm.UtcTime;
1041+
}
1042+
}
1043+
1044+
if (performanceValues != null && benchmarkValues != null)
1045+
{
1046+
performanceValues = [.. performanceValues];
1047+
benchmarkValues = [.. benchmarkValues];
1048+
}
1049+
}
9761050
}
9771051

978-
statisticsResults = StatisticsBuilder.Generate(trades, profitLoss, equity.Values, performance.Values, benchmark.Values,
979-
portfolioTurnover.Values, StartingPortfolioValue, Algorithm.Portfolio.TotalFees, TotalTradesCount(),
980-
estimatedStrategyCapacity, AlgorithmCurrencySymbol, Algorithm.Transactions, Algorithm.RiskFreeInterestRateModel,
981-
Algorithm.Settings.TradingDaysPerYear.Value // already set in Brokerage|Backtesting-SetupHandler classes
982-
);
1052+
var trades = Algorithm.TradeBuilder.ClosedTrades;
1053+
if (performanceValues != null && benchmarkValues != null)
1054+
{
1055+
BaseSeries portfolioTurnover;
1056+
if (charts.TryGetValue(PortfolioTurnoverKey, out var portfolioTurnoverChart))
1057+
{
1058+
portfolioTurnoverChart.Series.TryGetValue(PortfolioTurnoverKey, out portfolioTurnover);
1059+
}
1060+
else
1061+
{
1062+
portfolioTurnover = new Series();
1063+
}
1064+
1065+
statisticsResults = StatisticsBuilder.Generate(trades, profitLoss, equity.Values, performanceValues, benchmarkValues,
1066+
portfolioTurnover.Values, StartingPortfolioValue, Algorithm.Portfolio.TotalFees, TotalTradesCount(),
1067+
estimatedStrategyCapacity, AlgorithmCurrencySymbol, Algorithm.Transactions, Algorithm.RiskFreeInterestRateModel,
1068+
Algorithm.Settings.TradingDaysPerYear.Value // already set in Brokerage|Backtesting-SetupHandler classes
1069+
);
1070+
}
1071+
else
1072+
{
1073+
statisticsResults.TotalPerformance.ClosedTrades = trades;
1074+
}
9831075
}
9841076

9851077
statisticsResults.AddCustomSummaryStatistics(_customSummaryStatistics);

Engine/Results/LiveTradingResultHandler.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,16 @@ private void Update()
219219
var statistics = GenerateStatisticsResults(performanceCharts);
220220
var runtimeStatistics = GetAlgorithmRuntimeStatistics(statistics.Summary);
221221

222+
AlgorithmPerformance algorithmPerformance;
223+
{
224+
var stopwatch = Stopwatch.StartNew();
225+
var deltaTrades = GetDeltaTrades(statistics.TotalPerformance.ClosedTrades, LastTradeId, shouldStop: _ => stopwatch.ElapsedMilliseconds > 15);
226+
algorithmPerformance = new AlgorithmPerformance(statistics.TotalPerformance) { ClosedTrades = deltaTrades };
227+
}
228+
222229
// since we're sending multiple packets, let's do it async and forget about it
223230
// chart data can get big so let's break them up into groups
224-
var splitPackets = SplitPackets(deltaCharts, deltaOrders, holdings, Algorithm.Portfolio.CashBook, runtimeStatistics, serverStatistics, deltaOrderEvents);
231+
var splitPackets = SplitPackets(deltaCharts, deltaOrders, holdings, Algorithm.Portfolio.CashBook, runtimeStatistics, serverStatistics, deltaOrderEvents, algorithmPerformance);
225232

226233
foreach (var liveResultPacket in splitPackets)
227234
{
@@ -463,7 +470,8 @@ private IEnumerable<LiveResultPacket> SplitPackets(Dictionary<string, Chart> del
463470
CashBook cashbook,
464471
SortedDictionary<string, string> runtimeStatistics,
465472
Dictionary<string, string> serverStatistics,
466-
List<OrderEvent> deltaOrderEvents)
473+
List<OrderEvent> deltaOrderEvents,
474+
AlgorithmPerformance algorithmPerformance)
467475
{
468476
// break the charts into groups
469477
var current = new Dictionary<string, Chart>();
@@ -518,6 +526,12 @@ private IEnumerable<LiveResultPacket> SplitPackets(Dictionary<string, Chart> del
518526
result = result.Concat(new[] { new LiveResultPacket(_job, new LiveResult { Orders = deltaOrders, OrderEvents = deltaOrderEvents }) });
519527
}
520528

529+
// only send trades packet if there is actually any update
530+
if (algorithmPerformance.ClosedTrades != null && algorithmPerformance.ClosedTrades.Count > 0)
531+
{
532+
result = result.Concat(new[] { new LiveResultPacket(_job, new LiveResult { TotalPerformance = algorithmPerformance }) });
533+
}
534+
521535
return result;
522536
}
523537

0 commit comments

Comments
 (0)