Skip to content

Commit 4b1d3e3

Browse files
authored
Revert "Fix: OpenInterestProcessorManger (request data each day 8 AM)… (#31)
* Revert "Fix: OpenInterestProcessorManger (request data each day 8 AM) (#29)" This reverts commit 51367b9. * Minor unit test building fix
1 parent 51367b9 commit 4b1d3e3

File tree

4 files changed

+89
-186
lines changed

4 files changed

+89
-186
lines changed

QuantConnect.Polygon.Tests/PolygonOpenInterestProcessorManagerTests.cs

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
*/
1515

1616
using System;
17-
using System.Linq;
1817
using NUnit.Framework;
1918
using System.Threading;
2019
using QuantConnect.Data;
@@ -37,15 +36,23 @@ public class PolygonOpenInterestProcessorManagerTests : PolygonDataProviderBaseT
3736

3837
private readonly ManualTimeProvider _timeProviderInstance = new();
3938

40-
private readonly Lock _locker = new();
39+
private readonly EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = new()
40+
{
41+
SubscribeImpl = (symbols, _) => { return true; },
42+
UnsubscribeImpl = (symbols, _) => { return true; }
43+
};
44+
45+
private object _locker = new();
4146

42-
[Test]
43-
public void GetOpenInterestOfOptionSymbolsByPolygonOpenInterestProcessorManager()
47+
[TestCase("2024-09-16T09:30:59", true, Description = "Market: After Opening")]
48+
[TestCase("2024-09-16T15:28:59", true, Description = "Market: Before Closing")]
49+
[TestCase("2024-09-16T16:28:59", false, Description = "Market: Closed")]
50+
public void GetOpenInterestInDifferentTimeExchangeTime(string mockDateTime, bool isShouldReturnData)
4451
{
45-
var resetEvent = new ManualResetEvent(false);
52+
var waitOneDelay = isShouldReturnData ? TimeSpan.FromSeconds(30) : TimeSpan.FromSeconds(5);
53+
var resetEvent = new AutoResetEvent(false);
4654
var cancellationTokenSource = new CancellationTokenSource();
4755
var optionContractsConfigs = GetConfigs();
48-
var amountOfConfigs = optionContractsConfigs.Count;
4956

5057
var symbolOpenInterest = new ConcurrentDictionary<Symbol, decimal>();
5158
Action<BaseData> callback = (baseData) =>
@@ -59,42 +66,41 @@ public void GetOpenInterestOfOptionSymbolsByPolygonOpenInterestProcessorManager(
5966
{
6067
symbolOpenInterest[baseData.Symbol] = baseData.Value;
6168

62-
if (symbolOpenInterest.Count == amountOfConfigs)
69+
if (symbolOpenInterest.Count > 5)
6370
{
6471
resetEvent.Set();
6572
}
6673
}
6774
};
6875

69-
_timeProviderInstance.SetCurrentTimeUtc(DateTime.UtcNow.ConvertFromUtc(TimeZones.NewYork).Date.AddHours(8).AddSeconds(-5).ConvertToUtc(TimeZones.NewYork));
70-
var processor = new PolygonOpenInterestProcessorManager(_timeProviderInstance, _restApiClient, symbolMapper, dataAggregator, GetTickTime);
71-
72-
processor.AddSymbols([.. optionContractsConfigs.Select(x => x.Symbol)]);
73-
7476
foreach (var config in optionContractsConfigs)
7577
{
78+
_subscriptionManager.Subscribe(config);
7679
ProcessFeed(
7780
Subscribe(dataAggregator, config, (sender, args) => { }),
7881
cancellationTokenSource.Token,
7982
callback: callback
8083
);
8184
}
8285

83-
// Internal delay to respect the 1-minute request interval for OpenInterest data.
84-
resetEvent.WaitOne(TimeSpan.FromSeconds(70), cancellationTokenSource.Token);
85-
86-
Assert.Greater(symbolOpenInterest.Count, 0);
87-
88-
// Advance the current UTC time by 1 day to simulate the next day.
89-
// This ensures that any new OpenInterest requests are validated against the updated time.
90-
_timeProviderInstance.SetCurrentTimeUtc(DateTime.UtcNow.AddDays(1));
91-
92-
resetEvent.Reset();
93-
symbolOpenInterest.Clear();
86+
var mockDateTimeAfterOpenExchange = DateTime.Parse(mockDateTime).ConvertTo(TimeZones.NewYork, TimeZones.Utc);
87+
_timeProviderInstance.SetCurrentTimeUtc(mockDateTimeAfterOpenExchange);
88+
var processor = new PolygonOpenInterestProcessorManager(_timeProviderInstance, _restApiClient, symbolMapper, _subscriptionManager, dataAggregator, GetTickTime);
89+
processor.ScheduleNextRun();
90+
resetEvent.WaitOne(waitOneDelay, cancellationTokenSource.Token);
9491

95-
resetEvent.WaitOne(TimeSpan.FromSeconds(30), cancellationTokenSource.Token);
96-
97-
Assert.Greater(symbolOpenInterest.Count, 0);
92+
if (isShouldReturnData)
93+
{
94+
Assert.Greater(symbolOpenInterest.Count, 0);
95+
foreach (var (symbol, openInterest) in symbolOpenInterest)
96+
{
97+
Assert.Greater(openInterest, 0);
98+
}
99+
}
100+
else
101+
{
102+
Assert.Zero(symbolOpenInterest.Count);
103+
}
98104

99105
cancellationTokenSource.Cancel();
100106
cancellationTokenSource.Dispose();
@@ -106,7 +112,7 @@ protected override List<SubscriptionDataConfig> GetConfigs(Resolution resolution
106112
{
107113
var configs = new List<SubscriptionDataConfig>();
108114

109-
var expiryContractDate = new DateTime(2025, 05, 16);
115+
var expiryContractDate = new DateTime(2024, 09, 20);
110116
var strikesAAPL = new decimal[] { 100m, 105m, 110m, 115m, 120m, 125m, 130m, 135m, 140m, 145m };
111117

112118
foreach (var strike in strikesAAPL)

QuantConnect.Polygon/PolygonDataProvider.cs

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ public partial class PolygonDataProvider : IDataQueueHandler
5858

5959
protected PolygonSubscriptionManager _subscriptionManager;
6060

61-
private PolygonOpenInterestProcessorManager _polygonOpenInterestProcessorManager;
62-
6361
private List<ExchangeMapping> _exchangeMappings;
6462
private readonly PolygonSymbolMapper _symbolMapper = new();
6563
private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
@@ -152,17 +150,16 @@ private void Initialize(string apiKey, int maxSubscriptionsPerWebSocket, bool st
152150
// Initialize the exchange mappings
153151
_exchangeMappings = FetchExchangeMappings();
154152

155-
_polygonOpenInterestProcessorManager = new PolygonOpenInterestProcessorManager(TimeProvider, RestApiClient, _symbolMapper, _dataAggregator, GetTickTime);
156-
157153
// Initialize the subscription manager if this instance is going to be used as a data queue handler
158154
if (streamingEnabled)
159155
{
160156
_subscriptionManager = new PolygonSubscriptionManager(
161157
_supportedSecurityTypes,
162158
maxSubscriptionsPerWebSocket,
163-
(securityType) => new PolygonWebSocketClientWrapper(_apiKey, _symbolMapper, securityType, OnMessage),
164-
_polygonOpenInterestProcessorManager);
159+
(securityType) => new PolygonWebSocketClientWrapper(_apiKey, _symbolMapper, securityType, OnMessage));
165160
}
161+
var openInterestManager = new PolygonOpenInterestProcessorManager(TimeProvider, RestApiClient, _symbolMapper, _subscriptionManager, _dataAggregator, GetTickTime);
162+
openInterestManager.ScheduleNextRun();
166163
}
167164

168165
#region IDataQueueHandler implementation
@@ -328,7 +325,10 @@ private void ProcessTrade(TradeMessage trade)
328325
var time = GetTickTime(symbol, trade.Timestamp);
329326
// TODO: Map trade.Conditions to Lean sale conditions
330327
var tick = new Tick(time, symbol, string.Empty, GetExchangeCode(trade.ExchangeID), trade.Size, trade.Price);
331-
ProcessTickWithOpenInterestTick(tick);
328+
lock (_dataAggregator)
329+
{
330+
_dataAggregator.Update(tick);
331+
}
332332
}
333333

334334
/// <summary>
@@ -342,30 +342,9 @@ private void ProcessQuote(QuoteMessage quote)
342342
// Note: Polygon's quotes have bid/ask exchange IDs, but Lean only has one exchange per tick. We'll use the bid exchange.
343343
var tick = new Tick(time, symbol, string.Empty, GetExchangeCode(quote.BidExchangeID),
344344
quote.BidSize, quote.BidPrice, quote.AskSize, quote.AskPrice);
345-
ProcessTickWithOpenInterestTick(tick);
346-
}
347-
348-
/// <summary>
349-
/// Processes a tick and its corresponding open interest tick, if available.
350-
/// </summary>
351-
/// <param name="mainTick">The primary tick (trade or quote) to process.</param>
352-
/// <remarks>
353-
/// This method retrieves the open interest tick corresponding to the symbol and time of the provided tick.
354-
/// If an open interest tick is found, both the open interest tick and the main tick are passed to the data aggregator
355-
/// in a thread-safe manner.
356-
/// We do this to sync the open interest tick (which is one for the whole day) with the quote/trade tick.
357-
/// </remarks>
358-
private void ProcessTickWithOpenInterestTick(Tick mainTick)
359-
{
360-
var openInterest = _polygonOpenInterestProcessorManager.GetOpenInterestTick(mainTick.Symbol, mainTick.Time);
361345
lock (_dataAggregator)
362346
{
363-
if (openInterest != null)
364-
{
365-
_dataAggregator.Update(openInterest);
366-
}
367-
368-
_dataAggregator.Update(mainTick);
347+
_dataAggregator.Update(tick);
369348
}
370349
}
371350

0 commit comments

Comments
 (0)