Skip to content

Commit 86abc51

Browse files
committed
Refactor and add enumerable overloads for subscription and instrument types.
1 parent 947a476 commit 86abc51

File tree

2 files changed

+89
-65
lines changed

2 files changed

+89
-65
lines changed

src/Org.Openfeed.Client/ClientImpl.cs

Lines changed: 57 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ public async ValueTask<IOpenfeedConnection> GetConnectionAsync(CancellationToken
292292

293293
private readonly Dictionary<long, CancellationTokenSource> _subscriptions = new Dictionary<long, CancellationTokenSource>();
294294

295-
private async void RunSubscribeLoop(Service service, SubscriptionType subscriptionType, InstrumentType? instrumentType, int snapshotIntervalSeconds, List<string>? symbols, List<long>? marketIds, List<string>? exchanges, List<int>? channels, CancellationToken ct) {
295+
private async void RunSubscribeLoop(Service service, IEnumerable<SubscriptionType> subscriptionTypes, IEnumerable<InstrumentType> instrumentTypes, int snapshotIntervalSeconds, List<string>? symbols, List<long>? marketIds, List<string>? exchanges, List<int>? channels, CancellationToken ct) {
296296
var combined = CancellationTokenSource.CreateLinkedTokenSource(ct, _disposedSource.Token).Token;
297297

298298
if (combined.IsCancellationRequested) return;
@@ -315,9 +315,8 @@ private async void RunSubscribeLoop(Service service, SubscriptionType subscripti
315315

316316
long? subscriptionId = null;
317317
try {
318-
subscriptionId = instrumentType.HasValue ?
319-
connection.Subscribe(service, subscriptionType, instrumentType.Value, snapshotIntervalSeconds, symbols, marketIds, exchanges, channels) :
320-
connection.Subscribe(service, subscriptionType, snapshotIntervalSeconds, symbols, marketIds, exchanges, channels);
318+
subscriptionId = connection.Subscribe(service, subscriptionTypes, instrumentTypes, snapshotIntervalSeconds, symbols, marketIds, exchanges, channels);
319+
321320
await connection.WhenDisconnectedAsync(ct);
322321
}
323322
catch (OperationCanceledException) when (ct.IsCancellationRequested) {
@@ -339,7 +338,7 @@ public long Subscribe(Service service, SubscriptionType subscriptionType, int sn
339338
_subscriptions.Add(id, cts);
340339
}
341340

342-
RunSubscribeLoop(service, subscriptionType, null, snapshotIntervalSeconds, symbols?.ToList(), marketIds?.ToList(), exchanges?.ToList(), channels?.ToList(), cts.Token);
341+
RunSubscribeLoop(service, Enumerable.Repeat(subscriptionType, 1), Enumerable.Empty<InstrumentType>(), snapshotIntervalSeconds, symbols?.ToList(), marketIds?.ToList(), exchanges?.ToList(), channels?.ToList(), cts.Token);
343342

344343
return id;
345344
}
@@ -352,7 +351,21 @@ public long Subscribe(Service service, SubscriptionType subscriptionType, Instru
352351
_subscriptions.Add(id, cts);
353352
}
354353

355-
RunSubscribeLoop(service, subscriptionType, instrumentType, snapshotIntervalSeconds, symbols?.ToList(), marketIds?.ToList(), exchanges?.ToList(), channels?.ToList(), cts.Token);
354+
RunSubscribeLoop(service, Enumerable.Repeat(subscriptionType, 1), Enumerable.Repeat(instrumentType, 1), snapshotIntervalSeconds, symbols?.ToList(), marketIds?.ToList(), exchanges?.ToList(), channels?.ToList(), cts.Token);
355+
356+
return id;
357+
}
358+
359+
public long Subscribe(Service service, IEnumerable<SubscriptionType> subscriptionTypes, IEnumerable<InstrumentType> instrumentTypes, int snapshotIntervalSeconds, IEnumerable<string>? symbols, IEnumerable<long>? marketIds, IEnumerable<string>? exchanges, IEnumerable<int>? channels) {
360+
long id = CorrelationId.Create();
361+
var cts = new CancellationTokenSource();
362+
363+
lock (_subscriptions)
364+
{
365+
_subscriptions.Add(id, cts);
366+
}
367+
368+
RunSubscribeLoop(service, subscriptionTypes, instrumentTypes, snapshotIntervalSeconds, symbols?.ToList(), marketIds?.ToList(), exchanges?.ToList(), channels?.ToList(), cts.Token);
356369

357370
return id;
358371
}
@@ -651,42 +664,56 @@ public Task<InstrumentReferenceResponse> GetInstrumentReferenceAsync(InstrumentR
651664
return tcs.Task;
652665
}
653666

654-
public long Subscribe(Service service, SubscriptionType subscriptionType, int snapshotIntervalSeconds, IEnumerable<string>? symbols, IEnumerable<long>? marketIds, IEnumerable<string>? exchanges, IEnumerable<int>? channels) {
667+
private long SubscribeImpl(Service service, IEnumerable<SubscriptionType> subscriptionTypes, IEnumerable<InstrumentType> instrumentTypes, int snapshotIntervalSeconds, IEnumerable<string>? symbols, IEnumerable<long>? marketIds, IEnumerable<string>? exchanges, IEnumerable<int>? channels)
668+
{
655669
_disposedToken.ThrowIfCancellationRequested();
656670

657671
long correlationId = CorrelationId.Create();
658672

659673
var subReq = new SubscriptionRequest { Service = service, CorrelationId = correlationId, Token = _token };
660-
if (symbols != null) {
661-
foreach (var symbol in symbols) {
674+
if (symbols != null)
675+
{
676+
foreach (var symbol in symbols)
677+
{
662678
var req = new SubscriptionRequest.Types.Request { Symbol = symbol, SnapshotIntervalSeconds = snapshotIntervalSeconds };
663-
req.SubscriptionType.Add(subscriptionType);
679+
req.SubscriptionType.AddRange(subscriptionTypes);
680+
req.InstrumentType.AddRange(instrumentTypes);
664681
subReq.Requests.Add(req);
665682
}
666683
}
667-
if (marketIds != null) {
668-
foreach (var marketId in marketIds) {
684+
if (marketIds != null)
685+
{
686+
foreach (var marketId in marketIds)
687+
{
669688
var req = new SubscriptionRequest.Types.Request { MarketId = marketId, SnapshotIntervalSeconds = snapshotIntervalSeconds };
670-
req.SubscriptionType.Add(subscriptionType);
689+
req.SubscriptionType.AddRange(subscriptionTypes);
690+
req.InstrumentType.AddRange(instrumentTypes);
671691
subReq.Requests.Add(req);
672692
}
673693
}
674-
if (exchanges != null) {
675-
foreach (var exchange in exchanges) {
694+
if (exchanges != null)
695+
{
696+
foreach (var exchange in exchanges)
697+
{
676698
var req = new SubscriptionRequest.Types.Request { Exchange = exchange, SnapshotIntervalSeconds = snapshotIntervalSeconds };
677-
req.SubscriptionType.Add(subscriptionType);
699+
req.SubscriptionType.AddRange(subscriptionTypes);
700+
req.InstrumentType.AddRange(instrumentTypes);
678701
subReq.Requests.Add(req);
679702
}
680703
}
681-
if (channels != null) {
682-
foreach (var channel in channels) {
704+
if (channels != null)
705+
{
706+
foreach (var channel in channels)
707+
{
683708
var req = new SubscriptionRequest.Types.Request { ChannelId = channel, SnapshotIntervalSeconds = snapshotIntervalSeconds };
684-
req.SubscriptionType.Add(subscriptionType);
709+
req.SubscriptionType.AddRange(subscriptionTypes);
710+
req.InstrumentType.AddRange(instrumentTypes);
685711
subReq.Requests.Add(req);
686712
}
687713
}
688714

689-
lock (_lock) {
715+
lock (_lock)
716+
{
690717
if (_disconnected) throw new OpenfeedDisconnectedException();
691718
_subscriptions.Add(correlationId, subReq);
692719
QueueRequest(new OpenfeedGatewayRequest { SubscriptionRequest = subReq });
@@ -695,55 +722,20 @@ public long Subscribe(Service service, SubscriptionType subscriptionType, int sn
695722
return correlationId;
696723
}
697724

698-
public long Subscribe(Service service, SubscriptionType subscriptionType, InstrumentType instrumentType, int snapshotIntervalSeconds, IEnumerable<string>? symbols, IEnumerable<long>? marketIds, IEnumerable<string>? exchanges, IEnumerable<int>? channels) {
699-
_disposedToken.ThrowIfCancellationRequested();
700-
701-
long correlationId = CorrelationId.Create();
702-
703-
var subReq = new SubscriptionRequest { Service = service, CorrelationId = correlationId, Token = _token };
704-
if (symbols != null) {
705-
foreach (var symbol in symbols) {
706-
var req = new SubscriptionRequest.Types.Request { Symbol = symbol, SnapshotIntervalSeconds = snapshotIntervalSeconds };
707-
req.SubscriptionType.Add(subscriptionType);
708-
req.InstrumentType.Add(instrumentType);
709-
subReq.Requests.Add(req);
710-
}
711-
}
712-
if (marketIds != null) {
713-
foreach (var marketId in marketIds) {
714-
var req = new SubscriptionRequest.Types.Request { MarketId = marketId, SnapshotIntervalSeconds = snapshotIntervalSeconds };
715-
req.SubscriptionType.Add(subscriptionType);
716-
req.InstrumentType.Add(instrumentType);
717-
subReq.Requests.Add(req);
718-
}
719-
}
720-
if (exchanges != null) {
721-
foreach (var exchange in exchanges) {
722-
var req = new SubscriptionRequest.Types.Request { Exchange = exchange, SnapshotIntervalSeconds = snapshotIntervalSeconds };
723-
req.SubscriptionType.Add(subscriptionType);
724-
req.InstrumentType.Add(instrumentType);
725-
subReq.Requests.Add(req);
726-
}
727-
}
728-
if (channels != null) {
729-
foreach (var channel in channels) {
730-
var req = new SubscriptionRequest.Types.Request { ChannelId = channel, SnapshotIntervalSeconds = snapshotIntervalSeconds };
731-
req.SubscriptionType.Add(subscriptionType);
732-
req.InstrumentType.Add(instrumentType);
733-
subReq.Requests.Add(req);
734-
}
735-
}
725+
public long Subscribe(Service service, SubscriptionType subscriptionType, int snapshotIntervalSeconds, IEnumerable<string>? symbols, IEnumerable<long>? marketIds, IEnumerable<string>? exchanges, IEnumerable<int>? channels) {
726+
return SubscribeImpl(service, Enumerable.Repeat(subscriptionType, 1), Enumerable.Empty<InstrumentType>(), snapshotIntervalSeconds, symbols, marketIds, exchanges, channels);
727+
}
736728

737-
lock (_lock) {
738-
if (_disconnected) throw new OpenfeedDisconnectedException();
739-
_subscriptions.Add(correlationId, subReq);
740-
QueueRequest(new OpenfeedGatewayRequest { SubscriptionRequest = subReq });
741-
}
729+
public long Subscribe(Service service, SubscriptionType subscriptionType, InstrumentType instrumentType, int snapshotIntervalSeconds, IEnumerable<string>? symbols, IEnumerable<long>? marketIds, IEnumerable<string>? exchanges, IEnumerable<int>? channels) {
730+
return SubscribeImpl(service, Enumerable.Repeat(subscriptionType, 1), Enumerable.Repeat(instrumentType, 1), snapshotIntervalSeconds, symbols, marketIds, exchanges, channels);
731+
}
742732

743-
return correlationId;
733+
public long Subscribe(Service service, IEnumerable<SubscriptionType> subscriptionTypes, IEnumerable<InstrumentType> instrumentTypes, int snapshotIntervalSeconds, IEnumerable<string>? symbols = null, IEnumerable<long>? marketIds = null, IEnumerable<string>? exchanges = null, IEnumerable<int>? channels = null) {
734+
return SubscribeImpl(service, subscriptionTypes, instrumentTypes, snapshotIntervalSeconds, symbols, marketIds, exchanges, channels);
744735
}
745736

746-
public void Unsubscribe(long subscriptionId) {
737+
738+
public void Unsubscribe(long subscriptionId) {
747739
lock (_lock) {
748740
if (!_subscriptions.TryGetValue(subscriptionId, out var subscription)) throw new ArgumentException($"Subscription ID {subscriptionId} does not exist.");
749741

src/Org.Openfeed.Client/Interfaces.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,22 @@ public interface IOpenfeedClient : IDisposable {
229229
/// <returns>The ID of the subscription which can be used in a call to <see cref="IOpenfeedClient.Unsubscribe(long)"/> to terminate the subscription.</returns>
230230
long Subscribe(Service service, SubscriptionType subscriptionType, InstrumentType instrumentType, int snapshotIntervalSeconds, IEnumerable<string>? symbols = null, IEnumerable<long>? marketIds = null, IEnumerable<string>? exchanges = null, IEnumerable<int>? channels = null);
231231

232+
/// <summary>
233+
/// Sends a <see cref="SubscriptionRequest"/> to the server to which we are currently connected. If the client not currently
234+
/// connected then waits for the connection to be established and then sends the <see cref="SubscriptionRequest"/>.
235+
/// If the client gets disconnected then it waits for the reconnect and sends the <see cref="SubscriptionRequest"/>.
236+
/// </summary>
237+
/// <param name="service">The <see cref="Service"/> to which to subscribe.</param>
238+
/// <param name="subscriptionTypes">A collection of <see cref="SubscriptionType"/>.</param>
239+
/// <param name="instrumentTypes">A collection of <see cref="InstrumentType"/>.</param>
240+
/// <param name="snapshotIntervalSeconds">Setting of the cadence at which the snapshots will be sent. If zero the the snapshot is only
241+
/// sent once.</param>
242+
/// <param name="symbols">A collection of symbols to which to subscribe, or null if no symbol subscription is to be made.</param>
243+
/// <param name="marketIds">A collection of market ID's to which to subscribe, or null if no subscription by market ID's is to be made.</param>
244+
/// <param name="exchanges">A collection of exchanges to which to subscribe, or null if no subscription by exchange is to be made.</param>
245+
/// <param name="channels">A collection of channels to which to subscribe, or null if no subscription by channel is to be made.</param>
246+
/// <returns>The ID of the subscription which can be used in a call to <see cref="IOpenfeedClient.Unsubscribe(long)"/> to terminate the subscription.</returns>
247+
long Subscribe(Service service, IEnumerable<SubscriptionType> subscriptionTypes, IEnumerable<InstrumentType> instrumentTypes, int snapshotIntervalSeconds, IEnumerable<string>? symbols = null, IEnumerable<long>? marketIds = null, IEnumerable<string>? exchanges = null, IEnumerable<int>? channels = null);
232248

233249
/// <summary>
234250
/// Unsubscribes from the feed for a given <paramref name="subscriptionId"/>.
@@ -299,6 +315,22 @@ public interface IOpenfeedConnection {
299315
/// <returns>The ID of the subscription which can be used in a call to <see cref="IOpenfeedConnection.Unsubscribe(long)"/> to terminate the subscription.</returns>
300316
long Subscribe(Service service, SubscriptionType subscriptionType, InstrumentType instrumentType, int snapshotIntervalSeconds, IEnumerable<string>? symbols = null, IEnumerable<long>? marketIds = null, IEnumerable<string>? exchanges = null, IEnumerable<int>? channels = null);
301317

318+
/// <summary>
319+
/// Sends a <see cref="SubscriptionRequest"/> to the server to which we are currently connected. If the connection is no longer
320+
/// connected throws a <see cref="OpenfeedDisconnectedException"/>.
321+
/// </summary>
322+
/// <param name="service">The <see cref="Service"/> to which to subscribe.</param>
323+
/// <param name="subscriptionTypes">A collection of <see cref="SubscriptionType"/>.</param>
324+
/// <param name="instrumentTypes">A collection of <see cref="InstrumentType"/>.</param>
325+
/// <param name="snapshotIntervalSeconds">Setting of the cadence at which the snapshots will be sent. If zero the the snapshot is only
326+
/// sent once.</param>
327+
/// <param name="symbols">A collection of symbols to which to subscribe, or null if no symbol subscription is to be made.</param>
328+
/// <param name="marketIds">A collection of market ID's to which to subscribe, or null if no subscription by market ID's is to be made.</param>
329+
/// <param name="exchanges">A collection of exchanges to which to subscribe, or null if no subscription by exchange is to be made.</param>
330+
/// <param name="channels">A collection of channels to which to subscribe, or null if no subscription by channel is to be made.</param>
331+
/// <returns>The ID of the subscription which can be used in a call to <see cref="IOpenfeedConnection.Unsubscribe(long)"/> to terminate the subscription.</returns>
332+
long Subscribe(Service service, IEnumerable<SubscriptionType> subscriptionTypes, IEnumerable<InstrumentType> instrumentTypes, int snapshotIntervalSeconds, IEnumerable<string>? symbols = null, IEnumerable<long>? marketIds = null, IEnumerable<string>? exchanges = null, IEnumerable<int>? channels = null);
333+
302334
/// <summary>
303335
/// Sends the <see cref="SubscriptionRequest"/> to the server with <see cref="SubscriptionRequest.Unsubscribe"/> set to true.
304336
/// </summary>

0 commit comments

Comments
 (0)