Skip to content

Commit a4354bd

Browse files
committed
Connect throttling...
1 parent 28cf78b commit a4354bd

File tree

1 file changed

+49
-47
lines changed

1 file changed

+49
-47
lines changed

src/Sportradar.Mbs.Sdk/Internal/Connection/WebSocketConnection.cs

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@ internal class WebSocketConnection : IDisposable
1414
private const long InitVersion = 0;
1515

1616
private readonly IWebSocketConnectionConfig _config;
17-
1817
private readonly Channel<WsInputMessage> _inputBuffer;
1918
private readonly Channel<WsOutputMessage> _outputBuffer;
2019
private readonly TokenProvider _tokenProvider;
20+
private readonly SemaphoreSlim _semaphore;
2121

2222
private long _connectedVersion = InitVersion;
23-
private long _reservedVersion = InitVersion;
2423
private int _connectFailCount = 0;
2524
private long _connectAttemptTs = TimeUtils.NowInUtcMillis();
2625

@@ -32,19 +31,20 @@ internal WebSocketConnection(
3231
_inputBuffer = inputBuffer;
3332
_outputBuffer = outputBuffer;
3433
_config = config;
34+
_semaphore = new SemaphoreSlim(1);
3535
}
3636

3737
public void Dispose()
3838
{
39-
var version = Interlocked.Increment(ref _reservedVersion);
40-
Volatile.Write(ref _connectedVersion, version);
39+
Interlocked.Increment(ref _connectedVersion);
40+
ExcSuppress.Dispose(_semaphore);
4141
}
4242

4343
internal async Task ConnectAsync(CancellationToken cancellationToken)
4444
{
4545
try
4646
{
47-
await VersionedConnectAsync(InitVersion, cancellationToken).ConfigureAwait(false);
47+
await VersionedConnectAsync(InitVersion).ConfigureAwait(false);
4848
}
4949
catch (SdkException)
5050
{
@@ -56,41 +56,56 @@ internal async Task ConnectAsync(CancellationToken cancellationToken)
5656
}
5757
}
5858

59-
private async Task VersionedConnectAsync(long version, CancellationToken cancellationToken)
59+
private async Task VersionedConnectAsync(long version)
6060
{
61-
var nextVersion = version + 1;
62-
if (Interlocked.CompareExchange(ref _reservedVersion, nextVersion, version) != version) return;
63-
6461
try
6562
{
66-
int delay = CalculateDelay();
67-
if (delay > 0)
68-
{
69-
await Task.Delay(delay);
70-
}
71-
Volatile.Write(ref _connectAttemptTs, TimeUtils.NowInUtcMillis());
72-
73-
ClientWebSocket? webSocket = null;
74-
try
63+
if (await _semaphore.WaitAsync(_config.WsReconnectTimeout).ConfigureAwait(false))
7564
{
76-
webSocket = await CreateSocketAsync(cancellationToken).ConfigureAwait(false);
77-
await webSocket.ConnectAsync(_config.WsServer, cancellationToken).ConfigureAwait(false);
78-
Volatile.Write(ref _connectFailCount, 0);
79-
}
80-
catch
81-
{
82-
Interlocked.Increment(ref this._connectFailCount);
83-
ExcSuppress.Dispose(webSocket);
84-
throw;
65+
var nextVersion = version + 1;
66+
ClientWebSocket? webSocket = null;
67+
try
68+
{
69+
if (_connectedVersion != version) return;
70+
71+
if (_connectFailCount > 0)
72+
{
73+
long maxSleep = 125L * (long)Math.Pow(2, _connectFailCount);
74+
long diffTs = TimeUtils.NowInUtcMillis() - _connectAttemptTs;
75+
int delay = (int)(maxSleep - diffTs);
76+
if (delay > 0)
77+
{
78+
await Task.Delay(delay);
79+
}
80+
}
81+
_connectAttemptTs = TimeUtils.NowInUtcMillis();
82+
83+
try
84+
{
85+
using var source = new CancellationTokenSource(_config.WsReconnectTimeout);
86+
var cancellationToken = source.Token;
87+
webSocket = await CreateSocketAsync(cancellationToken).ConfigureAwait(false);
88+
await webSocket.ConnectAsync(_config.WsServer, cancellationToken).ConfigureAwait(false);
89+
_connectFailCount = 0;
90+
}
91+
catch
92+
{
93+
_connectFailCount = Math.Min(8, _connectFailCount + 1);
94+
ExcSuppress.Dispose(webSocket);
95+
throw;
96+
}
97+
Volatile.Write(ref _connectedVersion, nextVersion);
98+
}
99+
finally
100+
{
101+
_semaphore.Release();
102+
}
103+
StartProcessing(webSocket, nextVersion);
85104
}
86-
87-
Volatile.Write(ref _connectedVersion, nextVersion);
88-
StartProcessing(webSocket, nextVersion);
89105
}
90-
catch
106+
catch (ObjectDisposedException)
91107
{
92-
Interlocked.CompareExchange(ref _reservedVersion, version, nextVersion);
93-
throw;
108+
94109
}
95110
}
96111

@@ -103,8 +118,7 @@ private async void ReconnectAsync(long version)
103118
{
104119
try
105120
{
106-
using var source = new CancellationTokenSource(_config.WsReconnectTimeout);
107-
await VersionedConnectAsync(version, source.Token).ConfigureAwait(false);
121+
await VersionedConnectAsync(version).ConfigureAwait(false);
108122
}
109123
catch (Exception e)
110124
{
@@ -113,18 +127,6 @@ private async void ReconnectAsync(long version)
113127
}
114128
}
115129

116-
private int CalculateDelay()
117-
{
118-
int count = Math.Min(8, Volatile.Read(ref this._connectFailCount));
119-
if (count == 0)
120-
{
121-
return 0;
122-
}
123-
long maxSleep = 125L * ((long)Math.Pow(2, count));
124-
long diffTs = TimeUtils.NowInUtcMillis() - Volatile.Read(ref this._connectAttemptTs);
125-
return (int)(maxSleep - diffTs);
126-
}
127-
128130
private async void StartProcessing(ClientWebSocket webSocket, long version)
129131
{
130132
try

0 commit comments

Comments
 (0)